/*
 * Decompiled with CFR 0.152.
 */
package io.leoplatform.sdk.aws.kinesis;

import io.leoplatform.sdk.AsyncWorkQueue;
import io.leoplatform.sdk.ExecutorManager;
import io.leoplatform.sdk.StreamStats;
import io.leoplatform.sdk.TransferStyle;
import io.leoplatform.sdk.aws.kinesis.KinesisProducerWriter;
import io.leoplatform.sdk.aws.payload.CompressionWriter;
import io.leoplatform.sdk.config.ConnectorConfig;
import io.leoplatform.sdk.payload.EventPayload;
import io.leoplatform.sdk.payload.FileSegment;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers {@link EventPayload}s in memory and hands them to a {@link KinesisProducerWriter}
 * in compressed batches.
 *
 * <p>A background task started by the constructor wakes up every {@code Stream.MaxBatchAge}
 * milliseconds, or earlier when {@link #addEntity(EventPayload)} signals that
 * {@code Stream.MaxBatchRecords} payloads are queued, and schedules an asynchronous
 * compress-and-write for the queued payloads. A compressed batch larger than
 * {@code Stream.MaxBatchSize} bytes is re-compressed one payload at a time, and any single
 * payload that is still too large is logged and skipped.
 *
 * <p>{@code pendingWrites} is a plain list and must only be touched while holding {@code lock}.
 */
@Singleton
public final class KinesisQueue implements AsyncWorkQueue {
    private static final Logger log = LoggerFactory.getLogger(KinesisQueue.class);

    /** Longest time, in milliseconds, the background task waits before sending a batch. */
    private final long maxBatchAge;
    /** Largest number of payloads drained into a single batch. */
    private final int maxBatchRecords;
    /** Largest allowed compressed size, in bytes, of a single segment. */
    private final long maxBatchSize;
    private final ExecutorManager executorManager;
    private final CompressionWriter compression;
    private final KinesisProducerWriter writer;
    private final BlockingQueue<EventPayload> payloads = new LinkedBlockingQueue<>();
    /** Background tasks that have not yet been observed as done; guarded by {@link #lock}. */
    private final List<CompletableFuture<Void>> pendingWrites = new LinkedList<>();
    private final AtomicBoolean running;
    private final Lock lock = new ReentrantLock();
    private final Condition batchSend = lock.newCondition();

    /**
     * Reads the batching limits from configuration and starts the background batch task.
     *
     * @param config          source of the {@code Stream.MaxBatchAge} (default 400),
     *                        {@code Stream.MaxBatchRecords} (default 1000) and
     *                        {@code Stream.MaxBatchSize} (default 1 MiB) settings
     * @param executorManager supplies the executor for all asynchronous work
     * @param compression     compresses batches of payloads into file segments
     * @param writer          receives the compressed segments
     */
    @Inject
    public KinesisQueue(ConnectorConfig config, ExecutorManager executorManager, CompressionWriter compression, KinesisProducerWriter writer) {
        this.maxBatchAge = config.longValueOrElse("Stream.MaxBatchAge", 400L);
        this.maxBatchRecords = config.intValueOrElse("Stream.MaxBatchRecords", 1000);
        this.maxBatchSize = config.longValueOrElse("Stream.MaxBatchSize", 0x100000L);
        this.executorManager = executorManager;
        this.compression = compression;
        this.writer = writer;
        this.running = new AtomicBoolean(true);
        // The batch loop is tracked like any other task so that end() waits for it to exit.
        track(CompletableFuture.runAsync(this::asyncBatchSend, executorManager.get()));
    }

    /**
     * Queues a payload for sending, waking the background task once a full batch is queued.
     * A payload offered after {@link #end()} has been called is dropped with a warning.
     *
     * @param entity payload to queue
     */
    public void addEntity(EventPayload entity) {
        if (!running.get()) {
            log.warn("Attempt to add kinesis payload to a stopped queue");
            return;
        }
        add(entity);
        if (exceedsMaxRecords()) {
            signalBatch();
        }
    }

    /** Delegates to the writer's {@code flush()}. */
    public void flush() {
        writer.flush();
    }

    /**
     * Stops accepting payloads, sends everything still queued, waits for the background
     * tasks to finish and then ends the writer.
     *
     * @return the statistics returned by the writer's {@code end()}
     */
    public StreamStats end() {
        running.set(false);
        signalBatch();
        sendAll();
        completePendingTasks();
        return writer.end();
    }

    /** Reports that this queue transfers data as a stream. */
    public TransferStyle style() {
        return TransferStyle.STREAM;
    }

    /** Appends a payload to the queue while holding the lock. */
    private void add(EventPayload entity) {
        lock.lock();
        try {
            payloads.add(entity);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Background loop: waits until signalled or until {@code maxBatchAge} elapses, then sends.
     * An interrupt stops the loop after one final send.
     */
    private void asyncBatchSend() {
        while (running.get()) {
            lock.lock();
            try {
                batchSend.await(maxBatchAge, TimeUnit.MILLISECONDS);
            } catch (InterruptedException i) {
                running.set(false);
                log.info("Kinesis queue stopped with {} pending", payloads.size());
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
            sendBatch();
        }
    }

    /** Sends one batch, then keeps sending while a full batch is still queued. */
    private void sendBatch() {
        do {
            send();
        } while (exceedsMaxRecords());
    }

    /** Sends batches until the payload queue is empty. */
    private void sendAll() {
        while (!payloads.isEmpty()) {
            send();
        }
    }

    /**
     * Blocks until every tracked background task is done, polling every 100 ms.
     * If the waiting thread is interrupted, the remaining tasks are abandoned and the
     * thread's interrupt status is restored.
     */
    private void completePendingTasks() {
        removeCompleted();
        long inFlight = countInFlight();
        log.info("Waiting for {} kinesis background task{} to complete", inFlight, inFlight == 1L ? "" : "s");
        while (hasPendingWrites()) {
            lock.lock();
            try {
                // The condition is used purely as a timed sleep between polls.
                batchSend.await(100L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException i) {
                log.warn("Stopped with incomplete pending Kinesis batch tasks");
                pendingWrites.clear();
                Thread.currentThread().interrupt();
                return;
            } finally {
                lock.unlock();
            }
            removeCompleted();
        }
    }

    /** Counts the tracked tasks that are not yet done. */
    private long countInFlight() {
        lock.lock();
        try {
            return pendingWrites.stream().filter(w -> !w.isDone()).count();
        } finally {
            lock.unlock();
        }
    }

    /** Reports whether any task is still tracked. */
    private boolean hasPendingWrites() {
        lock.lock();
        try {
            return !pendingWrites.isEmpty();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Drains up to {@code maxBatchRecords} payloads and schedules their compression and write
     * on the executor. Nothing is scheduled when the queue is empty. A failure anywhere in
     * the asynchronous chain is logged instead of being lost.
     */
    private void send() {
        Set<EventPayload> toSend = drainToSet();
        if (toSend.isEmpty()) {
            // A timed wake-up with nothing queued: there is nothing to compress or write.
            return;
        }
        Executor executor = executorManager.get();
        CompletableFuture<Void> write = CompletableFuture
            .supplyAsync(() -> compressPayloads(toSend), executor)
            .thenAcceptAsync(this::toKinesis, executor)
            .whenCompleteAsync((ignored, error) -> {
                if (error != null) {
                    log.error("Unable to send batch of {} payloads to Kinesis", toSend.size(), error);
                }
                removeCompleted();
            }, executor);
        track(write);
    }

    /** Registers a background task so that {@link #end()} waits for it. */
    private void track(CompletableFuture<Void> task) {
        lock.lock();
        try {
            pendingWrites.add(task);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Compresses a batch into one segment. If that segment exceeds {@code maxBatchSize}, each
     * payload is compressed on its own and only segments within the limit are returned.
     *
     * @param toSend payloads to compress
     * @return the segments to write, in the encounter order of {@code toSend}
     */
    private Queue<FileSegment> compressPayloads(Set<EventPayload> toSend) {
        FileSegment compressedBatch = compression.compressWithOffsets(toSend);
        if (compressedBatch.getOffset().getGzipSize() > maxBatchSize) {
            log.warn("Compressed payload batch exceeds {} bytes", compressedBatch.getOffset().getGzipSize());
            return toSend.parallelStream()
                .map(Collections::singletonList)
                .map(compression::compressWithOffsets)
                .filter(this::withinMaxBatchSize)
                .collect(Collectors.toCollection(LinkedList::new));
        }
        Queue<FileSegment> single = new LinkedList<>();
        single.add(compressedBatch);
        return single;
    }

    /**
     * Tests a segment against {@code maxBatchSize}, logging an error for one that is too large.
     *
     * @return {@code true} when the segment's compressed size is within the limit
     */
    private boolean withinMaxBatchSize(FileSegment fileSegment) {
        long compressed = fileSegment.getOffset().getGzipSize();
        if (compressed > maxBatchSize) {
            log.error("Skipping {} byte payload which exceeds maximum of {} bytes", compressed, maxBatchSize);
            return false;
        }
        return true;
    }

    /** Writes each non-null segment with a non-empty body to the writer, in order. */
    private void toKinesis(Queue<FileSegment> segments) {
        segments.stream()
            .filter(Objects::nonNull)
            .map(FileSegment::getSegment)
            .filter(b -> b.length > 0)
            .map(ByteBuffer::wrap)
            .forEachOrdered(writer::write);
    }

    /** Removes up to {@code maxBatchRecords} payloads from the queue, keeping their order. */
    private Set<EventPayload> drainToSet() {
        Set<EventPayload> toSend = new LinkedHashSet<>();
        lock.lock();
        try {
            payloads.drainTo(toSend, maxBatchRecords);
        } finally {
            lock.unlock();
        }
        return toSend;
    }

    /** Wakes every thread waiting on the batch condition. */
    private void signalBatch() {
        lock.lock();
        try {
            batchSend.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Stops tracking every task that is already done. */
    private void removeCompleted() {
        lock.lock();
        try {
            pendingWrites.removeIf(CompletableFuture::isDone);
        } finally {
            lock.unlock();
        }
    }

    /** Reports whether at least one full batch of payloads is queued. */
    private boolean exceedsMaxRecords() {
        return payloads.size() >= maxBatchRecords;
    }
}

