package org.rooftop.netx.engine;

import jakarta.annotation.PreDestroy;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.Result;
import kotlin.ResultKt;
import kotlin.Unit;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import org.rooftop.netx.engine.core.Saga;
import org.rooftop.netx.engine.logging.LoggingSupportsKt;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* compiled from: AbstractSagaRetrySupporter.kt */
@Metadata(mv = {1, 9, 0}, k = 1, xi = 48, d1 = {"��J\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0010\b\n��\n\u0002\u0010\t\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0002\b \u0018��2\u00020\u0001B\u001d\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007¢\u0006\u0002\u0010\bJ\"\u0010\r\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u0010\u0012\u0004\u0012\u00020\u00110\u000f0\u000e2\u0006\u0010\u0002\u001a\u00020\u0003H$J\b\u0010\u0012\u001a\u00020\u0013H\u0002J\b\u0010\u0014\u001a\u00020\u0015H\u0003J\u0006\u0010\u0016\u001a\u00020\u0015R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\t\u001a\u00020\nX\u0082.¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��R\u0012\u0010\u000b\u001a\u0006\u0012\u0002\b\u00030\fX\u0082.¢\u0006\u0002\n��¨\u0006\u0017"}, d2 = {"Lorg/rooftop/netx/engine/AbstractSagaRetrySupporter;", "", "backpressureSize", "", "recoveryMilli", "", "sagaDispatcher", "Lorg/rooftop/netx/engine/AbstractSagaDispatcher;", "(IJLorg/rooftop/netx/engine/AbstractSagaDispatcher;)V", "executor", "Ljava/util/concurrent/ScheduledExecutorService;", "scheduledFuture", "Ljava/util/concurrent/ScheduledFuture;", "claimOrphanSaga", "Lreactor/core/publisher/Flux;", "Lkotlin/Pair;", "Lorg/rooftop/netx/engine/core/Saga;", "", "handleOrphanSaga", "Ljava/lang/Runnable;", "shutdownGracefully", "", "watchOrphanSaga", "netx"})
/* loaded from: input_file:org/rooftop/netx/engine/AbstractSagaRetrySupporter.class */
public abstract class AbstractSagaRetrySupporter {
    private final int backpressureSize;
    private final long recoveryMilli;

    @NotNull
    private final AbstractSagaDispatcher sagaDispatcher;
    private ScheduledExecutorService executor;
    private ScheduledFuture<?> scheduledFuture;

    public AbstractSagaRetrySupporter(int i, long j, @NotNull AbstractSagaDispatcher abstractSagaDispatcher) {
        Intrinsics.checkNotNullParameter(abstractSagaDispatcher, "sagaDispatcher");
        this.backpressureSize = i;
        this.recoveryMilli = j;
        this.sagaDispatcher = abstractSagaDispatcher;
    }

    public final void watchOrphanSaga() {
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        Intrinsics.checkNotNullExpressionValue(newSingleThreadScheduledExecutor, "newSingleThreadScheduledExecutor(...)");
        this.executor = newSingleThreadScheduledExecutor;
        ScheduledExecutorService scheduledExecutorService = this.executor;
        if (scheduledExecutorService == null) {
            Intrinsics.throwUninitializedPropertyAccessException("executor");
            scheduledExecutorService = null;
        }
        ScheduledFuture<?> scheduleWithFixedDelay = scheduledExecutorService.scheduleWithFixedDelay(handleOrphanSaga(), 0L, this.recoveryMilli, TimeUnit.MILLISECONDS);
        Intrinsics.checkNotNullExpressionValue(scheduleWithFixedDelay, "scheduleWithFixedDelay(...)");
        this.scheduledFuture = scheduleWithFixedDelay;
    }

    private final Runnable handleOrphanSaga() {
        return () -> {
            handleOrphanSaga$lambda$3(r0);
        };
    }

    @NotNull
    protected abstract Flux<Pair<Saga, String>> claimOrphanSaga(int i);

    @PreDestroy
    private final void shutdownGracefully() {
        Object obj;
        ScheduledFuture<?> scheduledFuture = this.scheduledFuture;
        if (scheduledFuture == null) {
            Intrinsics.throwUninitializedPropertyAccessException("scheduledFuture");
            scheduledFuture = null;
        }
        scheduledFuture.cancel(true);
        ScheduledExecutorService scheduledExecutorService = this.executor;
        if (scheduledExecutorService == null) {
            Intrinsics.throwUninitializedPropertyAccessException("executor");
            scheduledExecutorService = null;
        }
        scheduledExecutorService.shutdown();
        try {
            Result.Companion companion = Result.Companion;
            AbstractSagaRetrySupporter abstractSagaRetrySupporter = this;
            ScheduledExecutorService scheduledExecutorService2 = abstractSagaRetrySupporter.executor;
            if (scheduledExecutorService2 == null) {
                Intrinsics.throwUninitializedPropertyAccessException("executor");
                scheduledExecutorService2 = null;
            }
            if (!scheduledExecutorService2.awaitTermination(10L, TimeUnit.MINUTES)) {
                ScheduledExecutorService scheduledExecutorService3 = abstractSagaRetrySupporter.executor;
                if (scheduledExecutorService3 == null) {
                    Intrinsics.throwUninitializedPropertyAccessException("executor");
                    scheduledExecutorService3 = null;
                }
                scheduledExecutorService3.shutdownNow();
                ScheduledExecutorService scheduledExecutorService4 = abstractSagaRetrySupporter.executor;
                if (scheduledExecutorService4 == null) {
                    Intrinsics.throwUninitializedPropertyAccessException("executor");
                    scheduledExecutorService4 = null;
                }
                if (!scheduledExecutorService4.awaitTermination(1L, TimeUnit.MINUTES)) {
                    LoggingSupportsKt.error("Cannot shutdown SagaRetrySupporter thread");
                }
            }
            obj = Result.constructor-impl(Unit.INSTANCE);
        } catch (Throwable th) {
            Result.Companion companion2 = Result.Companion;
            obj = Result.constructor-impl(ResultKt.createFailure(th));
        }
        Object obj2 = obj;
        if (Result.exceptionOrNull-impl(obj2) != null) {
            ScheduledExecutorService scheduledExecutorService5 = this.executor;
            if (scheduledExecutorService5 == null) {
                Intrinsics.throwUninitializedPropertyAccessException("executor");
                scheduledExecutorService5 = null;
            }
            scheduledExecutorService5.shutdownNow();
            Thread.currentThread().interrupt();
        }
        if (Result.isSuccess-impl(obj2)) {
            LoggingSupportsKt.info("Shutdown SagaRetrySupporter gracefully");
        }
    }

    private static final void handleOrphanSaga$lambda$3$lambda$0(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        function1.invoke(obj);
    }

    private static final Publisher handleOrphanSaga$lambda$3$lambda$1(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (Publisher) function1.invoke(obj);
    }

    private static final Publisher handleOrphanSaga$lambda$3$lambda$2(Function1 function1, Object obj) {
        Intrinsics.checkNotNullParameter(function1, "$tmp0");
        return (Publisher) function1.invoke(obj);
    }

    private static final void handleOrphanSaga$lambda$3(final AbstractSagaRetrySupporter abstractSagaRetrySupporter) {
        Intrinsics.checkNotNullParameter(abstractSagaRetrySupporter, "this$0");
        Flux<Pair<Saga, String>> claimOrphanSaga = abstractSagaRetrySupporter.claimOrphanSaga(abstractSagaRetrySupporter.backpressureSize);
        AbstractSagaRetrySupporter$handleOrphanSaga$1$1 abstractSagaRetrySupporter$handleOrphanSaga$1$1 = new Function1<Pair<? extends Saga, ? extends String>, Unit>() { // from class: org.rooftop.netx.engine.AbstractSagaRetrySupporter$handleOrphanSaga$1$1
            public final void invoke(Pair<Saga, String> pair) {
                LoggingSupportsKt.info("Retry orphan saga " + pair.getFirst() + "\nmessageId \"" + pair.getSecond() + "\"");
            }

            public /* bridge */ /* synthetic */ Object invoke(Object obj) {
                invoke((Pair<Saga, String>) obj);
                return Unit.INSTANCE;
            }
        };
        Flux doOnNext = claimOrphanSaga.doOnNext((v1) -> {
            handleOrphanSaga$lambda$3$lambda$0(r1, v1);
        });
        Function1<Pair<? extends Saga, ? extends String>, Publisher<? extends String>> function1 = new Function1<Pair<? extends Saga, ? extends String>, Publisher<? extends String>>() { // from class: org.rooftop.netx.engine.AbstractSagaRetrySupporter$handleOrphanSaga$1$2
            /* JADX INFO: Access modifiers changed from: package-private */
            {
                super(1);
            }

            public final Publisher<? extends String> invoke(Pair<Saga, String> pair) {
                AbstractSagaDispatcher abstractSagaDispatcher;
                Saga saga = (Saga) pair.component1();
                String str = (String) pair.component2();
                abstractSagaDispatcher = AbstractSagaRetrySupporter.this.sagaDispatcher;
                return LoggingSupportsKt.warningOnError(abstractSagaDispatcher.dispatch(saga, str), "Error occurred when retry orphan saga \"" + saga.getId() + "\"");
            }
        };
        Flux flatMap = doOnNext.flatMap((v1) -> {
            return handleOrphanSaga$lambda$3$lambda$1(r1, v1);
        });
        AbstractSagaRetrySupporter$handleOrphanSaga$1$3 abstractSagaRetrySupporter$handleOrphanSaga$1$3 = new Function1<Throwable, Publisher<? extends String>>() { // from class: org.rooftop.netx.engine.AbstractSagaRetrySupporter$handleOrphanSaga$1$3
            public final Publisher<? extends String> invoke(Throwable th) {
                return Mono.empty();
            }
        };
        flatMap.onErrorResume((v1) -> {
            return handleOrphanSaga$lambda$3$lambda$2(r1, v1);
        }).subscribeOn(Schedulers.immediate()).subscribe();
    }
}
