package org.reveno.atp.clustering.core;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.reveno.atp.clustering.api.Cluster;
import org.reveno.atp.clustering.api.ClusterBuffer;
import org.reveno.atp.clustering.api.ClusterConnector;
import org.reveno.atp.clustering.api.ClusterEvent;
import org.reveno.atp.clustering.api.ClusterView;
import org.reveno.atp.clustering.api.SyncMode;
import org.reveno.atp.clustering.api.message.Marshaller;
import org.reveno.atp.clustering.api.message.Message;
import org.reveno.atp.clustering.core.api.ClusterExecutor;
import org.reveno.atp.clustering.core.api.ClusterState;
import org.reveno.atp.clustering.core.api.ElectionResult;
import org.reveno.atp.clustering.core.api.MessagesReceiver;
import org.reveno.atp.clustering.core.api.StorageTransferServer;
import org.reveno.atp.clustering.core.components.GroupBarrier;
import org.reveno.atp.clustering.core.components.StorageTransferModelSync;
import org.reveno.atp.clustering.core.messages.ForceElectionProcess;
import org.reveno.atp.clustering.exceptions.FailoverAbortedException;
import org.reveno.atp.commons.NamedThreadFactory;
import org.reveno.atp.core.JournalsManager;
import org.reveno.atp.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/reveno/atp/clustering/core/FailoverExecutor.class */
/**
 * Drives the cluster failover sequence for the local node.
 *
 * <p>Failover runs are submitted to a single-threaded executor, so at most one run of
 * {@link #process(boolean)} is active at a time. A run is triggered by a
 * {@link ClusterEvent#MEMBERSHIP_CHANGED} event, by a {@link ForceElectionProcess} message received
 * through the cluster gateway, or by the buffer's failover notifier.
 *
 * <p>{@link #leaderElector}, {@link #clusterStateCollector} and {@link #modelSynchronizer} must be set
 * before {@link #init()} is called.
 */
public class FailoverExecutor {
    /** Capacity of the bounded queue that holds pending failover runs. */
    private static final int NUM_JOBS_TO_QUEUE = 500;
    protected Cluster cluster;
    protected ClusterBuffer buffer;
    protected RevenoClusterConfiguration config;
    protected JournalsManager journalsManager;
    protected ClusterFailoverManager failoverManager;
    protected StorageTransferServer storageServer;
    protected ClusterExecutor<ElectionResult, Void> leaderElector;
    protected ClusterExecutor<ClusterState, Void> clusterStateCollector;
    protected ClusterExecutor<Boolean, StorageTransferModelSync.TransferContext> modelSynchronizer;
    /** Optional callback run when a failover run ends without an error; may be {@code null}. */
    protected Runnable failoverListener;
    protected Marshaller marshaller;
    /** Single-threaded executor on which every failover run is performed. */
    protected final ThreadPoolExecutor electorExecutor;
    protected static final Logger LOG = LoggerFactory.getLogger(FailoverExecutor.class);
    /** Set by {@link #stop()}; checked by the failure recovery path to avoid rescheduling. */
    protected volatile boolean isStopped = false;
    /** The view of the last failover run that completed successfully. */
    protected volatile ClusterView lastView = ClusterView.EMPTY_VIEW;
    protected Runnable snapshotMaker = () -> { };
    protected Supplier<Long> replayer = () -> 0L;
    protected Supplier<Long> lastTransactionId = () -> 0L;

    /**
     * Validates the required collaborators and registers the cluster event listener, the marshaller,
     * the {@link ForceElectionProcess} message handler and the buffer's failover notifier.
     */
    public void init() {
        Preconditions.checkNotNull(this.leaderElector, "LeaderElector must be provided.");
        Preconditions.checkNotNull(this.clusterStateCollector, "ClusterStateCollector must be provided.");
        Preconditions.checkNotNull(this.modelSynchronizer, "ModelSynchronizer must be provided.");
        this.cluster.listenEvents(this::onClusterEvent);
        this.cluster.marshallWith(this.marshaller);
        // Another node asked for a forced election: run it locally as well.
        this.cluster.gateway().receive(ForceElectionProcess.TYPE, message -> {
            this.electorExecutor.submit(() -> process(true));
        });
        // The buffer asked for a failover: tell the members of the last view, then run it locally.
        this.buffer.failoverNotifier(clusterEvent -> {
            this.cluster.gateway().send(this.lastView.members(), new ForceElectionProcess(), this.cluster.gateway().oob());
            this.electorExecutor.submit(() -> process(true));
        });
    }

    /**
     * Marks this executor as stopped, shuts the failover executor down and waits up to one minute
     * for it to terminate. If the waiting thread is interrupted, its interrupt status is restored.
     */
    public void stop() {
        this.isStopped = true;
        this.electorExecutor.shutdown();
        try {
            if (!this.electorExecutor.awaitTermination(1L, TimeUnit.MINUTES)) {
                LOG.warn("Failover executor did not terminate within the shutdown timeout.");
            }
        } catch (InterruptedException e) {
            LOG.error(e.getMessage(), e);
            // Keep the interrupt visible to the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
        }
    }

    /** Starts a failover run as if a membership change had been observed. */
    public void startElectionProcess() {
        onClusterEvent(ClusterEvent.MEMBERSHIP_CHANGED);
    }

    /**
     * Sets the leader elector; if it is also a {@link MessagesReceiver}, subscribes it to its messages.
     *
     * @param clusterExecutor the executor that performs the leadership election
     */
    public void leaderElector(ClusterExecutor<ElectionResult, Void> clusterExecutor) {
        this.leaderElector = clusterExecutor;
        if (clusterExecutor instanceof MessagesReceiver) {
            subscribe((MessagesReceiver) clusterExecutor);
        }
    }

    /**
     * Sets the cluster state collector; if it is also a {@link MessagesReceiver}, subscribes it to
     * its messages.
     *
     * @param clusterExecutor the executor that gathers the cluster state
     */
    public void clusterStateCollector(ClusterExecutor<ClusterState, Void> clusterExecutor) {
        this.clusterStateCollector = clusterExecutor;
        if (clusterExecutor instanceof MessagesReceiver) {
            subscribe((MessagesReceiver) clusterExecutor);
        }
    }

    /**
     * Sets the model synchronizer; if it is also a {@link MessagesReceiver}, subscribes it to its
     * messages.
     *
     * @param clusterExecutor the executor that synchronizes the model with the latest node
     */
    public void modelSynchronizer(ClusterExecutor<Boolean, StorageTransferModelSync.TransferContext> clusterExecutor) {
        this.modelSynchronizer = clusterExecutor;
        if (clusterExecutor instanceof MessagesReceiver) {
            subscribe((MessagesReceiver) clusterExecutor);
        }
    }

    /** Sets the action run before journals are fixed when the sync mode is {@link SyncMode#SNAPSHOT}. */
    public void snapshotMaker(Runnable runnable) {
        this.snapshotMaker = runnable;
    }

    /** Sets the replay action; its returned value is not used by this class. */
    public void replayer(Supplier<Long> supplier) {
        this.replayer = supplier;
    }

    /** Sets the supplier of the transaction id passed to {@code JournalsManager.roll}. */
    public void lastTransactionId(Supplier<Long> supplier) {
        this.lastTransactionId = supplier;
    }

    /** Sets the marshaller handed to the cluster in {@link #init()}. */
    public void marshaller(Marshaller marshaller) {
        this.marshaller = marshaller;
    }

    /** Sets the callback run when a failover run ends without an error. */
    public void failoverListener(Runnable runnable) {
        this.failoverListener = runnable;
    }

    /**
     * Registers each receiver on the cluster gateway for every message type it is interested in,
     * applying the receiver's filter when it provides one.
     *
     * @param messagesReceiverArr the receivers to subscribe
     */
    public void subscribe(MessagesReceiver... messagesReceiverArr) {
        for (MessagesReceiver receiver : messagesReceiverArr) {
            for (Integer type : receiver.interestedTypes()) {
                ClusterConnector gateway = this.cluster.gateway();
                if (receiver.filter().isPresent()) {
                    Predicate<Message> filter = receiver.filter().get();
                    gateway.receive(type.intValue(), filter, receiver::onMessage);
                } else {
                    gateway.receive(type.intValue(), receiver::onMessage);
                }
            }
        }
    }

    /**
     * @return the view of the last successfully completed failover run, or
     *         {@link ClusterView#EMPTY_VIEW} if none has completed yet
     */
    public ClusterView lastView() {
        return this.lastView;
    }

    /**
     * On {@link ClusterEvent#MEMBERSHIP_CHANGED}, passes the current view to the buffer and schedules
     * a non-forced failover run. Other events are ignored.
     *
     * @param clusterEvent the event reported by the cluster
     */
    protected void onClusterEvent(ClusterEvent clusterEvent) {
        if (clusterEvent == ClusterEvent.MEMBERSHIP_CHANGED) {
            this.buffer.onView(this.cluster.view());
            this.electorExecutor.submit(() -> process(false));
        }
    }

    /**
     * Performs one failover run against the current cluster view.
     *
     * <p>A non-forced run is skipped when the view equals {@link #lastView}; any run is skipped when
     * the view is not a quorum. Otherwise the steps below are executed in order, each followed by a
     * barrier wait. If any step throws, the node is blocked and locked, and the whole process is
     * scheduled again after a pause (unless stopped).
     *
     * @param forced whether the run must proceed even if the view has not changed
     */
    protected void process(boolean forced) {
        LOG.info("Cluster merge process started (forced: {})", forced);
        try {
            ClusterView view = this.cluster.view();
            if (!forced && this.lastView.equals(view)) {
                LOG.info("Cluster is already up-to-date.");
                notifyListener();
                return;
            }
            if (!isQuorum(view)) {
                LOG.info("Failover process end: not a quorum [members: {}]", view.members());
                notifyListener();
                return;
            }
            try {
                long startedAt = System.currentTimeMillis();
                blockIfMaster();
                waitOnBarrier(view, "start");
                this.buffer.lockIncoming();
                waitOnBarrier(view, "lock-buffer");
                rollAndFixJournals(view);
                ElectionResult election = leadershipElection(view);
                waitOnBarrier(view, "election");
                ClusterState state = clusterStateCollection(view);
                waitOnBarrier(view, "cluster-state");
                this.buffer.unlockIncoming();
                waitOnBarrier(view, "unlock-buffer");
                unblockMasterOrSynchronizeSlave(view, election, state);
                // The sync step has its own, separately configured barrier timeout.
                waitOnBarrier(view, "sync", this.config.revenoElectionTimeouts().syncBarrierTimeoutNanos());
                blockIfMaster(() -> !this.config.revenoDataSync().waitAllNodesSync());
                waitOnBarrier(view, "block-if-master");
                replay(state);
                waitOnBarrier(view, "replay");
                // Non-master nodes are unblocked one barrier earlier than the master.
                if (!election.isMaster) {
                    unblock();
                }
                waitOnBarrier(view, "unblock-slaves");
                if (election.isMaster) {
                    unblock();
                }
                waitOnBarrier(view, "unblock-master");
                this.lastView = view;
                makeMasterIfElected(election);
                LOG.info("Election Process Time: {} ms", System.currentTimeMillis() - startedAt);
                notifyListener();
            } catch (Throwable th) {
                recoverFromFailedRun(view, th);
            }
        } catch (Throwable th2) {
            LOG.error("Unexpected process error.", th2);
        }
    }

    /**
     * Handles a failed failover run: blocks and locks the node, erases the buffer, replays (unless
     * stopped), drops mastership, pauses and then schedules the process again (unless stopped).
     */
    private void recoverFromFailedRun(ClusterView view, Throwable failure) {
        if (failure instanceof FailoverAbortedException) {
            // Aborts are raised by this class itself with a self-explanatory message.
            LOG.error("Leadership election is failed for view: {}, {}", view, failure.getMessage());
        } else {
            // Anything else is unexpected, so keep the stack trace for diagnosis.
            LOG.error("Leadership election is failed for view: {}, {}", view, failure.getMessage(), failure);
        }
        blockAndLock();
        this.buffer.erase();
        if (!this.isStopped) {
            this.replayer.get();
        }
        this.failoverManager.setMaster(false);
        await();
        if (this.isStopped) {
            return;
        }
        onClusterEvent(ClusterEvent.MEMBERSHIP_CHANGED);
    }

    /** Applies the election outcome to the failover manager. */
    protected void makeMasterIfElected(ElectionResult electionResult) {
        this.failoverManager.setMaster(electionResult.isMaster);
    }

    /** Blocks the failover manager if this node is an unblocked master, then locks the buffer's incoming side. */
    protected void blockAndLock() {
        if (this.failoverManager.isMaster() && !this.failoverManager.isBlocked()) {
            this.failoverManager.block();
        }
        this.buffer.lockIncoming();
    }

    /** Unblocks the failover manager if it is currently blocked. */
    protected void unblock() {
        if (this.failoverManager.isBlocked()) {
            this.failoverManager.unblock();
        }
    }

    /** Runs the replayer when the collected state names a latest node. */
    protected void replay(ClusterState clusterState) {
        if (clusterState.latestNode.isPresent()) {
            this.replayer.get();
        }
    }

    /**
     * Unblocks this node as master when it was elected, the state names no latest node and waiting
     * for all nodes to sync is not configured. Independently, runs the model synchronizer when the
     * state names a latest node.
     */
    protected void unblockMasterOrSynchronizeSlave(ClusterView clusterView, ElectionResult electionResult, ClusterState clusterState) {
        boolean hasLatestNode = clusterState.latestNode.isPresent();
        if (electionResult.isMaster && !hasLatestNode && !this.config.revenoDataSync().waitAllNodesSync()) {
            this.failoverManager.setMaster(true);
            this.failoverManager.unblock();
        }
        if (hasLatestNode) {
            // NOTE(review): the Boolean result of the synchronizer is ignored here - confirm that a
            // failed sync is surfaced some other way (e.g. by the following "sync" barrier).
            this.modelSynchronizer.execute(clusterView, new StorageTransferModelSync.TransferContext(clusterState.currentTransactionId, clusterState.latestNode.get()));
        }
    }

    /**
     * Collects the cluster state for the given view.
     *
     * @throws FailoverAbortedException if the collector reports failure
     */
    protected ClusterState clusterStateCollection(ClusterView clusterView) {
        ClusterState state = this.clusterStateCollector.execute(clusterView);
        if (state.failed) {
            throw new FailoverAbortedException("Unable to gather cluster state, restarting.");
        }
        return state;
    }

    /**
     * Runs the leadership election for the given view.
     *
     * @throws FailoverAbortedException if the elector reports failure
     */
    protected ElectionResult leadershipElection(ClusterView clusterView) {
        ElectionResult result = this.leaderElector.execute(clusterView);
        if (result.failed) {
            throw new FailoverAbortedException("Unable to complete voting, restarting.");
        }
        return result;
    }

    /**
     * Erases the buffer, takes a snapshot when the sync mode is {@link SyncMode#SNAPSHOT}, lets the
     * storage server fix the journals and rolls the journals at the last transaction id.
     */
    protected void rollAndFixJournals(ClusterView clusterView) {
        this.buffer.erase();
        if (this.config.revenoDataSync().mode() == SyncMode.SNAPSHOT) {
            this.snapshotMaker.run();
        }
        this.storageServer.fixJournals(clusterView);
        this.journalsManager.roll(this.lastTransactionId.get().longValue());
    }

    /** Blocks the failover manager if this node is currently master. */
    protected void blockIfMaster() {
        blockIfMaster(() -> true);
    }

    /**
     * Blocks the failover manager if this node is currently master and the condition holds.
     * The condition is only evaluated when this node is master.
     */
    protected void blockIfMaster(Supplier<Boolean> supplier) {
        if (this.failoverManager.isMaster() && supplier.get().booleanValue()) {
            this.failoverManager.block();
        }
    }

    private void notifyListener() {
        if (this.failoverListener != null) {
            this.failoverListener.run();
        }
    }

    /** Waits on the named barrier using the configured default barrier timeout. */
    protected void waitOnBarrier(ClusterView clusterView, String barrierName) {
        waitOnBarrier(clusterView, barrierName, this.config.revenoElectionTimeouts().barrierTimeoutNanos());
    }

    /**
     * Waits on a new {@link GroupBarrier} with the given name and timeout.
     *
     * @throws FailoverAbortedException if the barrier wait reports failure
     */
    protected void waitOnBarrier(ClusterView clusterView, String barrierName, long timeoutNanos) {
        LOG.debug("Wait on barrier [{}]", barrierName);
        if (!new GroupBarrier(this.cluster, clusterView, barrierName, timeoutNanos).waitOn()) {
            throw new FailoverAbortedException(String.format("Timeout wait on barrier [%s] in view [%s].", barrierName, clusterView));
        }
        LOG.debug("Reached barrier [{}]", barrierName);
    }

    /**
     * @return {@code true} if the view has at least one member and at least half (integer division)
     *         as many members as there are configured node addresses
     */
    protected boolean isQuorum(ClusterView clusterView) {
        int members = clusterView.members().size();
        return members != 0 && members >= this.config.nodesAddresses().size() / 2;
    }

    /** Sleeps for the configured ack timeout; restores the interrupt status if interrupted. */
    protected void await() {
        try {
            Thread.sleep(this.config.revenoElectionTimeouts().ackTimeoutNanos() / 1000000);
        } catch (InterruptedException e) {
            // Do not lose the interrupt; the caller continues as it would after a completed pause.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Creates the executor and its single-threaded, bounded-queue failover thread pool.
     *
     * <p>When the queue is full, the submitting thread blocks until space is available. Tasks
     * rejected because the pool has been shut down are discarded.
     */
    public FailoverExecutor(Cluster cluster, JournalsManager journalsManager, ClusterFailoverManager clusterFailoverManager, StorageTransferServer storageTransferServer, RevenoClusterConfiguration revenoClusterConfiguration) {
        this.cluster = cluster;
        this.storageServer = storageTransferServer;
        this.config = revenoClusterConfiguration;
        this.journalsManager = journalsManager;
        this.buffer = clusterFailoverManager.buffer();
        this.failoverManager = clusterFailoverManager;
        BlockingQueue<Runnable> jobs = new ArrayBlockingQueue<>(NUM_JOBS_TO_QUEUE);
        ThreadFactory threadFactory = new NamedThreadFactory("fe-" + revenoClusterConfiguration.currentNodeAddress());
        this.electorExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, jobs, threadFactory);
        this.electorExecutor.setRejectedExecutionHandler((runnable, executor) -> {
            if (executor.isShutdown()) {
                // Nothing will drain the queue reliably any more; parking the task there could
                // block the submitter forever.
                LOG.debug("Failover task rejected: executor is shut down.");
                return;
            }
            try {
                executor.getQueue().put(runnable);
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while queueing a failover task; the task is dropped.");
                Thread.currentThread().interrupt();
            }
        });
    }
}
