/*
 * Decompiled with CFR 0.152.
 */
package io.leoplatform.sdk.aws;

import io.leoplatform.sdk.AsyncWorkQueue;
import io.leoplatform.sdk.StreamStats;
import io.leoplatform.sdk.TransferStyle;
import io.leoplatform.sdk.config.ConnectorConfig;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public final class WorkQueues {
    private static final Logger log = LoggerFactory.getLogger(WorkQueues.class);
    private static final TransferStyle failoverStyle = TransferStyle.STORAGE;
    private final TransferStyle configuredStyle;
    private final Map<TransferStyle, AsyncWorkQueue> transferQueue;

    @Inject
    public WorkQueues(ConnectorConfig config, @Named(value="Stream") AsyncWorkQueue kinesisQueue, @Named(value="Storage") AsyncWorkQueue s3Queue) {
        this.configuredStyle = TransferStyle.fromType((String)config.value("Writer"));
        this.transferQueue = Stream.of(kinesisQueue, s3Queue).collect(Collectors.collectingAndThen(Collectors.toMap(AsyncWorkQueue::style, Function.identity()), Collections::unmodifiableMap));
        log.info("AWS {} {} write configured", (Object)this.awsTypeLabel(this.configuredStyle), (Object)this.configuredStyle.style());
        log.info("AWS {} {} write configured for threshold failover", (Object)this.awsTypeLabel(failoverStyle), (Object)failoverStyle.style());
    }

    private String awsTypeLabel(TransferStyle style) {
        switch (style) {
            case STREAM: {
                return "Kinesis";
            }
            case STORAGE: {
                return "S3";
            }
            case BATCH: {
                return "Firehose";
            }
        }
        throw new IllegalArgumentException("Unable to recognize this transfer style: " + this.configuredStyle);
    }

    AsyncWorkQueue workQueue() {
        return this.transferQueue.get(this.configuredStyle);
    }

    AsyncWorkQueue failoverQueue() {
        return this.transferQueue.get(failoverStyle);
    }

    StreamStats endAll() {
        StreamStats storageStats = this.transferQueue.values().stream().filter(q -> q.style() == TransferStyle.STORAGE).map(AsyncWorkQueue::end).findFirst().orElse(this.emptyStats());
        StreamStats streamStats = this.transferQueue.values().stream().filter(q -> q.style() == TransferStyle.STREAM).map(AsyncWorkQueue::end).findFirst().orElse(this.emptyStats());
        return Stream.of(storageStats, streamStats).reduce(this::combineStats).orElse(this.emptyStats());
    }

    private StreamStats combineStats(StreamStats ss1, StreamStats ss2) {
        Long succ = ss1.successes() + ss2.successes();
        Long fails = ss1.failures() + ss2.failures();
        Duration dur = ss1.totalTime().plusMillis(ss2.totalTime().toMillis());
        return this.stats(succ, fails, dur);
    }

    private StreamStats emptyStats() {
        return this.stats(0L, 0L, Duration.ofMillis(0L));
    }

    private StreamStats stats(final Long succ, final Long fails, final Duration dur) {
        return new StreamStats(){

            public Long successes() {
                return succ;
            }

            public Long failures() {
                return fails;
            }

            public Duration totalTime() {
                return dur;
            }
        };
    }
}

