package io.activej.datastream.processor;

import io.activej.common.Checks;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.dsl.HasStreamInputs;
import io.activej.datastream.dsl.HasStreamOutput;
import io.activej.datastream.processor.StreamReducers;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/datastream/processor/StreamReducer.class */
public final class StreamReducer<K, O, A> implements HasStreamInputs, HasStreamOutput<O> {
    public static final int DEFAULT_BUFFER_SIZE = 2000;

    @Nullable
    private StreamReducer<K, O, A>.Input<?> lastInput;

    @Nullable
    private A accumulator;
    private final PriorityQueue<StreamReducer<K, O, A>.Input<?>> priorityQueue;
    private int streamsAwaiting;
    private int streamsOpen;
    private final List<Input> inputs = new ArrayList();
    private int bufferSize = DEFAULT_BUFFER_SIZE;

    @Nullable
    private K key = null;
    private final StreamReducer<K, O, A>.Output output = new Output();

    /* loaded from: input_file:io/activej/datastream/processor/StreamReducer$Input.class */
    public abstract class Input<I> extends AbstractStreamConsumer<I> implements StreamDataAcceptor<I>, Function<I, K>, StreamReducers.Reducer<K, I, O, A> {
        private I headItem;
        private K headKey;
        private final int index;
        private final PriorityQueue<StreamReducer<K, O, A>.Input<?>> priorityQueue;
        private final ArrayDeque<I> deque = new ArrayDeque<>();
        private final int bufferSize;

        public Input() {
            this.index = StreamReducer.this.inputs.size();
            this.priorityQueue = StreamReducer.this.priorityQueue;
            this.bufferSize = StreamReducer.this.bufferSize;
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onStarted() {
            resume(this);
        }

        @Override // io.activej.datastream.StreamDataAcceptor
        public void accept(I i) {
            if (this.headItem != null) {
                this.deque.offer(i);
                if (this.deque.size() == this.bufferSize) {
                    suspend();
                    StreamReducer.this.output.reduce();
                    return;
                }
                return;
            }
            this.headItem = i;
            this.headKey = apply(this.headItem);
            this.priorityQueue.offer(this);
            if (advance() == 0) {
                StreamReducer.this.output.reduce();
            }
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onEndOfStream() {
            closeInput();
            if (this.headItem == null) {
                advance();
            }
            StreamReducer.this.output.reduce();
            StreamReducer.this.output.getAcknowledgement().whenComplete(this::acknowledge).whenException(this::closeEx);
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onError(Exception exc) {
            StreamReducer.this.output.closeEx(exc);
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onCleanup() {
            this.deque.clear();
        }

        protected int await() {
            return StreamReducer.access$504(StreamReducer.this);
        }

        protected int advance() {
            return StreamReducer.access$506(StreamReducer.this);
        }

        protected void closeInput() {
            StreamReducer.access$610(StreamReducer.this);
        }

        protected void continueReduce() {
            StreamReducer.this.output.reduce();
        }
    }

    /* loaded from: input_file:io/activej/datastream/processor/StreamReducer$Output.class */
    private final class Output extends AbstractStreamSupplier<O> {
        private Output() {
        }

        void reduce() {
            resume();
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onResumed() {
            Input input;
            while (true) {
                if (StreamReducer.this.streamsAwaiting != 0 || (input = (Input) StreamReducer.this.priorityQueue.poll()) == 0) {
                    break;
                }
                if (!input.isComplete()) {
                    if (StreamReducer.this.key == null || !input.headKey.equals(StreamReducer.this.key)) {
                        if (StreamReducer.this.lastInput != null) {
                            StreamReducer.this.lastInput.onComplete(getBufferedDataAcceptor(), StreamReducer.this.key, StreamReducer.this.accumulator);
                        }
                        StreamReducer.this.key = input.headKey;
                        StreamReducer.this.accumulator = input.onFirstItem(getBufferedDataAcceptor(), StreamReducer.this.key, input.headItem);
                    } else {
                        StreamReducer.this.accumulator = input.onNextItem(getBufferedDataAcceptor(), StreamReducer.this.key, input.headItem, StreamReducer.this.accumulator);
                    }
                    input.headItem = input.deque.poll();
                    StreamReducer.this.lastInput = input;
                    if (input.headItem == null) {
                        if (!input.isEndOfStream()) {
                            input.await();
                            break;
                        }
                    } else {
                        input.headKey = input.apply(input.headItem);
                        StreamReducer.this.priorityQueue.offer(input);
                    }
                }
            }
            for (Input input2 : StreamReducer.this.inputs) {
                if (input2.deque.size() <= StreamReducer.this.bufferSize / 2) {
                    input2.resume(input2);
                }
            }
            if (StreamReducer.this.streamsOpen == 0 && StreamReducer.this.priorityQueue.isEmpty()) {
                if (StreamReducer.this.lastInput != null) {
                    StreamReducer.this.lastInput.onComplete(getBufferedDataAcceptor(), StreamReducer.this.key, StreamReducer.this.accumulator);
                    StreamReducer.this.lastInput = null;
                    StreamReducer.this.key = null;
                    StreamReducer.this.accumulator = null;
                }
                StreamReducer.this.output.sendEndOfStream();
            }
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onError(Exception exc) {
            Iterator it = StreamReducer.this.inputs.iterator();
            while (it.hasNext()) {
                ((Input) it.next()).closeEx(exc);
            }
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onCleanup() {
            StreamReducer.this.priorityQueue.clear();
        }
    }

    /* loaded from: input_file:io/activej/datastream/processor/StreamReducer$SimpleInput.class */
    public class SimpleInput<I> extends StreamReducer<K, O, A>.Input<I> {
        private final Function<I, K> keyFunction;
        private final StreamReducers.Reducer<K, I, O, A> reducer;

        public SimpleInput(Function<I, K> function, StreamReducers.Reducer<K, I, O, A> reducer) {
            super();
            this.keyFunction = function;
            this.reducer = reducer;
        }

        @Override // java.util.function.Function
        public K apply(I i) {
            return this.keyFunction.apply(i);
        }

        @Override // io.activej.datastream.processor.StreamReducers.Reducer
        public A onFirstItem(StreamDataAcceptor<O> streamDataAcceptor, K k, I i) {
            return this.reducer.onFirstItem(streamDataAcceptor, k, i);
        }

        @Override // io.activej.datastream.processor.StreamReducers.Reducer
        public A onNextItem(StreamDataAcceptor<O> streamDataAcceptor, K k, I i, A a) {
            return this.reducer.onNextItem(streamDataAcceptor, k, i, a);
        }

        @Override // io.activej.datastream.processor.StreamReducers.Reducer
        public void onComplete(StreamDataAcceptor<O> streamDataAcceptor, K k, A a) {
            this.reducer.onComplete(streamDataAcceptor, k, a);
        }
    }

    private StreamReducer(@NotNull PriorityQueue<StreamReducer<K, O, A>.Input<?>> priorityQueue) {
        this.priorityQueue = priorityQueue;
    }

    public static <K, O, A> StreamReducer<K, O, A> create(Comparator<K> comparator) {
        return new StreamReducer<>(new PriorityQueue(1, (input, input2) -> {
            int compare = comparator.compare(input.headKey, input2.headKey);
            return compare != 0 ? compare : input.index - input2.index;
        }));
    }

    public static <K extends Comparable<K>, O, A> StreamReducer<K, O, A> create() {
        return new StreamReducer<>(new PriorityQueue(1, (input, input2) -> {
            int compareTo = ((Comparable) input.headKey).compareTo((Comparable) input2.headKey);
            return compareTo != 0 ? compareTo : input.index - input2.index;
        }));
    }

    public StreamReducer<K, O, A> withBufferSize(int i) {
        Checks.checkArgument(i >= 0, "bufferSize must be positive value, got %s", new Object[]{Integer.valueOf(i)});
        this.bufferSize = i;
        return this;
    }

    public <I> StreamConsumer<I> newInput(Function<I, K> function, StreamReducers.Reducer<K, I, O, A> reducer) {
        return addInput(new SimpleInput(function, reducer));
    }

    public <I> StreamReducer<K, O, A>.Input<I> addInput(StreamReducer<K, O, A>.Input<I> input) {
        this.inputs.add(input);
        input.await();
        this.streamsOpen++;
        return input;
    }

    @Override // io.activej.datastream.dsl.HasStreamInputs
    public List<? extends StreamConsumer<?>> getInputs() {
        return this.inputs;
    }

    @Override // io.activej.datastream.dsl.HasStreamOutput
    public StreamSupplier<O> getOutput() {
        return this.output;
    }

    static /* synthetic */ int access$504(StreamReducer streamReducer) {
        int i = streamReducer.streamsAwaiting + 1;
        streamReducer.streamsAwaiting = i;
        return i;
    }

    static /* synthetic */ int access$506(StreamReducer streamReducer) {
        int i = streamReducer.streamsAwaiting - 1;
        streamReducer.streamsAwaiting = i;
        return i;
    }

    static /* synthetic */ int access$610(StreamReducer streamReducer) {
        int i = streamReducer.streamsOpen;
        streamReducer.streamsOpen = i - 1;
        return i;
    }
}
