package reactor.aeron;

import io.aeron.Image;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:reactor/aeron/AeronClientConnector.class */
/**
 * Establishes a client-side {@link AeronConnection}.
 *
 * <p>The connection is assembled in three steps, all visible in this class: an outbound
 * publication is created and awaited via {@code ensureConnected()}, an inbound subscription is
 * created on a channel derived from the publication's session id, and once an inbound
 * {@link Image} is reported the two halves are wrapped into a {@link DuplexAeronConnection}
 * that is started with the handler taken from the options.
 */
final class AeronClientConnector {
    private static final Logger logger = LoggerFactory.getLogger(AeronClientConnector.class);

    /** Stream id used for both the outbound publication and the inbound subscription. */
    private static final int STREAM_ID = 0xcafe0000;

    private final AeronOptions options;
    private final AeronResources resources;
    private final Function<? super AeronConnection, ? extends Publisher<Void>> handler;

    /**
     * Creates a connector that reads its resources and connection handler from the given options.
     *
     * @param aeronOptions options supplying the channel URIs, the {@link AeronResources} and the
     *     connection handler
     */
    public AeronClientConnector(AeronOptions aeronOptions) {
        this.options = aeronOptions;
        this.resources = aeronOptions.resources();
        this.handler = aeronOptions.handler();
    }

    /**
     * Creates the outbound publication, waits for {@code ensureConnected()} and then builds the
     * client connection on top of it. Nothing is done until the returned {@link Mono} is
     * subscribed, because the whole pipeline is wrapped in {@link Mono#defer}.
     *
     * @return a mono emitting the started client connection
     */
    public Mono<AeronConnection> start() {
        return Mono.defer(
            () ->
                this.resources
                    .publication(this.options.outboundUri().asString(), STREAM_ID, this.options)
                    .flatMap(MessagePublication::ensureConnected)
                    .flatMap(this::connect));
    }

    /**
     * Creates the inbound subscription for an already connected publication and, once an inbound
     * image has been reported, assembles the connection.
     *
     * @param messagePublication connected outbound publication; disposed here if creating the
     *     subscription fails
     * @return a mono emitting the started client connection
     */
    private Mono<AeronConnection> connect(MessagePublication messagePublication) {
        final int sessionId = messagePublication.sessionId();
        final String sessionTag = Integer.toHexString(sessionId);
        final String inboundChannel =
            this.options.inboundUri().sessionId(Integer.valueOf(sessionId)).asString();

        logger.debug("{}: creating client connection: {}", sessionTag, inboundChannel);

        // Completed when the inbound image becomes unavailable; handed to the connection
        // (presumably as its dispose signal -- confirm against DuplexAeronConnection).
        final MonoProcessor<Void> inboundUnavailable = MonoProcessor.create();
        // Receives the inbound image reported by the subscription's "available" callback.
        final MonoProcessor<Image> inboundImage = MonoProcessor.create();

        return this.resources
            .subscription(
                inboundChannel,
                STREAM_ID,
                image -> {
                    logger.debug("{}: created client inbound", sessionTag);
                    inboundImage.onNext(image);
                },
                image -> {
                    logger.debug("{}: client inbound became unavaliable", sessionTag);
                    inboundUnavailable.onComplete();
                })
            .doOnError(
                th -> {
                    logger.warn(
                        "{}: failed to create client inbound, cause: {}", sessionTag, th.toString());
                    // The publication has no counterpart subscription, so release it.
                    messagePublication.dispose();
                })
            .flatMap(
                messageSubscription ->
                    inboundImage.flatMap(
                        image ->
                            newConnection(
                                sessionId,
                                image,
                                messagePublication,
                                messageSubscription,
                                inboundUnavailable)))
            .doOnSuccess(
                connection ->
                    logger.debug(
                        "{}: created client connection: {}", sessionTag, inboundChannel));
    }

    /**
     * Builds the inbound side for the given image and starts a {@link DuplexAeronConnection}
     * with the configured handler.
     *
     * @param i session id passed to the connection
     * @param image inbound image the inbound side is created for
     * @param messagePublication outbound publication wrapped into a {@link DefaultAeronOutbound}
     * @param messageSubscription inbound subscription the image belongs to
     * @param monoProcessor processor passed through to the connection
     * @return a mono emitting the started connection
     */
    private Mono<AeronConnection> newConnection(int i, Image image, MessagePublication messagePublication, MessageSubscription messageSubscription, MonoProcessor<Void> monoProcessor) {
        return this.resources
            .inbound(image, messageSubscription)
            .doOnError(
                th -> {
                    // Inbound creation failed: release both halves created so far.
                    messageSubscription.dispose();
                    messagePublication.dispose();
                })
            .flatMap(
                inbound -> {
                    DuplexAeronConnection connection =
                        new DuplexAeronConnection(
                            i, inbound, new DefaultAeronOutbound(messagePublication), monoProcessor);
                    // If the handler fails to start, dispose the connection built above.
                    return connection.start(this.handler).doOnError(th -> connection.dispose());
                });
    }
}
