package io.rsocket.aeron.client;

import io.aeron.Aeron;
import io.aeron.ConcurrentPublication;
import io.aeron.Subscription;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.aeron.AeronDuplexConnection;
import io.rsocket.aeron.frames.AeronPayloadFlyweight;
import io.rsocket.aeron.frames.FrameType;
import io.rsocket.aeron.frames.SetupCompleteFlyweight;
import io.rsocket.aeron.frames.SetupFlyweight;
import io.rsocket.aeron.reactor.AeronPublicationSubscriber;
import io.rsocket.aeron.reactor.AeronSubscriptionFlux;
import io.rsocket.transport.ClientTransport;
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Operators;
import reactor.core.publisher.WorkQueueProcessor;

/* loaded from: input_file:io/rsocket/aeron/client/AeronClientTransport.class */
public class AeronClientTransport implements ClientTransport {
    private static final Logger logger = LoggerFactory.getLogger(AeronClientTransport.class);
    private static final AtomicInteger STREAM = new AtomicInteger(2);
    private final WorkQueueProcessor<Runnable> workQueueProcessor;
    private final Aeron aeron;
    private final String aeronURL;
    private final ByteBufAllocator allocator;

    public AeronClientTransport(WorkQueueProcessor<Runnable> workQueueProcessor, Aeron aeron, String str, ByteBufAllocator byteBufAllocator) {
        this.workQueueProcessor = workQueueProcessor;
        this.allocator = byteBufAllocator;
        this.aeron = aeron;
        this.aeronURL = str;
    }

    public Mono<DuplexConnection> connect() {
        long nextLong = ThreadLocalRandom.current().nextLong();
        int andAdd = STREAM.getAndAdd(2);
        MonoProcessor create = MonoProcessor.create();
        Subscription addSubscription = this.aeron.addSubscription(this.aeronURL, andAdd, image -> {
            create.onComplete();
        }, image2 -> {
        });
        return Mono.defer(() -> {
            logger.info("creating new connection with connection id {} and aeron stream id {} to url {}", new Object[]{Long.valueOf(nextLong), Integer.valueOf(andAdd), this.aeronURL});
            MonoProcessor create2 = MonoProcessor.create();
            ConcurrentPublication addPublication = this.aeron.addPublication(this.aeronURL, -1);
            Subscription addSubscription2 = this.aeron.addSubscription(this.aeronURL, -2);
            AeronSubscriptionFlux.create("clientManagementStream", this.workQueueProcessor, addSubscription2, this.allocator).filter(byteBuf -> {
                return AeronPayloadFlyweight.frameType(byteBuf) == FrameType.SETUP_COMPLETE && SetupCompleteFlyweight.connectionId(byteBuf) == nextLong;
            }).flatMap(byteBuf2 -> {
                logger.info("received setup complete for connection id {} and aeron stream id {} to url {}", new Object[]{Long.valueOf(nextLong), Integer.valueOf(andAdd), this.aeronURL});
                return Mono.create(monoSink -> {
                    String aeronUrl = SetupCompleteFlyweight.aeronUrl(byteBuf2);
                    int streamId = SetupCompleteFlyweight.streamId(byteBuf2);
                    logger.info("connection with id {} creating publication to server with aeron stream id {} to url {}", new Object[]{Long.valueOf(nextLong), Integer.valueOf(streamId), aeronUrl});
                    monoSink.success(new AeronDuplexConnection("client", this.workQueueProcessor, this.aeron.addPublication(aeronUrl, streamId), addSubscription, this.allocator));
                });
            }).doFinally(signalType -> {
                addSubscription2.close();
            }).subscribe(create2);
            return Mono.just(SetupFlyweight.encode(this.allocator, nextLong, andAdd, this.aeronURL)).transform(Operators.lift((scannable, coreSubscriber) -> {
                return AeronPublicationSubscriber.create("serverManagementPublication", this.workQueueProcessor, coreSubscriber, addPublication);
            })).doFinally(signalType2 -> {
                addPublication.close();
            }).then(create).then(create2).doOnSuccess(duplexConnection -> {
                logger.info("successfully created new connection with connection id {} and aeron stream id {} to url {}", new Object[]{Long.valueOf(nextLong), Integer.valueOf(andAdd), this.aeronURL});
            }).timeout(Duration.ofSeconds(1000L)).onErrorMap(th -> {
                addSubscription2.close();
                logger.error("error trying to make a connection to " + this.aeronURL, th);
                return th instanceof TimeoutException ? new TimeoutException("timed out waiting for connection to server " + this.aeronURL) : th;
            });
        });
    }
}
