/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.master.coordinator;

import io.ray.api.BaseActorHandle;
import io.ray.api.id.ActorId;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.core.resource.Container;
import io.ray.streaming.runtime.master.JobMaster;
import io.ray.streaming.runtime.master.context.JobMasterRuntimeContext;
import io.ray.streaming.runtime.master.coordinator.BaseCoordinator;
import io.ray.streaming.runtime.master.coordinator.command.BaseWorkerCmd;
import io.ray.streaming.runtime.master.coordinator.command.InterruptCheckpointRequest;
import io.ray.streaming.runtime.master.coordinator.command.WorkerRollbackRequest;
import io.ray.streaming.runtime.rpc.async.AsyncRemoteCaller;
import io.ray.streaming.runtime.transfer.channel.ChannelRecoverInfo;
import io.ray.streaming.runtime.util.ResourceUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.map.DefaultedMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Coordinator that consumes worker rollback (failover) commands queued in the job master's
 * runtime context and asks the affected workers to roll back to the last valid checkpoint.
 *
 * <p>When a worker reports queues that lost data after a successful rollback, rollback requests
 * for the peer (upstream) workers of those queues are enqueued as well; such requests share the
 * cascading group id of the request that triggered them.
 */
public class FailoverCoordinator extends BaseCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(FailoverCoordinator.class);

    /** Delay before a failed rollback is re-queued, in milliseconds. */
    private static final int ROLLBACK_RETRY_TIME_MS = 10000;

    /** Guards polling from, and re-filling of, the failover command queue. */
    private final Object cmdLock = new Object();

    private final AsyncRemoteCaller asyncRemoteCaller;

    /** Next cascading group id to hand out to a request that does not carry one yet. */
    private long currentCascadingGroupId = 0L;

    /** Per-vertex "rollback in flight" flag; lookups of unknown vertices yield {@code false}. */
    @SuppressWarnings("unchecked") // commons-collections 3.x DefaultedMap is not generic.
    private final Map<ExecutionVertex, Boolean> isRollbacking =
        DefaultedMap.decorate(new ConcurrentHashMap<ExecutionVertex, Boolean>(), Boolean.FALSE);

    /**
     * Creates a coordinator that uses a freshly constructed {@link AsyncRemoteCaller}.
     *
     * @param jobMaster the job master this coordinator belongs to
     * @param isRecover whether unfinished failover commands should be re-queued
     */
    public FailoverCoordinator(JobMaster jobMaster, boolean isRecover) {
        this(jobMaster, new AsyncRemoteCaller(), isRecover);
    }

    /**
     * Creates a coordinator with an explicit remote caller.
     *
     * <p>If {@code isRecover} is set, every command recorded as unfinished in the runtime context
     * is put back onto the failover command queue. The unfinished set is cleared in either case.
     *
     * @param jobMaster the job master this coordinator belongs to
     * @param asyncRemoteCaller caller used for the asynchronous worker calls
     * @param isRecover whether unfinished failover commands should be re-queued
     */
    public FailoverCoordinator(
            JobMaster jobMaster, AsyncRemoteCaller asyncRemoteCaller, boolean isRecover) {
        super(jobMaster);
        this.asyncRemoteCaller = asyncRemoteCaller;
        JobMasterRuntimeContext runtimeContext = jobMaster.getRuntimeContext();
        if (isRecover) {
            runtimeContext.foCmds.addAll(runtimeContext.unfinishedFoCmds);
        }
        runtimeContext.unfinishedFoCmds.clear();
    }

    /**
     * Main loop: polls the failover command queue (waiting up to one second per poll) until the
     * inherited {@code closed} flag is set, and handles every {@link WorkerRollbackRequest} found.
     * Commands of any other type are dropped. Any throwable raised while handling a command is
     * logged and the loop continues.
     */
    @Override
    public void run() {
        while (!this.closed) {
            try {
                BaseWorkerCmd command;
                // Poll under cmdLock so a rollback callback cannot re-fill the queue mid-poll.
                synchronized (this.cmdLock) {
                    command = this.jobMaster.getRuntimeContext().foCmds.poll(1L, TimeUnit.SECONDS);
                }
                // instanceof is false for null, so this also covers an empty poll.
                if (!(command instanceof WorkerRollbackRequest)) {
                    continue;
                }
                // Track the command as unfinished until its rollback callback removes it.
                this.jobMaster.getRuntimeContext().unfinishedFoCmds.add(command);
                this.dealWithRollbackRequest((WorkerRollbackRequest) command);
            } catch (Throwable e) {
                LOG.error("Fo coordinator occur err.", e);
            }
        }
        LOG.warn("Fo coordinator thread exit.");
    }

    /**
     * Checks whether a command from the same actor is already waiting in the failover queue.
     *
     * <p>Best effort: if the check itself fails, the failure is logged and the request is treated
     * as not duplicated.
     *
     * @param request the incoming rollback request
     * @return {@code true} if a queued command has the same {@code fromActorId}
     */
    private boolean isDuplicateRequest(WorkerRollbackRequest request) {
        try {
            // Iterate over a snapshot of the queue contents.
            for (Object queued : this.runtimeContext.foCmds.toArray()) {
                if (request.fromActorId.equals(((BaseWorkerCmd) queued).fromActorId)) {
                    return true;
                }
            }
        } catch (Exception e) {
            LOG.warn("Check request is duplicated failed.", e);
        }
        return false;
    }

    /**
     * Enqueues a worker rollback request and saves the job master context.
     *
     * @param request the rollback request
     * @return {@code true} if the request was queued or an equivalent request is already queued;
     *     {@code false} if the command queue rejected it
     */
    public Boolean requestJobWorkerRollback(WorkerRollbackRequest request) {
        LOG.info("Request job worker rollback {}.", request);
        if (this.isDuplicateRequest(request)) {
            LOG.warn("Skip duplicated worker rollback request, {}.", request.toString());
            return true;
        }
        boolean accepted = this.runtimeContext.foCmds.offer(request);
        this.jobMaster.saveContext();
        if (!accepted) {
            LOG.warn("Request job worker rollback failed, because command queue is full.");
        }
        return accepted;
    }

    /**
     * Handles one rollback request taken from the queue.
     *
     * <p>Updates the vertex pid when the request carries a non-default one, skips vertices that
     * are already rolling back, and then either rolls back immediately (forced requests) or first
     * asks the worker asynchronously whether a rollback is needed.
     *
     * @param rollbackRequest the request to handle
     */
    private void dealWithRollbackRequest(WorkerRollbackRequest rollbackRequest) {
        LOG.info("Start deal with rollback request {}.", rollbackRequest);
        ExecutionVertex exeVertex = this.getExeVertexFromRequest(rollbackRequest);

        String pid = rollbackRequest.getPid();
        if (pid != null && !pid.equals(WorkerRollbackRequest.DEFAULT_PID)) {
            exeVertex.setPid(pid);
        }

        if (this.isRollbacking.get(exeVertex)) {
            LOG.info("Vertex {} is rollbacking, skip rollback again.", exeVertex);
            return;
        }

        if (rollbackRequest.isForcedRollback) {
            this.interruptCheckpointAndRollback(rollbackRequest);
        } else {
            this.asyncRemoteCaller.checkIfNeedRollbackAsync(
                exeVertex.getWorkerActor(),
                res -> {
                    if (!res) {
                        LOG.info("Vertex {} doesn't need to rollback, skip it.", exeVertex);
                        return;
                    }
                    this.interruptCheckpointAndRollback(rollbackRequest);
                },
                throwable ->
                    LOG.error(
                        "Exception when calling checkIfNeedRollbackAsync, maybe vertex is dead, ignore this request, vertex={}.",
                        exeVertex,
                        throwable));
        }
        LOG.info("Deal with rollback request {} success.", rollbackRequest);
    }

    /**
     * Assigns a cascading group id to the request if it has none, starts the rollback to the last
     * valid checkpoint, and offers an {@link InterruptCheckpointRequest} to the checkpoint
     * command queue.
     *
     * @param rollbackRequest the request to execute
     */
    private void interruptCheckpointAndRollback(WorkerRollbackRequest rollbackRequest) {
        if (rollbackRequest.cascadingGroupId == null) {
            rollbackRequest.cascadingGroupId = this.currentCascadingGroupId++;
        }
        // Use the id stored on the request, not the (already advanced) counter: the counter holds
        // the NEXT id to hand out, and cascaded requests carry the id of their originating group.
        this.rollback(
            this.jobMaster.getRuntimeContext().getLastValidCheckpointId(),
            rollbackRequest,
            rollbackRequest.cascadingGroupId);
        this.jobMaster.getRuntimeContext().cpCmds.offer(new InterruptCheckpointRequest());
    }

    /**
     * Asynchronously asks the request's vertex to roll back to {@code checkpointId}.
     *
     * <p>On a result, follow-up requests (cascaded upstream rollbacks on success, a retry of the
     * same vertex on failure) are queued, the request is removed from the unfinished set and the
     * context is saved. The vertex is marked as rolling back for the duration of the call.
     *
     * @param checkpointId checkpoint to roll back to
     * @param rollbackRequest the request being executed
     * @param cascadingGroupId group id used for logging and for cascaded requests
     */
    private void rollback(
            long checkpointId, WorkerRollbackRequest rollbackRequest, long cascadingGroupId) {
        ExecutionVertex exeVertex = this.getExeVertexFromRequest(rollbackRequest);
        LOG.info(
            "Call vertex {} to rollback, checkpoint id is {}, cascadingGroupId={}.",
            exeVertex,
            checkpointId,
            cascadingGroupId);
        this.isRollbacking.put(exeVertex, true);

        this.asyncRemoteCaller.rollback(
            exeVertex.getWorkerActor(),
            checkpointId,
            result -> {
                List<WorkerRollbackRequest> newRollbackRequests = new ArrayList<WorkerRollbackRequest>();
                switch (result.getResultEnum()) {
                    case SUCCESS: {
                        ChannelRecoverInfo recoverInfo = (ChannelRecoverInfo) result.getResultObj();
                        LOG.info(
                            "Vertex {} rollback done, dataLostQueues={}, msg={}, cascadingGroupId={}.",
                            exeVertex,
                            recoverInfo.getDataLostQueues(),
                            result.getResultMsg(),
                            cascadingGroupId);
                        newRollbackRequests =
                            this.cascadeUpstreamActors(
                                recoverInfo.getDataLostQueues(), exeVertex, cascadingGroupId);
                        break;
                    }
                    case SKIPPED: {
                        LOG.info(
                            "Vertex skip rollback, result = {}, cascadingGroupId={}.",
                            result,
                            cascadingGroupId);
                        break;
                    }
                    default: {
                        LOG.error(
                            "Rollback vertex {} failed, result={}, cascadingGroupId={}, rollback this worker again after {} ms.",
                            exeVertex,
                            result,
                            cascadingGroupId,
                            ROLLBACK_RETRY_TIME_MS);
                        // Back off before re-queueing the same vertex.
                        Thread.sleep(ROLLBACK_RETRY_TIME_MS);
                        LOG.info(
                            "Add rollback request for {} again, cascadingGroupId={}.",
                            exeVertex,
                            cascadingGroupId);
                        newRollbackRequests.add(
                            new WorkerRollbackRequest(
                                exeVertex, "", "Rollback failed, try again.", false));
                    }
                }
                // Same lock as the poll in run(): queue update, unfinished-set update and the
                // context save happen together with respect to the polling thread.
                synchronized (this.cmdLock) {
                    this.jobMaster.getRuntimeContext().foCmds.addAll(newRollbackRequests);
                    this.jobMaster.getRuntimeContext().unfinishedFoCmds.remove(rollbackRequest);
                    this.jobMaster.saveContext();
                }
                this.isRollbacking.put(exeVertex, false);
            },
            throwable -> {
                LOG.error(
                    "Exception when calling vertex to rollback, vertex={}.", exeVertex, throwable);
                this.isRollbacking.put(exeVertex, false);
            });
        LOG.info("Finish rollback vertex {}, checkpoint id is {}.", exeVertex, checkpointId);
    }

    /**
     * Builds forced rollback requests for the peer actors of the given data-lost queues.
     *
     * <p>Peers that are currently marked as rolling back are skipped.
     *
     * @param dataLostQueues queue names reported as having lost data
     * @param fromVertex the vertex whose rollback reported the lost queues
     * @param cascadingGroupId group id copied onto every created request
     * @return the created requests, possibly empty
     */
    private List<WorkerRollbackRequest> cascadeUpstreamActors(
            Set<String> dataLostQueues, ExecutionVertex fromVertex, long cascadingGroupId) {
        List<WorkerRollbackRequest> cascadedRollbackRequest = new ArrayList<WorkerRollbackRequest>();
        for (String queue : dataLostQueues) {
            BaseActorHandle upstreamActor =
                this.graphManager.getExecutionGraph().getPeerActor(fromVertex.getWorkerActor(), queue);
            ExecutionVertex upstreamExeVertex = this.getExecutionVertex(upstreamActor);
            if (this.isRollbacking.get(upstreamExeVertex)) {
                continue;
            }
            LOG.info(
                "Call upstream vertex {} of vertex {} to rollback, cascadingGroupId={}.",
                upstreamExeVertex,
                fromVertex,
                cascadingGroupId);

            // Hostname stays empty when the vertex's container is not registered.
            Optional<Container> container =
                ResourceUtil.getContainerById(
                    this.jobMaster.getResourceManager().getRegisteredContainers(),
                    upstreamExeVertex.getContainerId());
            String hostname = container.isPresent() ? container.get().getHostname() : "";

            WorkerRollbackRequest upstreamRequest =
                new WorkerRollbackRequest(
                    upstreamExeVertex,
                    hostname,
                    String.format("Cascading rollback from %s", fromVertex),
                    true);
            upstreamRequest.cascadingGroupId = cascadingGroupId;
            cascadedRollbackRequest.add(upstreamRequest);
        }
        return cascadedRollbackRequest;
    }

    /**
     * Resolves the execution vertex of the actor that issued the request.
     *
     * @param rollbackRequest the request whose {@code fromActorId} is looked up
     * @return the execution vertex registered for that actor id
     * @throws RuntimeException if the execution graph has no actor with that id
     */
    private ExecutionVertex getExeVertexFromRequest(WorkerRollbackRequest rollbackRequest) {
        ActorId actorId = rollbackRequest.fromActorId;
        Optional<BaseActorHandle> rayActor =
            this.graphManager.getExecutionGraph().getActorById(actorId);
        if (!rayActor.isPresent()) {
            throw new RuntimeException("Can not find ray actor of ID " + actorId);
        }
        return this.getExecutionVertex(actorId);
    }

    /** Looks up the execution vertex for the given actor handle via its actor id. */
    private ExecutionVertex getExecutionVertex(BaseActorHandle actor) {
        return this.graphManager.getExecutionGraph().getExecutionVertexByActorId(actor.getId());
    }

    /** Looks up the execution vertex for the given actor id. */
    private ExecutionVertex getExecutionVertex(ActorId actorId) {
        return this.graphManager.getExecutionGraph().getExecutionVertexByActorId(actorId);
    }
}

