/*
 * Decompiled with CFR 0.152.
 */
package io.leoplatform.sdk.aws.kinesis;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import io.leoplatform.sdk.ExecutorManager;
import io.leoplatform.sdk.StreamStats;
import io.leoplatform.sdk.aws.kinesis.KinesisResults;
import io.leoplatform.sdk.config.ConnectorConfig;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public final class KinesisProducerWriter {
    private static final Logger log = LoggerFactory.getLogger(KinesisProducerWriter.class);
    private final KinesisResults resultsProcessor;
    private final KinesisProducer kinesis;
    private final String stream;
    private final ExecutorManager executorManager;
    private final List<CompletableFuture<Void>> pendingWrites = new LinkedList<CompletableFuture<Void>>();
    private final Lock lock = new ReentrantLock();
    private final Condition asyncUpload = this.lock.newCondition();

    @Inject
    public KinesisProducerWriter(ConnectorConfig config, ExecutorManager executorManager, KinesisResults resultsProcessor) {
        this.stream = config.value("Stream.Name");
        KinesisProducerConfiguration kCfg = new KinesisProducerConfiguration().setCredentialsProvider(this.credentials(config)).setRegion(config.valueOrElse("Region", "us-east-1")).setAggregationEnabled(false).setRecordMaxBufferedTime(200L).setRequestTimeout(60000L).setMaxConnections(48L).setCredentialsRefreshDelay(100L).setMetricsNamespace("LEO Java SDK").setLogLevel("info");
        this.kinesis = new KinesisProducer(kCfg);
        this.executorManager = executorManager;
        this.resultsProcessor = resultsProcessor;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void write(ByteBuffer payload) {
        this.lock.lock();
        try {
            Executor e = this.executorManager.get();
            CompletionStage cf = ((CompletableFuture)CompletableFuture.supplyAsync(() -> this.addRecord(payload), e).thenAcceptAsync(r -> this.resultsProcessor.add((UserRecordResult)r, payload.array().length), e)).thenRunAsync(this::removeCompleted, e);
            this.pendingWrites.add((CompletableFuture<Void>)cf);
        }
        finally {
            this.lock.unlock();
        }
    }

    void flush() {
        this.kinesis.flushSync();
    }

    private UserRecordResult addRecord(ByteBuffer payload) {
        try {
            return (UserRecordResult)this.kinesis.addUserRecord(this.stream, "0", payload).get();
        }
        catch (Exception e) {
            this.resultsProcessor.addFailure(e);
            throw new RuntimeException("Error adding record");
        }
    }

    StreamStats end() {
        this.completePendingTasks();
        try {
            log.info("Flushing Kinesis pipeline");
            this.kinesis.flushSync();
        }
        catch (Exception e) {
            log.warn("Unable to flush kinesis pipeline: {}", (Object)e.getMessage());
        }
        try {
            log.info("Stopping Kinesis writer ({} outstanding)", (Object)this.kinesis.getOutstandingRecordsCount());
            this.kinesis.destroy();
            log.info("Stopped Kinesis writer");
        }
        catch (Exception e) {
            log.warn("Unable to stop Kinesis writer: {}", (Object)e.getMessage());
        }
        return this.getStats();
    }

    private void completePendingTasks() {
        while (!this.pendingWrites.isEmpty()) {
            this.lock.lock();
            try {
                this.asyncUpload.await(100L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException i) {
                log.warn("Stopped Kinesis upload manager with incomplete pending tasks");
                this.pendingWrites.clear();
            }
            finally {
                this.lock.unlock();
            }
            this.removeCompleted();
        }
        this.lock.lock();
        try {
            long inFlight = this.pendingWrites.parallelStream().map(CompletableFuture::join).count();
            if (inFlight > 0L) {
                log.info("Waited for {} Kinesis upload{} to complete", (Object)inFlight, (Object)(inFlight == 1L ? "" : "s"));
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    private void removeCompleted() {
        this.lock.lock();
        try {
            this.pendingWrites.removeIf(CompletableFuture::isDone);
            this.asyncUpload.signalAll();
        }
        finally {
            this.lock.unlock();
        }
    }

    private AWSCredentialsProvider credentials(ConnectorConfig config) {
        try {
            return Optional.of(config.valueOrElse("AwsProfile", "")).map(String::trim).filter(profile -> !profile.isEmpty()).map(ProfileCredentialsProvider::new).filter(p -> p.getCredentials() != null).map(AWSCredentialsProvider.class::cast).orElse((AWSCredentialsProvider)DefaultAWSCredentialsProviderChain.getInstance());
        }
        catch (Exception e) {
            return DefaultAWSCredentialsProviderChain.getInstance();
        }
    }

    private StreamStats getStats() {
        return new StreamStats(){

            public Long successes() {
                return KinesisProducerWriter.this.resultsProcessor.successes();
            }

            public Long failures() {
                return KinesisProducerWriter.this.resultsProcessor.failures();
            }

            public Duration totalTime() {
                return Duration.between(KinesisProducerWriter.this.resultsProcessor.start(), Instant.now());
            }
        };
    }
}

