package io.activej.fs.cluster;

import io.activej.bytebuf.ByteBuf;
import io.activej.common.Checks;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelInput;
import io.activej.csp.ChannelOutput;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.dsl.WithChannelInput;
import io.activej.csp.dsl.WithChannelOutputs;
import io.activej.csp.process.AbstractCommunicatingProcess;
import io.activej.eventloop.Eventloop;
import io.activej.fs.exception.FsException;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/* loaded from: input_file:io/activej/fs/cluster/ChannelByteSplitter.class */
final class ChannelByteSplitter extends AbstractCommunicatingProcess implements WithChannelInput<ChannelByteSplitter, ByteBuf>, WithChannelOutputs<ByteBuf> {
    private final List<ChannelConsumer<ByteBuf>> outputs = new ArrayList();
    private final int requiredSuccesses;
    private ChannelSupplier<ByteBuf> input;

    private ChannelByteSplitter(int i) {
        this.requiredSuccesses = i;
    }

    public static ChannelByteSplitter create(int i) {
        return new ChannelByteSplitter(i);
    }

    public ChannelInput<ByteBuf> getInput() {
        return channelSupplier -> {
            Checks.checkState(!isProcessStarted(), "Can't configure splitter while it is running");
            this.input = sanitize(channelSupplier);
            tryStart();
            return getProcessCompletion();
        };
    }

    public ChannelOutput<ByteBuf> addOutput() {
        int size = this.outputs.size();
        this.outputs.add(null);
        return channelConsumer -> {
            this.outputs.set(size, channelConsumer);
            tryStart();
        };
    }

    private void tryStart() {
        if (this.input == null || !this.outputs.stream().allMatch((v0) -> {
            return Objects.nonNull(v0);
        })) {
            return;
        }
        Eventloop.getCurrentEventloop().post(this::startProcess);
    }

    protected void beforeProcess() {
        Checks.checkState(this.input != null, "No splitter input");
        Checks.checkState(!this.outputs.isEmpty(), "No splitter outputs");
    }

    protected void doProcess() {
        if (isProcessComplete()) {
            return;
        }
        ArrayList arrayList = new ArrayList();
        this.input.get().whenResult(byteBuf -> {
            if (byteBuf == null) {
                Promises.all(this.outputs.stream().map((v0) -> {
                    return v0.acceptEndOfStream();
                })).whenComplete((r4, exc) -> {
                    completeProcessEx(exc);
                });
            } else {
                Promises.all(this.outputs.stream().map(channelConsumer -> {
                    return channelConsumer.accept(byteBuf.slice()).then((v0) -> {
                        return Promise.of(v0);
                    }, exc2 -> {
                        arrayList.add(channelConsumer);
                        return this.outputs.size() - arrayList.size() < this.requiredSuccesses ? Promise.ofException(exc2) : Promise.complete();
                    });
                })).whenComplete(() -> {
                    this.outputs.removeAll(arrayList);
                }).whenException(exc2 -> {
                    closeEx(new FsException("Not enough successes"));
                }).whenResult(this::doProcess);
                byteBuf.recycle();
            }
        }).whenException(this::closeEx);
    }

    protected void doClose(Exception exc) {
        this.input.closeEx(exc);
        this.outputs.forEach(channelConsumer -> {
            channelConsumer.closeEx(FsPartitions.LOCAL_EXCEPTION);
        });
    }
}
