package reactor.aeron;

import io.aeron.Aeron;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.driver.MediaDriver;
import java.io.File;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Consumer;
import org.agrona.CloseHelper;
import org.agrona.IoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:reactor/aeron/AeronResources.class */
/**
 * Owns the Aeron-related resources used by this package: an embedded {@link MediaDriver}, the
 * {@link Aeron} client connected to it, and the {@link AeronEventLoopGroup} on which inbounds,
 * publications and subscriptions are registered.
 *
 * <p>Instances are created via {@link #start()} or {@link #start(AeronResourcesConfig)} and
 * released via {@link #dispose()}; {@link #onDispose()} completes once disposal has finished.
 */
public class AeronResources implements OnDisposable {
    private static final Logger logger = LoggerFactory.getLogger(AeronResources.class);

    private final AeronResourcesConfig config;

    // Lifecycle triggers: completing "start" runs doStart(), completing "dispose" runs
    // doDispose(), and "onDispose" completes after doDispose() has terminated.
    private final MonoProcessor<Void> start = MonoProcessor.create();
    private final MonoProcessor<Void> dispose = MonoProcessor.create();
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();

    // Assigned in doStart(); any of them may still be null if startup failed part-way.
    private Aeron aeron;
    private MediaDriver mediaDriver;
    private AeronEventLoopGroup eventLoopGroup;

    /**
     * Wires the start and dispose pipelines; nothing runs until the matching processor completes.
     *
     * @param aeronResourcesConfig configuration read when the resources are started
     */
    private AeronResources(AeronResourcesConfig aeronResourcesConfig) {
        this.config = aeronResourcesConfig;

        // A failed startup triggers disposal so that whatever was created gets released.
        this.start
            .then(doStart())
            .subscribe(
                null,
                th -> {
                    logger.error("{} failed to start, cause: {}", this, th.toString());
                    dispose();
                });

        // onDispose is completed whether doDispose() succeeded, failed or was cancelled.
        this.dispose
            .then(doDispose())
            .doFinally(signalType -> this.onDispose.onComplete())
            .subscribe(
                null,
                th2 -> logger.warn("{} failed on doDispose(): {}", this, th2.toString()),
                () -> logger.debug("Disposed {}", this));
    }

    /**
     * Starts resources using {@link AeronResourcesConfig#defaultConfig()}.
     *
     * @return the started resources
     */
    public static AeronResources start() {
        return start(AeronResourcesConfig.defaultConfig());
    }

    /**
     * Creates a new instance with the given configuration and triggers its startup.
     *
     * @param aeronResourcesConfig configuration to start with
     * @return the new instance
     */
    public static AeronResources start(AeronResourcesConfig aeronResourcesConfig) {
        AeronResources aeronResources = new AeronResources(aeronResourcesConfig);
        aeronResources.start0();
        return aeronResources;
    }

    /** Completes the start trigger, which runs the pipeline wired in the constructor. */
    private void start0() {
        this.start.onComplete();
    }

    /**
     * Builds the startup action: launches the embedded media driver, connects the Aeron client to
     * the driver's directory, creates the event loop group and registers a JVM shutdown hook that
     * deletes the aeron directory.
     *
     * @return a mono performing the startup when subscribed
     */
    private Mono<Void> doStart() {
        return Mono.fromRunnable(() -> {
            MediaDriver.Context driverContext = new MediaDriver.Context()
                .aeronDirectoryName(this.config.aeronDirectoryName())
                .mtuLength(this.config.mtuLength())
                .imageLivenessTimeoutNs(this.config.imageLivenessTimeout().toNanos())
                .dirDeleteOnStart(this.config.isDirDeleteOnStart());
            this.mediaDriver = MediaDriver.launchEmbedded(driverContext);

            Aeron.Context context = new Aeron.Context();
            context.aeronDirectoryName(this.mediaDriver.aeronDirectoryName());
            this.aeron = Aeron.connect(context);

            this.eventLoopGroup = new AeronEventLoopGroup(
                "reactor-aeron", this.config.numOfWorkers(), this.config.idleStrategySupplier());

            // NOTE(review): this hook is never removed, so it stays registered after dispose().
            Runtime.getRuntime()
                .addShutdownHook(new Thread(() -> deleteAeronDirectory(context.aeronDirectory())));

            logger.debug(
                "{} has initialized embedded media driver, aeron directory: {}",
                this,
                context.aeronDirectoryName());
        });
    }

    /**
     * Creates a {@link DefaultAeronInbound} for the image and registers it on the next event loop.
     *
     * <p>NOTE(review): the decompiler reports this method was originally package-private.
     *
     * @param image image the inbound is created for
     * @param messageSubscription subscription handed to the inbound
     * @return the mono returned by the event loop's {@code registerInbound}; errors are logged
     */
    public Mono<DefaultAeronInbound> inbound(Image image, MessageSubscription messageSubscription) {
        return Mono.defer(() -> {
            AeronEventLoop eventLoop = this.eventLoopGroup.next();
            DefaultAeronInbound inbound = new DefaultAeronInbound(image, eventLoop, messageSubscription);
            return eventLoop
                .registerInbound(inbound)
                .doOnError(th ->
                    logger.error("{} failed on registerInbound(), cause: {}", this, th.toString()));
        });
    }

    /**
     * Adds an exclusive Aeron publication on {@link Schedulers#parallel()} and registers it, wrapped
     * in a {@link MessagePublication}, on the next event loop. If registration fails the underlying
     * publication is closed unless it is closed already.
     *
     * <p>NOTE(review): the decompiler reports this method was originally package-private.
     *
     * @param str channel passed to {@code Aeron.addExclusivePublication}
     * @param i stream id passed to {@code Aeron.addExclusivePublication}
     * @param aeronOptions options handed to the {@link MessagePublication}
     * @return the mono returned by the event loop's {@code registerPublication}
     */
    public Mono<MessagePublication> publication(String str, int i, AeronOptions aeronOptions) {
        return Mono.defer(() ->
            aeronPublication(str, i)
                .subscribeOn(Schedulers.parallel())
                .doOnError(th ->
                    logger.error(
                        "{} failed on aeronPublication(), channel: {}, cause: {}",
                        this,
                        str,
                        th.toString()))
                .flatMap(publication -> {
                    AeronEventLoop eventLoop = this.eventLoopGroup.next();
                    return eventLoop
                        .registerPublication(new MessagePublication(publication, aeronOptions, eventLoop))
                        .doOnError(th2 -> {
                            logger.error(
                                "{} failed on registerPublication(), cause: {}", this, th2.toString());
                            // Do not leave an unregistered publication open.
                            if (!publication.isClosed()) {
                                publication.close();
                            }
                        });
                }));
    }

    /**
     * Wraps {@code Aeron.addExclusivePublication} in a mono and logs how long the call took.
     *
     * @param channel channel to publish on
     * @param streamId stream id to publish on
     * @return a mono emitting the added publication
     */
    private Mono<Publication> aeronPublication(String channel, int streamId) {
        return Mono.fromCallable(() -> {
            logger.debug("Adding aeron.Publication for channel {}", channel);
            long startNanos = System.nanoTime();
            ExclusivePublication added = this.aeron.addExclusivePublication(channel, streamId);
            long spentNanos = System.nanoTime() - startNanos;
            logger.debug("Added aeron.Publication for channel {}, spent: {} ns", channel, spentNanos);
            return added;
        });
    }

    /** Triggers disposal; the work itself runs in the pipeline wired in the constructor. */
    public void dispose() {
        this.dispose.onComplete();
    }

    /**
     * Adds an Aeron subscription on {@link Schedulers#parallel()} and registers it, wrapped in a
     * {@link MessageSubscription}, on the next event loop. If registration fails the underlying
     * subscription is closed unless it is closed already.
     *
     * <p>NOTE(review): the decompiler reports this method was originally package-private.
     *
     * @param str channel passed to {@code Aeron.addSubscription}
     * @param i stream id passed to {@code Aeron.addSubscription}
     * @param consumer called when an image becomes available; may be {@code null}
     * @param consumer2 called when an image becomes unavailable; may be {@code null}
     * @return the mono returned by the event loop's {@code registerSubscription}
     */
    public Mono<MessageSubscription> subscription(String str, int i, Consumer<Image> consumer, Consumer<Image> consumer2) {
        return Mono.defer(() ->
            aeronSubscription(str, i, consumer, consumer2)
                .subscribeOn(Schedulers.parallel())
                .doOnError(th ->
                    logger.error(
                        "{} failed on aeronSubscription(), channel: {}, cause: {}",
                        this,
                        str,
                        th.toString()))
                .flatMap(subscription -> {
                    AeronEventLoop eventLoop = this.eventLoopGroup.next();
                    return eventLoop
                        .registerSubscription(new MessageSubscription(subscription, eventLoop))
                        .doOnError(th2 -> {
                            logger.error(
                                "{} failed on registerSubscription(), cause: {}", this, th2.toString());
                            // Do not leave an unregistered subscription open.
                            if (!subscription.isClosed()) {
                                subscription.close();
                            }
                        });
                }));
    }

    /**
     * Wraps {@code Aeron.addSubscription} in a mono, installing image handlers that log the event
     * and then delegate to the given callbacks when they are non-null.
     *
     * @param channel channel to subscribe to
     * @param streamId stream id to subscribe to
     * @param onImageAvailable callback for available images; may be {@code null}
     * @param onImageUnavailable callback for unavailable images; may be {@code null}
     * @return a mono emitting the added subscription
     */
    private Mono<Subscription> aeronSubscription(
            String channel,
            int streamId,
            Consumer<Image> onImageAvailable,
            Consumer<Image> onImageUnavailable) {
        return Mono.fromCallable(() -> {
            logger.debug("Adding aeron.Subscription for channel {}", channel);
            long startNanos = System.nanoTime();
            Subscription added = this.aeron.addSubscription(
                channel,
                streamId,
                image -> {
                    logger.debug(
                        "{} onImageAvailable: {} {}",
                        this,
                        Integer.toHexString(image.sessionId()),
                        image.sourceIdentity());
                    if (onImageAvailable != null) {
                        onImageAvailable.accept(image);
                    }
                },
                image -> {
                    logger.debug(
                        "{} onImageUnavailable: {} {}",
                        this,
                        Integer.toHexString(image.sessionId()),
                        image.sourceIdentity());
                    if (onImageUnavailable != null) {
                        onImageUnavailable.accept(image);
                    }
                });
            long spentNanos = System.nanoTime() - startNanos;
            logger.debug("Added aeron.Subscription for channel {}, spent: {} ns", channel, spentNanos);
            return added;
        });
    }

    /**
     * Reports the disposed state of the {@code onDispose} processor.
     *
     * @return the value of {@code onDispose.isDisposed()}
     */
    public boolean isDisposed() {
        return this.onDispose.isDisposed();
    }

    /**
     * Returns the disposal signal.
     *
     * @return a mono that completes after the disposal pipeline has terminated
     */
    @Override // reactor.aeron.OnDisposable
    public Mono<Void> onDispose() {
        return this.onDispose;
    }

    /**
     * Builds the disposal action: disposes the event loop group and waits for it, then closes the
     * Aeron client and the media driver and deletes the driver's aeron directory.
     *
     * <p>Disposal is also triggered when startup fails, in which case the event loop group (and
     * possibly the client or driver) was never created. The event loop step is skipped then, so
     * that anything that was launched is still closed instead of being leaked.
     *
     * @return a mono performing the disposal when subscribed
     */
    private Mono<Void> doDispose() {
        return Mono.defer(() -> {
            logger.debug("Disposing {}", this);

            AeronEventLoopGroup group = this.eventLoopGroup;
            Mono<Void> eventLoopsStopped;
            if (group == null) {
                eventLoopsStopped = Mono.empty();
            } else {
                eventLoopsStopped = Mono.fromRunnable(group::dispose).then(group.onDispose());
            }

            return eventLoopsStopped.doFinally(signalType -> closeAeronAndDriver());
        });
    }

    /** Closes the Aeron client and media driver (either may be null) and removes the driver's directory. */
    private void closeAeronAndDriver() {
        CloseHelper.quietClose(this.aeron);

        MediaDriver driver = this.mediaDriver;
        CloseHelper.quietClose(driver);
        if (driver == null) {
            return;
        }

        MediaDriver.Context driverContext = driver.context();
        if (driverContext != null) {
            IoUtil.delete(driverContext.aeronDirectory(), true);
        }
    }

    /**
     * Deletes the given directory if it exists and logs the deletion.
     *
     * @param file aeron directory to delete
     */
    private void deleteAeronDirectory(File file) {
        if (file.exists()) {
            IoUtil.delete(file, true);
            logger.debug("{} deleted aeron directory {}", this, file);
        }
    }

    /**
     * Returns a short identifier for log messages.
     *
     * @return {@code "AeronResources0x"} followed by the identity hash code in hex
     */
    @Override
    public String toString() {
        return "AeronResources0x" + Integer.toHexString(System.identityHashCode(this));
    }
}
