/*
 * Decompiled with CFR 0.152.
 */
package io.leoplatform.sdk.aws.s3;

import io.leoplatform.sdk.ExecutorManager;
import io.leoplatform.sdk.StreamStats;
import io.leoplatform.sdk.aws.s3.PendingFileUpload;
import io.leoplatform.sdk.aws.s3.PendingMemoryUpload;
import io.leoplatform.sdk.aws.s3.S3BufferStyle;
import io.leoplatform.sdk.aws.s3.S3TransferManager;
import io.leoplatform.sdk.config.ConnectorConfig;
import io.leoplatform.sdk.payload.FileSegment;
import io.leoplatform.sdk.payload.StorageEventOffset;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.LinkedList;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Queues {@link FileSegment}s and hands them to the {@link S3TransferManager} in batches.
 *
 * <p>Segments added through {@link #write(FileSegment)} are held in an in-memory queue. A
 * background task, started by the constructor on the executor supplied by the
 * {@link ExecutorManager}, wakes up whenever it is signalled or after {@code Storage.MaxBatchAge}
 * milliseconds, and enqueues the pending segments as uploads once that much time has passed
 * since the previous send. {@link #flush()} and {@link #end()} send whatever is pending
 * without waiting for the timer.
 *
 * <p>All access to the pending queue is guarded by a single lock.
 */
@Singleton
public final class S3Writer {
    private static final Logger log = LoggerFactory.getLogger(S3Writer.class);
    private static final DateTimeFormatter eidFormat = DateTimeFormatter.ofPattern("uuuu'/'MM'/'dd'/'HH'/'mm").withZone(ZoneOffset.UTC);
    /** Minimum number of characters in the file-number suffix of an object key. */
    private static final int FILE_NUM_WIDTH = 7;

    /** Milliseconds between timed sends; also the wait timeout of the background loop. */
    private final long maxBatchAge;
    /** Exclusive upper bound on the summed record count of one batch. */
    private final int maxBatchRecords;
    /** Exclusive upper bound on the summed gzip size of one batch (presumably bytes - confirm). */
    private final long maxBatchSize;
    private final S3BufferStyle bufferStyle;
    private final S3TransferManager transferManager;
    /** Pending segments; only touched while {@link #lock} is held. */
    private final Queue<FileSegment> payloads = new LinkedList<>();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Lock lock = new ReentrantLock();
    private final Condition batchSend = this.lock.newCondition();

    /**
     * Reads the batching limits from configuration and starts the background batch loop.
     *
     * @param config          source of the {@code Storage.*} settings; defaults are 4000 ms batch age,
     *                        6000 records, 5017600 batch size and the {@code Memory} buffer style
     * @param transferManager receives the uploads built from each batch
     * @param executorManager provides the executor the batch loop runs on
     */
    @Inject
    public S3Writer(ConnectorConfig config, S3TransferManager transferManager, ExecutorManager executorManager) {
        this.maxBatchAge = config.longValueOrElse("Storage.MaxBatchAge", 4000L);
        this.maxBatchRecords = config.intValueOrElse("Storage.MaxBatchRecords", 6000);
        this.maxBatchSize = config.longValueOrElse("Storage.MaxBatchSize", 5017600L);
        this.bufferStyle = S3BufferStyle.fromName(config.valueOrElse("Storage.BufferStyle", "Memory"));
        this.transferManager = transferManager;
        // Every field is assigned before the loop starts, so the task never sees a half-built writer.
        CompletableFuture.runAsync(this::asyncBatchSend, executorManager.get());
    }

    /**
     * Queues a segment for a later batch and wakes the batch loop. Once the writer has been
     * stopped the segment is not queued and a warning is logged instead.
     *
     * @param fileSegment the segment to queue
     */
    void write(FileSegment fileSegment) {
        if (!running.get()) {
            log.warn("Attempt to add file segment to a stopped batch process");
            return;
        }
        add(fileSegment);
        signalBatch();
    }

    /** Sends every pending segment on the calling thread, then flushes the transfer manager. */
    void flush() {
        signalBatch();
        sendAll();
        transferManager.flush();
    }

    /** Appends a segment to the pending queue under the lock. */
    private void add(FileSegment segment) {
        lock.lock();
        try {
            payloads.add(segment);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Flushes pending segments, stops the batch loop and ends the transfer manager.
     *
     * @return the statistics returned by {@link S3TransferManager#end()}
     */
    StreamStats end() {
        log.info("Stopping S3 writer");
        flush();
        running.set(false);
        // Wake the loop so it sees the cleared flag without waiting out its timeout.
        signalBatch();
        log.info("Stopped S3 writer");
        return transferManager.end();
    }

    /** Wakes every thread waiting on the batch condition. */
    private void signalBatch() {
        lock.lock();
        try {
            batchSend.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Background loop: waits for a signal or for the batch age to pass, and sends the pending
     * segments when at least {@code maxBatchAge} milliseconds have passed since the last send.
     * Runs until the writer is stopped or the thread is interrupted.
     */
    private void asyncBatchSend() {
        boolean interrupted = false;
        Instant lastUpload = Instant.now();
        try {
            while (running.get()) {
                lock.lock();
                try {
                    batchSend.await(maxBatchAge, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    interrupted = true;
                    running.set(false);
                    log.info("S3 batch writer stopped with {} pending", payloads.size());
                } finally {
                    lock.unlock();
                }
                if (!Instant.now().isAfter(lastUpload.plusMillis(maxBatchAge))) {
                    continue;
                }
                try {
                    send();
                } catch (RuntimeException e) {
                    // Nothing observes the future this loop runs in, so an escaping exception
                    // would end batching silently while write() keeps queueing segments.
                    log.error("Unable to send S3 batch", e);
                }
                lastUpload = Instant.now();
            }
        } finally {
            if (interrupted) {
                // Restored only after the final send so that send is not affected by the flag.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Drains the pending queue into size-limited batches and enqueues one upload per batch.
     * All files produced by one call share a timestamp and are told apart by a counter that
     * starts at 1. A batch whose first segment has no offset cannot be named and is dropped.
     */
    private void send() {
        Instant now = Instant.now();
        long fileCount = 0L;
        // The lock is held across enqueueing, so a concurrent send() (for example from flush())
        // cannot return while a drained batch has not been enqueued yet.
        lock.lock();
        try {
            while (!payloads.isEmpty()) {
                Queue<FileSegment> segments = drainToMaximum();
                FileSegment first = segments.peek();
                StorageEventOffset offset = first == null ? null : first.getOffset();
                if (offset == null) {
                    log.warn("Dropping {} file segment(s) without a storage offset", segments.size());
                    continue;
                }
                fileCount++;
                transferAsync(fileCount, now, segments, offset);
            }
        } finally {
            lock.unlock();
        }
    }

    /** Wraps a batch in the upload type selected by the buffer style and enqueues it. */
    private void transferAsync(long fileCount, Instant now, Queue<FileSegment> segments, StorageEventOffset o) {
        String fileName = fileName(o, now, fileCount);
        if (bufferStyle == S3BufferStyle.DISK) {
            transferManager.enqueue(new PendingFileUpload(fileName, segments));
        } else {
            transferManager.enqueue(new PendingMemoryUpload(fileName, segments));
        }
    }

    /** Keeps sending until the pending queue is observed empty. */
    private void sendAll() {
        while (hasPending()) {
            send();
        }
    }

    /** Reports, under the lock, whether any segment is waiting to be sent. */
    private boolean hasPending() {
        lock.lock();
        try {
            return !payloads.isEmpty();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Moves segments from the head of the pending queue into a new batch. The first segment is
     * always taken; each further one is taken only while the batch stays below both limits.
     * The caller must hold the lock.
     *
     * @return the drained batch, empty only when nothing was pending
     */
    private Queue<FileSegment> drainToMaximum() {
        Queue<FileSegment> batch = new LinkedList<>();
        long batchSize = 0L;
        long batchRecords = 0L;
        while (!payloads.isEmpty()) {
            FileSegment segment = payloads.remove();
            batch.add(segment);
            if (payloads.isEmpty()) {
                break;
            }
            // Running totals replace re-summing the whole batch for every candidate segment.
            StorageEventOffset added = segment.getOffset();
            batchSize += added.getGzipSize();
            batchRecords += added.getRecords();
            if (!belowMax(batchSize, batchRecords, payloads.peek())) {
                break;
            }
        }
        return batch;
    }

    /**
     * Tells whether adding {@code next} keeps the batch strictly below both the size limit and
     * the record limit.
     */
    private boolean belowMax(long batchSize, long batchRecords, FileSegment next) {
        StorageEventOffset nextOffset = next.getOffset();
        long nextSize = nextOffset.getGzipSize();
        long nextRecords = nextOffset.getRecords();
        return batchSize + nextSize < maxBatchSize && batchRecords + nextRecords < (long) maxBatchRecords;
    }

    /**
     * Builds the object key {@code bus/<event>/z/<yyyy/MM/dd/HH/mm in UTC>/<epoch millis>-<file number>.gz}.
     */
    private String fileName(StorageEventOffset offset, Instant time, long fileNum) {
        String queue = offset.getEvent();
        String formattedTime = eidFormat.format(time);
        String fileNumPad = padWithZeros(fileNum);
        return String.format("bus/%s/z/%s/%d-%s.gz", queue, formattedTime, time.toEpochMilli(), fileNumPad);
    }

    /**
     * Left-pads the decimal form of {@code value} with zeros to at least seven characters.
     * Values that are already that wide are returned unpadded; building a format string from
     * a zero or negative pad width would make {@code String.format} throw.
     */
    private String padWithZeros(long value) {
        String digits = String.valueOf(value);
        StringBuilder padded = new StringBuilder(Math.max(FILE_NUM_WIDTH, digits.length()));
        for (int i = digits.length(); i < FILE_NUM_WIDTH; i++) {
            padded.append('0');
        }
        return padded.append(digits).toString();
    }
}

