/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.worker.tasks;

import io.ray.api.Ray;
import io.ray.streaming.api.collector.Collector;
import io.ray.streaming.api.partition.Partition;
import io.ray.streaming.runtime.context.ContextBackend;
import io.ray.streaming.runtime.context.OperatorCheckpointInfo;
import io.ray.streaming.runtime.core.collector.OutputCollector;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionEdge;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.core.processor.Processor;
import io.ray.streaming.runtime.generated.RemoteCall;
import io.ray.streaming.runtime.master.coordinator.command.WorkerCommitReport;
import io.ray.streaming.runtime.rpc.RemoteCallMaster;
import io.ray.streaming.runtime.transfer.DataReader;
import io.ray.streaming.runtime.transfer.DataWriter;
import io.ray.streaming.runtime.transfer.channel.ChannelRecoverInfo;
import io.ray.streaming.runtime.transfer.channel.OffsetInfo;
import io.ray.streaming.runtime.util.CheckpointStateUtil;
import io.ray.streaming.runtime.util.Serializer;
import io.ray.streaming.runtime.worker.JobWorker;
import io.ray.streaming.runtime.worker.context.JobWorkerContext;
import io.ray.streaming.runtime.worker.context.StreamingRuntimeContext;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class StreamTask
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
    private final ContextBackend checkpointState;
    public volatile boolean isInitialState = true;
    public long lastCheckpointId;
    protected Processor processor;
    protected JobWorker jobWorker;
    protected DataReader reader;
    protected DataWriter writer;
    protected volatile boolean running = true;
    protected volatile boolean stopped = false;
    List<Collector> collectors = new ArrayList<Collector>();
    private Set<Long> outdatedCheckpoints = new HashSet<Long>();
    private Thread thread;

    protected StreamTask(Processor processor, JobWorker jobWorker, long lastCheckpointId) {
        this.processor = processor;
        this.jobWorker = jobWorker;
        this.checkpointState = jobWorker.contextBackend;
        this.lastCheckpointId = lastCheckpointId;
        this.thread = new Thread(Ray.wrapRunnable((Runnable)this), this.getClass().getName() + "-" + System.currentTimeMillis());
        this.thread.setDaemon(true);
    }

    public ChannelRecoverInfo recover(boolean isRecover) {
        if (isRecover) {
            LOG.info("Stream task begin recover.");
        } else {
            LOG.info("Stream task first start begin.");
        }
        this.prepareTask(isRecover);
        ChannelRecoverInfo recoverInfo = new ChannelRecoverInfo(new HashMap<String, ChannelRecoverInfo.ChannelCreationStatus>());
        if (this.reader != null) {
            recoverInfo = this.reader.getQueueRecoverInfo();
        }
        this.thread.setUncaughtExceptionHandler((t, e) -> LOG.error("Uncaught exception in runner thread.", e));
        LOG.info("Start stream task: {}.", (Object)this.getClass().getSimpleName());
        this.thread.start();
        if (isRecover) {
            LOG.info("Stream task recover end.");
        } else {
            LOG.info("Stream task first start finished.");
        }
        return recoverInfo;
    }

    private void prepareTask(boolean isRecreate) {
        LOG.info("Preparing stream task, isRecreate={}.", (Object)isRecreate);
        ExecutionVertex executionVertex = this.jobWorker.getExecutionVertex();
        this.jobWorker.getWorkerConfig().workerInternalConfig.setProperty("streaming.worker_name", executionVertex.getExecutionVertexName());
        this.jobWorker.getWorkerConfig().workerInternalConfig.setProperty("streaming.op_name", executionVertex.getExecutionJobVertexName());
        OperatorCheckpointInfo operatorCheckpointInfo = new OperatorCheckpointInfo();
        byte[] bytes = null;
        if (isRecreate) {
            String cpKey = this.genOpCheckpointKey(this.lastCheckpointId);
            LOG.info("Getting task checkpoints from state, cpKey={}, checkpointId={}.", (Object)cpKey, (Object)this.lastCheckpointId);
            bytes = CheckpointStateUtil.get(this.checkpointState, cpKey);
            if (bytes == null) {
                String msg = String.format("Task recover failed, checkpoint is null! cpKey=%s", cpKey);
                throw new RuntimeException(msg);
            }
        }
        if (bytes != null) {
            operatorCheckpointInfo = (OperatorCheckpointInfo)Serializer.decode(bytes);
            this.processor.loadCheckpoint(operatorCheckpointInfo.processorCheckpoint);
            LOG.info("Stream task recover from checkpoint state, checkpoint bytes len={}, checkpointInfo={}.", (Object)bytes.length, (Object)operatorCheckpointInfo);
        }
        if (!executionVertex.getOutputEdges().isEmpty()) {
            LOG.info("Register queue writer, channels={}, outputCheckpoints={}.", executionVertex.getOutputChannelIdList(), operatorCheckpointInfo.outputPoints);
            this.writer = new DataWriter(executionVertex.getOutputChannelIdList(), executionVertex.getOutputActorList(), operatorCheckpointInfo.outputPoints, this.jobWorker.getWorkerConfig());
        }
        if (!executionVertex.getInputEdges().isEmpty()) {
            LOG.info("Register queue reader, channels={}, inputCheckpoints={}.", executionVertex.getInputChannelIdList(), operatorCheckpointInfo.inputPoints);
            this.reader = new DataReader(executionVertex.getInputChannelIdList(), executionVertex.getInputActorList(), operatorCheckpointInfo.inputPoints, this.jobWorker.getWorkerConfig());
        }
        this.openProcessor();
        LOG.debug("Finished preparing stream task.");
    }

    private void openProcessor() {
        ExecutionVertex executionVertex = this.jobWorker.getExecutionVertex();
        List<ExecutionEdge> outputEdges = executionVertex.getOutputEdges();
        HashMap opGroupedChannelId = new HashMap();
        HashMap opGroupedActor = new HashMap();
        HashMap<String, Partition> opPartitionMap = new HashMap<String, Partition>();
        for (int i = 0; i < outputEdges.size(); ++i) {
            ExecutionEdge edge = outputEdges.get(i);
            String opName2 = edge.getTargetExecutionJobVertexName();
            if (!opPartitionMap.containsKey(opName2)) {
                opGroupedChannelId.put(opName2, new ArrayList());
                opGroupedActor.put(opName2, new ArrayList());
            }
            ((List)opGroupedChannelId.get(opName2)).add(executionVertex.getOutputChannelIdList().get(i));
            ((List)opGroupedActor.get(opName2)).add(executionVertex.getOutputActorList().get(i));
            opPartitionMap.put(opName2, edge.getPartition());
        }
        opPartitionMap.keySet().forEach(opName -> this.collectors.add(new OutputCollector(this.writer, (Collection)opGroupedChannelId.get(opName), (Collection)opGroupedActor.get(opName), (Partition)opPartitionMap.get(opName))));
        StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(executionVertex, this.jobWorker.getWorkerConfig().configMap, executionVertex.getParallelism());
        this.processor.open(this.collectors, runtimeContext);
    }

    protected abstract void init() throws Exception;

    public void close() {
        this.running = false;
        if (this.thread.isAlive() && !Ray.getRuntimeContext().isSingleProcess()) {
            Runtime.getRuntime().halt(0);
            LOG.warn("runtime halt 0");
            System.exit(0);
        }
        LOG.info("Stream task close success.");
    }

    public boolean triggerCheckpoint(Long barrierId) {
        throw new UnsupportedOperationException("Only source operator supports trigger checkpoints.");
    }

    public void doCheckpoint(long checkpointId, Map<String, OffsetInfo> inputPoints) {
        Map<String, OffsetInfo> outputPoints = null;
        if (this.writer != null) {
            outputPoints = this.writer.getOutputCheckpoints();
            RemoteCall.Barrier barrierPb = RemoteCall.Barrier.newBuilder().setId(checkpointId).build();
            ByteBuffer byteBuffer = ByteBuffer.wrap(barrierPb.toByteArray());
            byteBuffer.order(ByteOrder.nativeOrder());
            this.writer.broadcastBarrier(checkpointId, byteBuffer);
        }
        LOG.info("Start do checkpoint, cp id={}, inputPoints={}, outputPoints={}.", new Object[]{checkpointId, inputPoints, outputPoints});
        this.lastCheckpointId = checkpointId;
        Serializable processorCheckpoint = this.processor.saveCheckpoint();
        try {
            OperatorCheckpointInfo opCpInfo = new OperatorCheckpointInfo(inputPoints, outputPoints, processorCheckpoint, checkpointId);
            this.saveCpStateAndReport(opCpInfo, checkpointId);
        }
        catch (Exception e) {
            LOG.error("Processor or op checkpoint exception.", (Throwable)e);
        }
        LOG.info("Operator do checkpoint {} finish.", (Object)checkpointId);
    }

    private void saveCpStateAndReport(OperatorCheckpointInfo operatorCheckpointInfo, long checkpointId) {
        this.saveCp(operatorCheckpointInfo, checkpointId);
        this.reportCommit(checkpointId);
        LOG.info("Finish save cp state and report, checkpoint id is {}.", (Object)checkpointId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void saveCp(OperatorCheckpointInfo operatorCheckpointInfo, long checkpointId) {
        byte[] bytes = Serializer.encode(operatorCheckpointInfo);
        String cpKey = this.genOpCheckpointKey(checkpointId);
        LOG.info("Saving task checkpoint, cpKey={}, byte len={}, checkpointInfo={}.", new Object[]{cpKey, bytes.length, operatorCheckpointInfo});
        ContextBackend contextBackend = this.checkpointState;
        synchronized (contextBackend) {
            if (this.outdatedCheckpoints.contains(checkpointId)) {
                LOG.info("Outdated checkpoint, skip save checkpoint.");
                this.outdatedCheckpoints.remove(checkpointId);
            } else {
                CheckpointStateUtil.put(this.checkpointState, cpKey, bytes);
            }
        }
    }

    private void reportCommit(long checkpointId) {
        JobWorkerContext context = this.jobWorker.getWorkerContext();
        LOG.info("Report commit async, checkpoint id {}.", (Object)checkpointId);
        RemoteCallMaster.reportJobWorkerCommitAsync(context.getMaster(), new WorkerCommitReport(context.getWorkerActorId(), checkpointId));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notifyCheckpointTimeout(long checkpointId) {
        String cpKey = this.genOpCheckpointKey(checkpointId);
        try {
            ContextBackend contextBackend = this.checkpointState;
            synchronized (contextBackend) {
                if (this.checkpointState.exists(cpKey)) {
                    this.checkpointState.remove(cpKey);
                } else {
                    this.outdatedCheckpoints.add(checkpointId);
                }
            }
        }
        catch (Exception e) {
            LOG.error("Notify checkpoint timeout failed, checkpointId is {}.", (Object)checkpointId, (Object)e);
        }
    }

    public void clearExpiredCpState(long checkpointId) {
        String cpKey = this.genOpCheckpointKey(checkpointId);
        try {
            this.checkpointState.remove(cpKey);
        }
        catch (Exception e) {
            LOG.error("Failed to remove key {} from state backend.", (Object)cpKey, (Object)e);
        }
    }

    public void clearExpiredQueueMsg(long checkpointId) {
        byte[] bytes;
        String cpKey = this.genOpCheckpointKey(checkpointId);
        try {
            bytes = this.checkpointState.get(cpKey);
        }
        catch (Exception e) {
            LOG.error("Failed to get key {} from state backend.", (Object)cpKey, (Object)e);
            return;
        }
        if (bytes != null) {
            OperatorCheckpointInfo operatorCheckpointInfo = (OperatorCheckpointInfo)Serializer.decode(bytes);
            long cpId = operatorCheckpointInfo.checkpointId;
            if (this.writer != null) {
                this.writer.clearCheckpoint(cpId);
            }
        }
    }

    public String genOpCheckpointKey(long checkpointId) {
        JobWorkerContext context = this.jobWorker.getWorkerContext();
        return this.jobWorker.getWorkerConfig().checkpointConfig.jobWorkerOpCpPrefixKey() + context.getJobName() + "_" + context.getWorkerName() + "_" + checkpointId;
    }

    protected void requestRollback(String exceptionMsg) {
        this.jobWorker.requestRollback(exceptionMsg);
    }

    public boolean isAlive() {
        return this.thread.isAlive();
    }
}

