package reactor.aeron;

import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/aeron/DuplexAeronConnection.class */
/**
 * {@link AeronConnection} that bundles one {@link DefaultAeronInbound} and one
 * {@link DefaultAeronOutbound} under a single session id.
 *
 * <p>Disposal is wired up in the constructor: the internal {@code dispose} signal is raced
 * (via {@code or}) against an externally supplied hook, and the winner is chained with
 * {@code then(...)} into {@link #doDispose()}, which disposes both halves. Whatever the outcome
 * of that pipeline, {@link #onDispose()} is completed afterwards.
 */
public final class DuplexAeronConnection implements AeronConnection {
    /** One logger for the class; there is no need to look it up again for every connection. */
    private static final Logger logger = LoggerFactory.getLogger(DuplexAeronConnection.class);

    private final int sessionId;
    private final DefaultAeronInbound inbound;
    private final DefaultAeronOutbound outbound;

    /** Completed by {@link #dispose()} to request disposal. */
    private final MonoProcessor<Void> dispose = MonoProcessor.create();

    /** Completed once the disposal pipeline has terminated (for any reason). */
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();

    /**
     * Creates the connection and subscribes the disposal pipeline.
     *
     * <p>Decompiler note: the access modifier was originally package-private.
     *
     * @param sessionId session id, used (in hex) in log messages and {@link #toString()}
     * @param inbound inbound half, disposed together with this connection
     * @param outbound outbound half, disposed together with this connection
     * @param disposeHook external signal raced against {@link #dispose()} to start disposal
     */
    public DuplexAeronConnection(int sessionId, DefaultAeronInbound inbound, DefaultAeronOutbound outbound, MonoProcessor<Void> disposeHook) {
        this.sessionId = sessionId;
        this.inbound = inbound;
        this.outbound = outbound;

        this.dispose
            .or(disposeHook)
            .then(doDispose())
            .doFinally(signalType -> logger.debug("{}: connection disposed", Integer.toHexString(sessionId)))
            // Runs on every kind of termination so that onDispose() subscribers are always released.
            .doFinally(signalType -> this.onDispose.onComplete())
            .subscribe(
                null, // the pipeline is a Mono<Void>; there is no value to consume
                th -> logger.warn("{} failed on doDispose(): {}", this, th.toString()),
                () -> logger.debug("Disposed {}", this));
    }

    /**
     * Returns a {@link Mono} that, when subscribed, hands this connection to the given handler
     * and then emits this connection.
     *
     * <p>Decompiler note: the access modifier was originally package-private.
     *
     * @param connectionHandler function producing the handler publisher for this connection;
     *     may be {@code null}, in which case only a warning is logged
     * @return a {@link Mono} emitting this connection after the handler has been subscribed
     */
    public Mono<AeronConnection> start(Function<? super AeronConnection, ? extends Publisher<Void>> connectionHandler) {
        return Mono.<Void>fromRunnable(() -> start0(connectionHandler)).thenReturn(this);
    }

    /**
     * Applies the handler to this connection and subscribes the result with
     * {@code disposeSubscriber()}. Does nothing when the connection is already disposed.
     */
    private void start0(Function<? super AeronConnection, ? extends Publisher<Void>> connectionHandler) {
        if (connectionHandler == null) {
            logger.warn("{}: connection handler function is not specified", Integer.toHexString(this.sessionId));
            return;
        }
        if (isDisposed()) {
            return;
        }
        connectionHandler.apply(this).subscribe(disposeSubscriber());
    }

    @Override // reactor.aeron.AeronConnection
    public AeronInbound inbound() {
        return this.inbound;
    }

    @Override // reactor.aeron.AeronConnection
    public AeronOutbound outbound() {
        return this.outbound;
    }

    /**
     * Requests disposal by completing the internal {@code dispose} signal.
     *
     * <p>NOTE(review): presumably implements a method inherited through {@link AeronConnection};
     * confirm against the interface and add {@code @Override} if so.
     */
    public void dispose() {
        this.dispose.onComplete();
    }

    /**
     * Reports the disposed state of the {@link #onDispose()} processor.
     *
     * <p>NOTE(review): presumably implements a method inherited through {@link AeronConnection};
     * confirm against the interface and add {@code @Override} if so.
     */
    public boolean isDisposed() {
        return this.onDispose.isDisposed();
    }

    @Override // reactor.aeron.OnDisposable
    public Mono<Void> onDispose() {
        return this.onDispose;
    }

    /**
     * Builds the deferred disposal step: logs, then disposes the inbound and the outbound.
     * {@code whenDelayError} is used so that a failure in one half does not prevent the other
     * half from being disposed.
     */
    private Mono<Void> doDispose() {
        return Mono.defer(() -> {
            logger.debug("Disposing {}", this);
            // A bound method reference null-checks its receiver when it is evaluated,
            // so no explicit check is needed here.
            return Mono.whenDelayError(
                Mono.fromRunnable(this.inbound::dispose),
                Mono.fromRunnable(this.outbound::dispose));
        });
    }

    /**
     * Returns a label made of a fixed prefix and the session id in hex.
     *
     * <p>NOTE(review): the prefix says "DefaultAeronConnection" although the class is
     * {@code DuplexAeronConnection}; kept as is because log consumers may depend on it.
     */
    @Override
    public String toString() {
        return "DefaultAeronConnection0x" + Integer.toHexString(this.sessionId);
    }
}
