package io.activej.datastream;

import io.activej.common.Utils;
import io.activej.common.function.ConsumerEx;
import io.activej.csp.ChannelConsumer;
import io.activej.eventloop.Eventloop;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/activej/datastream/StreamConsumers.class */
final class StreamConsumers {

    /* loaded from: input_file:io/activej/datastream/StreamConsumers$ClosingWithError.class */
    static final class ClosingWithError<T> extends AbstractStreamConsumer<T> {
        private Exception error;

        /* JADX INFO: Access modifiers changed from: package-private */
        public ClosingWithError(Exception exc) {
            this.error = exc;
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onInit() {
            this.error = (Exception) Utils.nullify(this.error, this::closeEx);
        }
    }

    /* loaded from: input_file:io/activej/datastream/StreamConsumers$Idle.class */
    static final class Idle<T> extends AbstractStreamConsumer<T> {
        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onEndOfStream() {
            acknowledge();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/activej/datastream/StreamConsumers$OfAnotherEventloop.class */
    public static final class OfAnotherEventloop<T> extends AbstractStreamConsumer<T> {
        private static final int MAX_BUFFER_SIZE = 100;
        private static final Iterator<?> END_OF_STREAM = Collections.emptyIterator();
        private List<T> list = new ArrayList();
        private final StreamDataAcceptor<T> toList = obj -> {
            this.list.add(obj);
            if (this.list.size() == MAX_BUFFER_SIZE) {
                flush();
                suspend();
            }
        };
        private final StreamConsumer<T> anotherEventloopConsumer;
        private final OfAnotherEventloop<T>.InternalSupplier internalSupplier;
        private volatile boolean wakingUp;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:io/activej/datastream/StreamConsumers$OfAnotherEventloop$InternalSupplier.class */
        public final class InternalSupplier extends AbstractStreamSupplier<T> {
            volatile Iterator<T> iterator;
            volatile boolean isReady;
            volatile boolean wakingUp;

            InternalSupplier() {
            }

            void execute(Runnable runnable) {
                this.eventloop.execute(runnable);
            }

            void wakeUp() {
                if (this.wakingUp) {
                    return;
                }
                this.wakingUp = true;
                execute(this::onWakeUp);
            }

            void onWakeUp() {
                if (isComplete()) {
                    return;
                }
                this.wakingUp = false;
                flush();
            }

            @Override // io.activej.datastream.AbstractStreamSupplier
            protected void onInit() {
                this.eventloop.startExternalTask();
            }

            @Override // io.activej.datastream.AbstractStreamSupplier
            protected void onResumed() {
                this.isReady = true;
                flush();
            }

            @Override // io.activej.datastream.AbstractStreamSupplier
            protected void onSuspended() {
                this.isReady = false;
                OfAnotherEventloop.this.wakeUp();
            }

            @Override // io.activej.datastream.AbstractStreamSupplier
            protected void onComplete() {
                this.eventloop.completeExternalTask();
            }

            private void flush() {
                if (this.iterator == null) {
                    OfAnotherEventloop.this.wakeUp();
                    return;
                }
                Iterator<T> it = this.iterator;
                while (isReady() && it.hasNext()) {
                    send(it.next());
                }
                if (it == OfAnotherEventloop.END_OF_STREAM) {
                    sendEndOfStream();
                } else {
                    if (this.iterator.hasNext()) {
                        return;
                    }
                    this.iterator = null;
                    OfAnotherEventloop.this.wakeUp();
                }
            }

            @Override // io.activej.datastream.AbstractStreamSupplier
            protected void onAcknowledge() {
                OfAnotherEventloop ofAnotherEventloop = OfAnotherEventloop.this;
                OfAnotherEventloop ofAnotherEventloop2 = OfAnotherEventloop.this;
                ofAnotherEventloop.execute(ofAnotherEventloop2::acknowledge);
            }

            @Override // io.activej.datastream.AbstractStreamSupplier
            protected void onError(Exception exc) {
                OfAnotherEventloop.this.execute(() -> {
                    OfAnotherEventloop.this.closeEx(exc);
                });
            }

            @Override // io.activej.datastream.AbstractStreamSupplier
            protected void onCleanup() {
                this.iterator = null;
            }
        }

        public OfAnotherEventloop(@NotNull Eventloop eventloop, @NotNull StreamConsumer<T> streamConsumer) {
            this.anotherEventloopConsumer = streamConsumer;
            this.internalSupplier = (InternalSupplier) Eventloop.initWithEventloop(eventloop, () -> {
                return new InternalSupplier();
            });
        }

        void execute(Runnable runnable) {
            this.eventloop.execute(runnable);
        }

        void wakeUp() {
            if (this.wakingUp) {
                return;
            }
            this.wakingUp = true;
            execute(this::onWakeUp);
        }

        void onWakeUp() {
            if (isComplete()) {
                return;
            }
            this.wakingUp = false;
            flush();
            if (!this.internalSupplier.isReady) {
                suspend();
            } else {
                resume(this.toList);
                this.internalSupplier.wakeUp();
            }
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onInit() {
            this.eventloop.startExternalTask();
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onStarted() {
            this.internalSupplier.execute(() -> {
                this.internalSupplier.streamTo(this.anotherEventloopConsumer);
            });
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onEndOfStream() {
            flush();
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onComplete() {
            this.eventloop.completeExternalTask();
        }

        private void flush() {
            if (this.internalSupplier.iterator != null) {
                return;
            }
            if (isEndOfStream() && this.list.isEmpty()) {
                this.internalSupplier.iterator = (Iterator<T>) END_OF_STREAM;
            } else {
                if (this.list.isEmpty()) {
                    return;
                }
                this.internalSupplier.iterator = this.list.iterator();
                this.list = new ArrayList();
            }
            this.internalSupplier.wakeUp();
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onError(Exception exc) {
            this.internalSupplier.execute(() -> {
                this.internalSupplier.closeEx(exc);
            });
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onCleanup() {
            this.list = null;
        }
    }

    /* loaded from: input_file:io/activej/datastream/StreamConsumers$OfChannelConsumer.class */
    static final class OfChannelConsumer<T> extends AbstractStreamConsumer<T> {
        private final ChannelConsumer<T> consumer;
        private boolean working;

        /* JADX INFO: Access modifiers changed from: package-private */
        public OfChannelConsumer(ChannelConsumer<T> channelConsumer) {
            this.consumer = channelConsumer;
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onStarted() {
            flush();
        }

        private void flush() {
            resume(obj -> {
                Promise whenException = this.consumer.accept(obj).whenException(this::closeEx);
                if (whenException.isComplete()) {
                    return;
                }
                suspend();
                this.working = true;
                whenException.whenResult(() -> {
                    this.working = false;
                    if (isEndOfStream()) {
                        sendEndOfStream();
                    } else {
                        flush();
                    }
                });
            });
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onEndOfStream() {
            if (this.working) {
                return;
            }
            sendEndOfStream();
        }

        private void sendEndOfStream() {
            this.consumer.acceptEndOfStream().whenResult(this::acknowledge).whenException(this::closeEx);
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onError(Exception exc) {
            this.consumer.closeEx(exc);
        }
    }

    /* loaded from: input_file:io/activej/datastream/StreamConsumers$OfConsumer.class */
    static final class OfConsumer<T> extends AbstractStreamConsumer<T> {
        private final ConsumerEx<T> consumer;

        /* JADX INFO: Access modifiers changed from: package-private */
        public OfConsumer(ConsumerEx<T> consumerEx) {
            this.consumer = consumerEx;
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onStarted() {
            resume(obj -> {
                try {
                    this.consumer.accept(obj);
                } catch (RuntimeException e) {
                    throw e;
                } catch (Exception e2) {
                    closeEx(e2);
                }
            });
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onEndOfStream() {
            acknowledge();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/activej/datastream/StreamConsumers$OfPromise.class */
    public static final class OfPromise<T> extends AbstractStreamConsumer<T> {
        private Promise<? extends StreamConsumer<T>> promise;
        private final OfPromise<T>.InternalSupplier internalSupplier = new InternalSupplier();

        /* loaded from: input_file:io/activej/datastream/StreamConsumers$OfPromise$InternalSupplier.class */
        private class InternalSupplier extends AbstractStreamSupplier<T> {
            private InternalSupplier() {
            }

            @Override // io.activej.datastream.AbstractStreamSupplier
            protected void onResumed() {
                OfPromise.this.resume(getDataAcceptor());
            }

            @Override // io.activej.datastream.AbstractStreamSupplier
            protected void onSuspended() {
                OfPromise.this.suspend();
            }
        }

        public OfPromise(@NotNull Promise<? extends StreamConsumer<T>> promise) {
            this.promise = promise;
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onInit() {
            this.promise.whenResult(streamConsumer -> {
                streamConsumer.getAcknowledgement().whenResult(this::acknowledge).whenException(this::closeEx);
                getAcknowledgement().whenException(exc -> {
                    streamConsumer.closeEx(exc);
                });
                this.internalSupplier.streamTo(streamConsumer);
            }).whenException(this::closeEx);
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onEndOfStream() {
            this.internalSupplier.sendEndOfStream();
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onCleanup() {
            this.promise = null;
        }
    }

    /* loaded from: input_file:io/activej/datastream/StreamConsumers$Skip.class */
    static final class Skip<T> extends AbstractStreamConsumer<T> {
        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onStarted() {
            resume(obj -> {
            });
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onEndOfStream() {
            acknowledge();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/activej/datastream/StreamConsumers$ToCollector.class */
    public static final class ToCollector<T, A, R> extends AbstractStreamConsumer<T> {
        private final SettablePromise<R> resultPromise = new SettablePromise<>();
        private final Collector<T, A, R> collector;
        private A accumulator;

        public ToCollector(Collector<T, A, R> collector) {
            this.collector = collector;
        }

        public Promise<R> getResult() {
            return this.resultPromise;
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onInit() {
            this.resultPromise.whenComplete(this::acknowledge);
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onStarted() {
            A a = this.collector.supplier().get();
            this.accumulator = a;
            BiConsumer<A, T> accumulator = this.collector.accumulator();
            resume(obj -> {
                accumulator.accept(a, obj);
            });
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onEndOfStream() {
            this.resultPromise.set(this.collector.finisher().apply(this.accumulator));
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onError(Exception exc) {
            this.resultPromise.setException(exc);
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onCleanup() {
            this.accumulator = null;
        }
    }

    StreamConsumers() {
    }
}
