package io.activej.csp.process;

import io.activej.common.Preconditions;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelInput;
import io.activej.csp.ChannelOutput;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.dsl.WithChannelTransformer;
import io.activej.csp.process.AbstractChannelTransformer;
import io.activej.promise.Promise;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/activej/csp/process/AbstractChannelTransformer.class */
public abstract class AbstractChannelTransformer<S extends AbstractChannelTransformer<S, I, O>, I, O> extends AbstractCommunicatingProcess implements WithChannelTransformer<S, I, O> {
    protected ChannelSupplier<I> input;
    protected ChannelConsumer<O> output;

    /* JADX INFO: Access modifiers changed from: protected */
    public final Promise<Void> send(O o) {
        return this.output.accept(o);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Promise<Void> sendEndOfStream() {
        return this.output.acceptEndOfStream();
    }

    @NotNull
    protected abstract Promise<Void> onItem(I i);

    protected Promise<Void> onProcessFinish() {
        return sendEndOfStream();
    }

    protected Promise<Void> onProcessStart() {
        return Promise.complete();
    }

    @Override // io.activej.csp.process.AbstractCommunicatingProcess
    protected void beforeProcess() {
        Preconditions.checkState(this.input != null, "Input was not set");
        Preconditions.checkState(this.output != null, "Output was not set");
    }

    @Override // io.activej.csp.process.AbstractCommunicatingProcess
    protected void doProcess() {
        onProcessStart().whenComplete((r4, th) -> {
            if (th == null) {
                this.input.streamTo(ChannelConsumer.of(this::onItem)).then(this::onProcessFinish).whenResult(this::completeProcess);
            } else {
                closeEx(th);
            }
        });
    }

    @Override // io.activej.csp.dsl.HasChannelInput
    /* renamed from: getInput */
    public ChannelInput<I> getInput2() {
        return channelSupplier -> {
            this.input = (ChannelSupplier<I>) sanitize(channelSupplier);
            if (this.input != null && this.output != null) {
                startProcess();
            }
            return getProcessCompletion();
        };
    }

    @Override // io.activej.csp.dsl.HasChannelOutput
    public ChannelOutput<O> getOutput() {
        return channelConsumer -> {
            this.output = (ChannelConsumer<O>) sanitize(channelConsumer);
            if (this.input == null || this.output == null) {
                return;
            }
            startProcess();
        };
    }

    @Override // io.activej.csp.process.AbstractCommunicatingProcess
    protected final void doClose(Throwable th) {
        this.input.closeEx(th);
        this.output.closeEx(th);
    }
}
