/*
 * Decompiled with CFR 0.152.
 */
package org.rooftop.netx.engine;

import jakarta.annotation.PreDestroy;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.Result;
import kotlin.ResultKt;
import kotlin.Unit;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import org.rooftop.netx.engine.AbstractSagaDispatcher;
import org.rooftop.netx.engine.AbstractSagaRetrySupporter;
import org.rooftop.netx.engine.core.Saga;
import org.rooftop.netx.engine.logging.LoggingSupportsKt;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

/**
 * Base class that periodically claims orphan sagas and hands each of them back to the
 * {@link AbstractSagaDispatcher}.
 *
 * <p>{@link #watchOrphanSaga()} starts a single-threaded scheduled executor that runs the
 * claim-and-dispatch pipeline with a fixed delay of {@code recoveryMilli} milliseconds between
 * runs. Subclasses supply the source of orphan sagas through {@link #claimOrphanSaga(int)}.
 * The executor is stopped by the {@code @PreDestroy} callback {@code shutdownGracefully()}.
 */
@Metadata(mv={1, 9, 0}, k=1, xi=48, d1={"\u0000J\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0000\n\u0002\u0010\b\n\u0000\n\u0002\u0010\t\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u0002\n\u0002\b\u0002\b \u0018\u00002\u00020\u0001B\u001d\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u00a2\u0006\u0002\u0010\bJ\"\u0010\r\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u0010\u0012\u0004\u0012\u00020\u00110\u000f0\u000e2\u0006\u0010\u0002\u001a\u00020\u0003H$J\b\u0010\u0012\u001a\u00020\u0013H\u0002J\b\u0010\u0014\u001a\u00020\u0015H\u0003J\u0006\u0010\u0016\u001a\u00020\u0015R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\t\u001a\u00020\nX\u0082.\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0012\u0010\u000b\u001a\u0006\u0012\u0002\b\u00030\fX\u0082.\u00a2\u0006\u0002\n\u0000\u00a8\u0006\u0017"}, d2={"Lorg/rooftop/netx/engine/AbstractSagaRetrySupporter;", "", "backpressureSize", "", "recoveryMilli", "", "sagaDispatcher", "Lorg/rooftop/netx/engine/AbstractSagaDispatcher;", "(IJLorg/rooftop/netx/engine/AbstractSagaDispatcher;)V", "executor", "Ljava/util/concurrent/ScheduledExecutorService;", "scheduledFuture", "Ljava/util/concurrent/ScheduledFuture;", "claimOrphanSaga", "Lreactor/core/publisher/Flux;", "Lkotlin/Pair;", "Lorg/rooftop/netx/engine/core/Saga;", "", "handleOrphanSaga", "Ljava/lang/Runnable;", "shutdownGracefully", "", "watchOrphanSaga", "netx"})
public abstract class AbstractSagaRetrySupporter {
    /** Value passed to {@link #claimOrphanSaga(int)} on every scheduled run. */
    private final int backpressureSize;
    /** Delay, in milliseconds, between the end of one run and the start of the next. */
    private final long recoveryMilli;
    /** Dispatcher that receives every claimed saga together with its message id. */
    @NotNull
    private final AbstractSagaDispatcher sagaDispatcher;
    /** Set by {@link #watchOrphanSaga()}; {@code null} until then. */
    private ScheduledExecutorService executor;
    /** Handle of the periodic task; set by {@link #watchOrphanSaga()}, {@code null} until then. */
    private ScheduledFuture<?> scheduledFuture;

    /**
     * @param backpressureSize value forwarded to {@link #claimOrphanSaga(int)} on each run
     * @param recoveryMilli delay between runs, in milliseconds
     * @param sagaDispatcher dispatcher used to re-dispatch claimed sagas; must not be null
     */
    public AbstractSagaRetrySupporter(int backpressureSize, long recoveryMilli, @NotNull AbstractSagaDispatcher sagaDispatcher) {
        Intrinsics.checkNotNullParameter((Object)sagaDispatcher, (String)"sagaDispatcher");
        this.backpressureSize = backpressureSize;
        this.recoveryMilli = recoveryMilli;
        this.sagaDispatcher = sagaDispatcher;
    }

    /**
     * Starts the periodic orphan-saga run on a new single-threaded scheduled executor.
     *
     * <p>The first run starts immediately (initial delay 0); later runs start
     * {@code recoveryMilli} milliseconds after the previous one returned. Every call creates a
     * new executor and overwrites the stored references, so only the most recently created
     * executor is stopped by the shutdown callback.
     */
    public final void watchOrphanSaga() {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        this.executor = service;
        this.scheduledFuture = service.scheduleWithFixedDelay(this.handleOrphanSaga(), 0L, this.recoveryMilli, TimeUnit.MILLISECONDS);
    }

    /**
     * Builds the task executed on every scheduled run.
     *
     * <p>The task catches and logs any exception thrown while the pipeline is assembled or
     * subscribed: {@code scheduleWithFixedDelay} suppresses all subsequent executions once a
     * run throws, which would otherwise stop the watcher silently.
     */
    private final Runnable handleOrphanSaga() {
        return () -> {
            try {
                AbstractSagaRetrySupporter.handleOrphanSaga$lambda$3(this);
            }
            catch (Exception failure) {
                LoggingSupportsKt.error("Error occurred when claiming orphan saga: " + failure);
            }
        };
    }

    /**
     * Supplies the orphan sagas to retry.
     *
     * @param var1 receives the {@code backpressureSize} given to the constructor
     * @return a stream of (saga, message id) pairs; each pair is passed to the dispatcher
     */
    @NotNull
    protected abstract Flux<Pair<Saga, String>> claimOrphanSaga(int var1);

    /**
     * Stops the periodic task and its executor when the owning component is destroyed.
     *
     * <p>Cancels the scheduled task (interrupting a run in progress), then waits up to ten
     * minutes for the executor to terminate, forces {@code shutdownNow()} and waits one more
     * minute if it has not. Does nothing if {@link #watchOrphanSaga()} was never called. The
     * success message is logged only when the executor actually terminated.
     */
    @PreDestroy
    private final void shutdownGracefully() {
        ScheduledFuture<?> future = this.scheduledFuture;
        if (future != null) {
            future.cancel(true);
        }
        ScheduledExecutorService service = this.executor;
        if (service == null) {
            // The watcher was never started, so there is nothing to stop.
            return;
        }
        service.shutdown();
        try {
            if (!service.awaitTermination(10L, TimeUnit.MINUTES)) {
                service.shutdownNow();
                if (!service.awaitTermination(1L, TimeUnit.MINUTES)) {
                    LoggingSupportsKt.error("Cannot shutdown SagaRetrySupporter thread");
                    return;
                }
            }
        }
        catch (Throwable failure) {
            // Typically an InterruptedException while waiting: stop the worker immediately and
            // keep the interrupt flag set for the caller.
            service.shutdownNow();
            Thread.currentThread().interrupt();
            return;
        }
        LoggingSupportsKt.info("Shutdown SagaRetrySupporter gracefully");
    }

    // Compiler-generated bridge: adapts a Kotlin Function1 to the doOnNext callback.
    private static final void handleOrphanSaga$lambda$3$lambda$0(Function1 $tmp0, Object p0) {
        Intrinsics.checkNotNullParameter((Object)$tmp0, (String)"$tmp0");
        $tmp0.invoke(p0);
    }

    // Compiler-generated bridge: adapts a Kotlin Function1 to the flatMap mapper.
    private static final Publisher handleOrphanSaga$lambda$3$lambda$1(Function1 $tmp0, Object p0) {
        Intrinsics.checkNotNullParameter((Object)$tmp0, (String)"$tmp0");
        return (Publisher)$tmp0.invoke(p0);
    }

    // Compiler-generated bridge: adapts a Kotlin Function1 to the onErrorResume fallback.
    private static final Publisher handleOrphanSaga$lambda$3$lambda$2(Function1 $tmp0, Object p0) {
        Intrinsics.checkNotNullParameter((Object)$tmp0, (String)"$tmp0");
        return (Publisher)$tmp0.invoke(p0);
    }

    // Body of one scheduled run: claim orphan sagas, dispatch each (saga, message id) pair
    // through the saga dispatcher, and subscribe. The doOnNext callback
    // (handleOrphanSaga.1.1) and the onErrorResume fallback (handleOrphanSaga.1.3) are
    // compiler-generated singletons whose bodies are not visible in this file.
    private static final void handleOrphanSaga$lambda$3(AbstractSagaRetrySupporter this$0) {
        Intrinsics.checkNotNullParameter((Object)this$0, (String)"this$0");
        this$0.claimOrphanSaga(this$0.backpressureSize).doOnNext(arg_0 -> AbstractSagaRetrySupporter.handleOrphanSaga$lambda$3$lambda$0(handleOrphanSaga.1.1.INSTANCE, arg_0)).flatMap(arg_0 -> AbstractSagaRetrySupporter.handleOrphanSaga$lambda$3$lambda$1((Function1)new Function1<Pair<? extends Saga, ? extends String>, Publisher<? extends String>>(this$0){
            final /* synthetic */ AbstractSagaRetrySupporter this$0;
            {
                this.this$0 = $receiver;
                super(1);
            }

            // Dispatches one claimed saga; the result of dispatch is wrapped by warningOnError
            // with a message that names the saga id.
            public final Publisher<? extends String> invoke(Pair<Saga, String> pair) {
                Saga saga2 = (Saga)pair.component1();
                String messageId = (String)pair.component2();
                return (Publisher)LoggingSupportsKt.warningOnError(AbstractSagaRetrySupporter.access$getSagaDispatcher$p(this.this$0).dispatch(saga2, messageId), "Error occurred when retry orphan saga \"" + saga2.getId() + "\"");
            }
        }, arg_0)).onErrorResume(arg_0 -> AbstractSagaRetrySupporter.handleOrphanSaga$lambda$3$lambda$2(handleOrphanSaga.1.3.INSTANCE, arg_0)).subscribeOn(Schedulers.immediate()).subscribe();
    }

    // Synthetic accessor that lets the anonymous Function1 above read the private dispatcher.
    public static final /* synthetic */ AbstractSagaDispatcher access$getSagaDispatcher$p(AbstractSagaRetrySupporter $this) {
        return $this.sagaDispatcher;
    }
}

