package io.activej.datastream.stats;

import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.processor.StreamTransformer;
import io.activej.promise.Promise;
import java.util.Objects;

/* loaded from: input_file:io/activej/datastream/stats/StreamStatsForwarder.class */
public class StreamStatsForwarder<T> implements StreamTransformer<T, T> {
    private final StreamStatsForwarder<T>.Input input = new Input();
    private final StreamStatsForwarder<T>.Output output = new Output();
    private final StreamStats<T> stats;

    /* loaded from: input_file:io/activej/datastream/stats/StreamStatsForwarder$Input.class */
    protected final class Input extends AbstractStreamConsumer<T> {
        protected Input() {
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onStarted() {
            StreamStatsForwarder.this.stats.onStarted();
            resume(StreamStatsForwarder.this.output.getDataAcceptor());
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onEndOfStream() {
            StreamStatsForwarder.this.stats.onEndOfStream();
            StreamStatsForwarder.this.output.sendEndOfStream();
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onError(Exception exc) {
            StreamStatsForwarder.this.stats.onError(exc);
        }
    }

    /* loaded from: input_file:io/activej/datastream/stats/StreamStatsForwarder$Output.class */
    protected final class Output extends AbstractStreamSupplier<T> {
        protected Output() {
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onResumed() {
            StreamStatsForwarder.this.stats.onResume();
            StreamStatsForwarder.this.input.resume(getDataAcceptor());
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onSuspended() {
            StreamStatsForwarder.this.stats.onSuspend();
            StreamStatsForwarder.this.input.suspend();
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onError(Exception exc) {
            StreamStatsForwarder.this.stats.onError(exc);
        }
    }

    private StreamStatsForwarder(StreamStats<T> streamStats) {
        this.stats = streamStats;
        Promise<Void> acknowledgement = this.input.getAcknowledgement();
        StreamStatsForwarder<T>.Output output = this.output;
        Objects.requireNonNull(output);
        acknowledgement.whenException(output::closeEx);
        Promise<Void> acknowledgement2 = this.output.getAcknowledgement();
        StreamStatsForwarder<T>.Input input = this.input;
        Objects.requireNonNull(input);
        Promise whenResult = acknowledgement2.whenResult(input::acknowledge);
        StreamStatsForwarder<T>.Input input2 = this.input;
        Objects.requireNonNull(input2);
        whenResult.whenException(input2::closeEx);
    }

    public static <T> StreamStatsForwarder<T> create(StreamStats<T> streamStats) {
        return new StreamStatsForwarder<>(streamStats);
    }

    @Override // io.activej.datastream.dsl.HasStreamInput
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    @Override // io.activej.datastream.dsl.HasStreamOutput
    public StreamSupplier<T> getOutput() {
        return this.output;
    }
}
