/*
 * Decompiled with CFR 0.152.
 */
package io.leoplatform.sdk.aws.s3;

import io.leoplatform.sdk.AsyncWorkQueue;
import io.leoplatform.sdk.ExecutorManager;
import io.leoplatform.sdk.StreamStats;
import io.leoplatform.sdk.TransferStyle;
import io.leoplatform.sdk.aws.payload.CompressionWriter;
import io.leoplatform.sdk.aws.s3.S3Writer;
import io.leoplatform.sdk.payload.EventPayload;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public final class S3Queue
implements AsyncWorkQueue {
    private static final Logger log = LoggerFactory.getLogger(S3Queue.class);
    private final int maxBatchRecords = 1000;
    private final CompressionWriter compression;
    private final ExecutorManager executorManager;
    private final S3Writer s3Writer;
    private final BlockingQueue<EventPayload> payloads = new LinkedBlockingQueue<EventPayload>();
    private final List<CompletableFuture<Void>> pendingWrites = new LinkedList<CompletableFuture<Void>>();
    private final AtomicBoolean running;
    private final Lock lock = new ReentrantLock();
    private final Condition batchSend = this.lock.newCondition();

    @Inject
    public S3Queue(ExecutorManager executorManager, CompressionWriter compression, S3Writer s3Writer) {
        this.compression = compression;
        this.executorManager = executorManager;
        this.s3Writer = s3Writer;
        this.running = new AtomicBoolean(true);
        CompletableFuture.runAsync(this::asyncBatchSend, executorManager.get());
    }

    public void addEntity(EventPayload entity) {
        if (this.running.get()) {
            this.add(entity);
            if (this.exceedsMaxRecords()) {
                this.signalBatch();
            }
        } else {
            log.warn("Attempt to add S3 payload to a stopped queue");
        }
    }

    public void flush() {
        this.signalBatch();
        this.sendAll();
        this.completePendingWrites();
        this.s3Writer.flush();
    }

    private void add(EventPayload entity) {
        this.lock.lock();
        try {
            this.payloads.add(entity);
        }
        finally {
            this.lock.unlock();
        }
    }

    private void asyncBatchSend() {
        while (this.running.get()) {
            this.lock.lock();
            try {
                this.batchSend.await(200L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException i) {
                this.running.set(false);
                log.info("S3 queue stopped with {} pending", (Object)this.payloads.size());
            }
            finally {
                this.lock.unlock();
            }
            this.sendBatch();
        }
    }

    public StreamStats end() {
        this.flush();
        this.running.set(false);
        this.signalBatch();
        return this.s3Writer.end();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void send() {
        Set<EventPayload> toSend = this.drainToSet();
        if (!toSend.isEmpty()) {
            this.lock.lock();
            try {
                Executor e = this.executorManager.get();
                CompletionStage cf = ((CompletableFuture)CompletableFuture.supplyAsync(() -> this.compression.compressWithOffsets(toSend), e).thenAcceptAsync(this.s3Writer::write, e)).thenRunAsync(this::removeCompleted, e);
                this.pendingWrites.add((CompletableFuture<Void>)cf);
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    private void sendBatch() {
        do {
            this.send();
        } while (this.exceedsMaxRecords());
    }

    private void sendAll() {
        while (!this.payloads.isEmpty()) {
            this.send();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void completePendingWrites() {
        this.removeCompleted();
        this.lock.lock();
        try {
            long inFlight = this.pendingWrites.parallelStream().filter(w -> !w.isDone()).count();
            if (inFlight > 0L) {
                log.info("Waiting for {} S3 background task{} to stop", (Object)inFlight, (Object)(inFlight == 1L ? "" : "s"));
            }
        }
        finally {
            this.lock.unlock();
        }
        while (!this.pendingWrites.isEmpty()) {
            this.lock.lock();
            try {
                this.batchSend.await(100L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException i) {
                log.warn("Stopped with incomplete pending S3 batch tasks");
                this.pendingWrites.clear();
            }
            finally {
                this.lock.unlock();
            }
            this.removeCompleted();
        }
    }

    private void removeCompleted() {
        this.lock.lock();
        try {
            this.pendingWrites.removeIf(CompletableFuture::isDone);
        }
        finally {
            this.lock.unlock();
        }
    }

    private void signalBatch() {
        this.lock.lock();
        try {
            this.batchSend.signalAll();
        }
        finally {
            this.lock.unlock();
        }
    }

    private Set<EventPayload> drainToSet() {
        LinkedHashSet<EventPayload> toSend = new LinkedHashSet<EventPayload>();
        this.lock.lock();
        try {
            this.payloads.drainTo(toSend, 1000);
        }
        finally {
            this.lock.unlock();
        }
        return toSend;
    }

    private boolean exceedsMaxRecords() {
        return this.payloads.size() >= 1000;
    }

    public TransferStyle style() {
        return TransferStyle.STORAGE;
    }
}

