/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.master.coordinator;

import com.google.common.base.Preconditions;
import io.ray.api.BaseActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.id.ActorId;
import io.ray.runtime.exception.RayException;
import io.ray.streaming.runtime.master.JobMaster;
import io.ray.streaming.runtime.master.coordinator.BaseCoordinator;
import io.ray.streaming.runtime.master.coordinator.command.BaseWorkerCmd;
import io.ray.streaming.runtime.master.coordinator.command.WorkerCommitReport;
import io.ray.streaming.runtime.rpc.RemoteCallWorker;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CheckpointCoordinator
extends BaseCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
    private final Set<ActorId> pendingCheckpointActors = new HashSet<ActorId>();
    private final Set<Long> interruptedCheckpointSet = new HashSet<Long>();
    private final int cpIntervalSecs;
    private final int cpTimeoutSecs;

    public CheckpointCoordinator(JobMaster jobMaster) {
        super(jobMaster);
        this.cpIntervalSecs = this.runtimeContext.getConf().masterConfig.checkpointConfig.cpIntervalSecs();
        this.cpTimeoutSecs = this.runtimeContext.getConf().masterConfig.checkpointConfig.cpTimeoutSecs();
        this.runtimeContext.lastCpTimestamp = System.currentTimeMillis();
    }

    @Override
    public void run() {
        while (!this.closed) {
            try {
                BaseWorkerCmd command = this.runtimeContext.cpCmds.poll(1L, TimeUnit.SECONDS);
                if (command != null) {
                    if (command instanceof WorkerCommitReport) {
                        this.processCommitReport((WorkerCommitReport)command);
                    } else {
                        this.interruptCheckpoint();
                    }
                }
                if (!this.pendingCheckpointActors.isEmpty()) {
                    if (!this.timeoutOnWaitCheckpoint()) continue;
                    LOG.warn("Waiting for checkpoint {} timeout, pending cp actors is {}.", (Object)this.runtimeContext.lastCheckpointId, this.graphManager.getExecutionGraph().getActorName(this.pendingCheckpointActors));
                    this.interruptCheckpoint();
                    continue;
                }
                this.maybeTriggerCheckpoint();
            }
            catch (Throwable e) {
                LOG.error("Checkpoint coordinator occur err.", e);
                try {
                    this.interruptCheckpoint();
                }
                catch (Throwable interruptE) {
                    LOG.error("Ignore interrupt checkpoint exception in catch block.");
                }
            }
        }
        LOG.warn("Checkpoint coordinator thread exit.");
    }

    public Boolean reportJobWorkerCommit(WorkerCommitReport report) {
        LOG.info("Report job worker commit {}.", (Object)report);
        Boolean ret = this.runtimeContext.cpCmds.offer(report);
        if (!ret.booleanValue()) {
            LOG.warn("Report job worker commit failed, because command queue is full.");
        }
        return ret;
    }

    private void processCommitReport(WorkerCommitReport commitReport) {
        LOG.info("Start process commit report {}, from actor name={}.", (Object)commitReport, (Object)this.graphManager.getExecutionGraph().getActorName(commitReport.fromActorId));
        try {
            Preconditions.checkArgument((commitReport.commitCheckpointId == this.runtimeContext.lastCheckpointId ? 1 : 0) != 0, (String)"expect checkpointId %s, but got %s", (long)this.runtimeContext.lastCheckpointId, (Object)commitReport);
            if (!this.pendingCheckpointActors.contains(commitReport.fromActorId)) {
                LOG.warn("Invalid commit report, skipped.");
                return;
            }
            this.pendingCheckpointActors.remove(commitReport.fromActorId);
            LOG.info("Pending actors after this commit: {}.", this.graphManager.getExecutionGraph().getActorName(this.pendingCheckpointActors));
            if (this.pendingCheckpointActors.isEmpty()) {
                this.runtimeContext.checkpointIds.add(this.runtimeContext.lastCheckpointId);
                if (this.clearExpiredCpStateAndQueueMsg()) {
                    this.jobMaster.saveContext();
                    LOG.info("Finish checkpoint: {}.", (Object)this.runtimeContext.lastCheckpointId);
                } else {
                    LOG.warn("Fail to do checkpoint: {}.", (Object)this.runtimeContext.lastCheckpointId);
                }
            }
            LOG.info("Process commit report {} success.", (Object)commitReport);
        }
        catch (Throwable e) {
            LOG.warn("Process commit report has exception.", e);
        }
    }

    private void triggerCheckpoint() {
        this.interruptedCheckpointSet.clear();
        if (LOG.isInfoEnabled()) {
            LOG.info("Start trigger checkpoint {}.", (Object)(this.runtimeContext.lastCheckpointId + 1L));
        }
        List<ActorId> allIds = this.graphManager.getExecutionGraph().getAllActorsId();
        this.pendingCheckpointActors.addAll(allIds);
        ++this.runtimeContext.lastCheckpointId;
        ArrayList sourcesRet = new ArrayList();
        this.graphManager.getExecutionGraph().getSourceActors().forEach(actor -> sourcesRet.add(RemoteCallWorker.triggerCheckpoint(actor, this.runtimeContext.lastCheckpointId)));
        for (ObjectRef rayObject : sourcesRet) {
            if (!(rayObject.get() instanceof RayException)) continue;
            LOG.warn("Trigger checkpoint has exception.", (Throwable)((RayException)rayObject.get()));
            throw (RayException)rayObject.get();
        }
        this.runtimeContext.lastCpTimestamp = System.currentTimeMillis();
        LOG.info("Trigger checkpoint success.");
    }

    private void interruptCheckpoint() {
        if (this.interruptedCheckpointSet.contains(this.runtimeContext.lastCheckpointId)) {
            LOG.warn("Skip interrupt duplicated checkpoint id : {}.", (Object)this.runtimeContext.lastCheckpointId);
            return;
        }
        this.interruptedCheckpointSet.add(this.runtimeContext.lastCheckpointId);
        LOG.warn("Interrupt checkpoint, checkpoint id : {}.", (Object)this.runtimeContext.lastCheckpointId);
        List<BaseActorHandle> allActor = this.graphManager.getExecutionGraph().getAllActors();
        if (this.runtimeContext.lastCheckpointId > this.runtimeContext.getLastValidCheckpointId()) {
            RemoteCallWorker.notifyCheckpointTimeoutParallel(allActor, this.runtimeContext.lastCheckpointId);
        }
        if (!this.pendingCheckpointActors.isEmpty()) {
            this.pendingCheckpointActors.clear();
        }
        this.maybeTriggerCheckpoint();
    }

    private void maybeTriggerCheckpoint() {
        if (this.readyToTrigger()) {
            this.triggerCheckpoint();
        }
    }

    private boolean clearExpiredCpStateAndQueueMsg() {
        List<BaseActorHandle> allActor = this.graphManager.getExecutionGraph().getAllActors();
        if (1 == this.runtimeContext.checkpointIds.size()) {
            Long msgExpiredCheckpointId = this.runtimeContext.checkpointIds.get(0);
            RemoteCallWorker.clearExpiredCheckpointParallel(allActor, 0L, msgExpiredCheckpointId);
        }
        if (this.runtimeContext.checkpointIds.size() > 1) {
            Long stateExpiredCpId = this.runtimeContext.checkpointIds.remove(0);
            Long msgExpiredCheckpointId = this.runtimeContext.checkpointIds.get(0);
            RemoteCallWorker.clearExpiredCheckpointParallel(allActor, stateExpiredCpId, msgExpiredCheckpointId);
        }
        return true;
    }

    private boolean readyToTrigger() {
        return System.currentTimeMillis() - this.runtimeContext.lastCpTimestamp >= (long)(this.cpIntervalSecs * 1000);
    }

    private boolean timeoutOnWaitCheckpoint() {
        return System.currentTimeMillis() - this.runtimeContext.lastCpTimestamp >= (long)(this.cpTimeoutSecs * 1000);
    }
}

