/*
 * Decompiled with CFR 0.152.
 */
package io.leoplatform.sdk.aws;

import io.leoplatform.sdk.AsyncWorkQueue;
import io.leoplatform.sdk.StreamStats;
import io.leoplatform.sdk.TransferStyle;
import io.leoplatform.sdk.aws.WorkQueues;
import io.leoplatform.sdk.payload.EventPayload;
import io.leoplatform.sdk.payload.ThresholdMonitor;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link AsyncWorkQueue} that forwards each payload to one of two underlying queues
 * supplied by {@link WorkQueues}: the regular work queue, or the failover queue while the
 * {@link ThresholdMonitor} reports failover.
 *
 * <p>Whenever the routing target changes, the queue that was previously in use is flushed
 * before the payload is handed to the newly selected queue.
 */
@Singleton
public final class TransferProxy
implements AsyncWorkQueue {
    private static final Logger log = LoggerFactory.getLogger(TransferProxy.class);
    private final WorkQueues workQueues;
    private final ThresholdMonitor thresholdMonitor;
    /** Routing mode used by the most recent {@link #addEntity} call; {@code true} means failover. */
    private final AtomicBoolean failover = new AtomicBoolean(false);

    /**
     * Creates the proxy over the given queues and monitor.
     *
     * <p>If the work queue and the failover queue report the same {@link TransferStyle}, the
     * monitor is ended right away (presumably because switching between two queues of the
     * same style serves no purpose -- confirm against {@code ThresholdMonitor}).
     *
     * @param workQueues provider of the regular and failover queues
     * @param thresholdMonitor monitor consulted on every {@link #addEntity} call
     */
    @Inject
    public TransferProxy(WorkQueues workQueues, ThresholdMonitor thresholdMonitor) {
        this.workQueues = workQueues;
        this.thresholdMonitor = thresholdMonitor;
        if (workQueues.workQueue().style() == workQueues.failoverQueue().style()) {
            thresholdMonitor.end();
        }
    }

    /**
     * Routes a payload to the failover queue when the monitor reports failover, otherwise to
     * the regular work queue.
     *
     * <p>On the first call after the routing mode changes, the queue used before the change
     * is flushed prior to adding the payload to the new target.
     *
     * @param entity the payload to enqueue
     */
    public void addEntity(EventPayload entity) {
        AsyncWorkQueue target;
        if (thresholdMonitor.isFailover()) {
            // getAndSet returns the previous mode: flush only on the normal -> failover edge.
            boolean wasFailover = failover.getAndSet(true);
            if (!wasFailover) {
                flushWorkQueue();
            }
            target = workQueues.failoverQueue();
        } else {
            // Flush only on the failover -> normal edge.
            boolean wasFailover = failover.getAndSet(false);
            if (wasFailover) {
                flushFailoverQueue();
            }
            target = workQueues.workQueue();
        }
        target.addEntity(entity);
    }

    /**
     * Flushes both underlying queues and blocks until both flushes have finished.
     *
     * <p>The two flushes are submitted with {@link CompletableFuture#runAsync(Runnable)} and
     * awaited with {@code join()}, so a failure in either one surfaces to the caller as an
     * unchecked exception from {@code join()}.
     */
    public void flush() {
        CompletableFuture<Void> failoverFlush = CompletableFuture.runAsync(this::flushFailoverQueue);
        CompletableFuture<Void> workFlush = CompletableFuture.runAsync(this::flushWorkQueue);
        CompletableFuture.allOf(failoverFlush, workFlush).join();
        log.info("Flushed all work queues");
    }

    /** Flushes the regular work queue. */
    private void flushWorkQueue() {
        log.info("Flushing Kinesis payloads");
        workQueues.workQueue().flush();
    }

    /** Flushes the failover queue. */
    private void flushFailoverQueue() {
        log.info("Flushing S3 uploads");
        workQueues.failoverQueue().flush();
    }

    /**
     * Flushes both queues, ends the threshold monitor, and ends all underlying queues.
     *
     * <p>The monitor is ended even when the flush fails, so a failed shutdown does not leave
     * it running. In that case the flush exception still propagates and
     * {@link WorkQueues#endAll()} is not invoked, matching the original control flow.
     *
     * @return the statistics returned by {@link WorkQueues#endAll()}
     */
    public StreamStats end() {
        log.info("Stopping transfer proxy");
        try {
            flush();
        } finally {
            // Must run on the failure path too; previously a throwing flush() skipped it.
            thresholdMonitor.end();
        }
        return workQueues.endAll();
    }

    /**
     * Identifies this queue's transfer style.
     *
     * @return always {@link TransferStyle#PROXY}
     */
    public TransferStyle style() {
        return TransferStyle.PROXY;
    }
}

