/*
 * Decompiled with CFR 0.152.
 */
package io.leoplatform.sdk.aws;

import io.leoplatform.sdk.AsyncWorkQueue;
import io.leoplatform.sdk.ExecutorManager;
import io.leoplatform.sdk.LoadingStream;
import io.leoplatform.sdk.StreamStats;
import io.leoplatform.sdk.payload.EventPayload;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public final class AWSLoadingStream
implements LoadingStream {
    private static final Logger log = LoggerFactory.getLogger(AWSLoadingStream.class);
    private final AsyncWorkQueue transferProxy;
    private final ExecutorManager executorManager;
    private final AtomicBoolean streaming;

    @Inject
    public AWSLoadingStream(@Named(value="Proxy") AsyncWorkQueue transferProxy, ExecutorManager executorManager) {
        this.transferProxy = transferProxy;
        this.executorManager = executorManager;
        this.streaming = new AtomicBoolean(true);
    }

    public void load(EventPayload payload) {
        if (this.streaming.get()) {
            EventPayload sanitized = Optional.ofNullable(payload).filter(p -> Objects.nonNull(p.payload())).orElseThrow(() -> new IllegalArgumentException("Invalid payload: " + payload));
            this.transferProxy.addEntity(sanitized);
        } else {
            log.warn("Attempt to load payload on a closed stream");
        }
    }

    public void load(Stream<EventPayload> payload) {
        payload.forEachOrdered(this::load);
    }

    public CompletableFuture<StreamStats> end() {
        if (this.streaming.getAndSet(false)) {
            log.info("Stopping platform stream");
            return CompletableFuture.supplyAsync(() -> {
                StreamStats stats = this.transferProxy.end();
                this.executorManager.end();
                return stats;
            });
        }
        return this.noStats();
    }

    private CompletableFuture<StreamStats> noStats() {
        return CompletableFuture.completedFuture(new StreamStats(){

            public Long successes() {
                return 0L;
            }

            public Long failures() {
                return 0L;
            }

            public Duration totalTime() {
                Instant now = Instant.now();
                return Duration.between(now, now);
            }
        });
    }
}

