/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.worker.tasks;

import io.ray.api.BaseActorHandle;
import io.ray.api.Ray;
import io.ray.streaming.api.collector.Collector;
import io.ray.streaming.api.partition.Partition;
import io.ray.streaming.runtime.core.collector.OutputCollector;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionEdge;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.core.processor.Processor;
import io.ray.streaming.runtime.transfer.ChannelId;
import io.ray.streaming.runtime.transfer.DataReader;
import io.ray.streaming.runtime.transfer.DataWriter;
import io.ray.streaming.runtime.worker.JobWorker;
import io.ray.streaming.runtime.worker.context.StreamingRuntimeContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class StreamTask
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
    protected int taskId;
    protected Processor processor;
    protected JobWorker jobWorker;
    protected DataReader reader;
    List<Collector> collectors = new ArrayList<Collector>();
    protected volatile boolean running = true;
    protected volatile boolean stopped = false;
    private Thread thread;

    protected StreamTask(int taskId, Processor processor, JobWorker jobWorker) {
        this.taskId = taskId;
        this.processor = processor;
        this.jobWorker = jobWorker;
        this.prepareTask();
        this.thread = new Thread(Ray.wrapRunnable((Runnable)this), this.getClass().getName() + "-" + System.currentTimeMillis());
        this.thread.setDaemon(true);
    }

    private void prepareTask() {
        LOG.debug("Preparing stream task.");
        ExecutionVertex executionVertex = this.jobWorker.getExecutionVertex();
        this.jobWorker.getWorkerConfig().workerInternalConfig.setProperty("streaming.worker_name", executionVertex.getExecutionVertexName());
        this.jobWorker.getWorkerConfig().workerInternalConfig.setProperty("streaming.op_name", executionVertex.getExecutionJobVertexName());
        List<ExecutionEdge> outputEdges = executionVertex.getOutputEdges();
        ArrayList<String> outputChannelIds = new ArrayList<String>();
        ArrayList<BaseActorHandle> targetActors = new ArrayList<BaseActorHandle>();
        for (ExecutionEdge edge : outputEdges) {
            String channelId = ChannelId.genIdStr(this.taskId, edge.getTargetExecutionVertex().getExecutionVertexId(), executionVertex.getBuildTime());
            outputChannelIds.add(channelId);
            targetActors.add(edge.getTargetExecutionVertex().getWorkerActor());
        }
        if (!targetActors.isEmpty()) {
            DataWriter writer = new DataWriter(outputChannelIds, targetActors, this.jobWorker.getWorkerConfig());
            HashMap opGroupedChannelId = new HashMap();
            HashMap opGroupedActor = new HashMap();
            HashMap opPartitionMap = new HashMap();
            for (int i = 0; i < outputEdges.size(); ++i) {
                ExecutionEdge edge = outputEdges.get(i);
                String opName2 = edge.getTargetExecutionJobVertexName();
                if (!opPartitionMap.containsKey(opName2)) {
                    opGroupedChannelId.put(opName2, new ArrayList());
                    opGroupedActor.put(opName2, new ArrayList());
                }
                ((List)opGroupedChannelId.get(opName2)).add(outputChannelIds.get(i));
                ((List)opGroupedActor.get(opName2)).add(targetActors.get(i));
                opPartitionMap.put(opName2, edge.getPartition());
            }
            opPartitionMap.keySet().forEach(opName -> this.collectors.add(new OutputCollector(writer, (Collection)opGroupedChannelId.get(opName), (Collection)opGroupedActor.get(opName), (Partition)opPartitionMap.get(opName))));
        }
        List<ExecutionEdge> inputEdges = executionVertex.getInputEdges();
        ArrayList<String> inputChannelIds = new ArrayList<String>();
        ArrayList<BaseActorHandle> inputActors = new ArrayList<BaseActorHandle>();
        for (ExecutionEdge edge : inputEdges) {
            String queueName = ChannelId.genIdStr(edge.getSourceExecutionVertex().getExecutionVertexId(), this.taskId, executionVertex.getBuildTime());
            inputChannelIds.add(queueName);
            inputActors.add(edge.getSourceExecutionVertex().getWorkerActor());
        }
        if (!inputActors.isEmpty()) {
            LOG.info("Register queue consumer, channels {}.", inputChannelIds);
            this.reader = new DataReader(inputChannelIds, inputActors, this.jobWorker.getWorkerConfig());
        }
        StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(executionVertex, this.jobWorker.getWorkerConfig().configMap, executionVertex.getParallelism());
        this.processor.open(this.collectors, runtimeContext);
        LOG.debug("Finished preparing stream task.");
    }

    protected abstract void init() throws Exception;

    protected abstract void cancelTask() throws Exception;

    public void start() {
        LOG.info("Start stream task: {}-{}", (Object)this.getClass().getSimpleName(), (Object)this.taskId);
        this.thread.start();
    }

    public void close() {
        this.running = false;
        if (this.thread.isAlive() && !Ray.getRuntimeContext().isSingleProcess()) {
            Runtime.getRuntime().halt(0);
            LOG.warn("runtime halt 0");
            System.exit(0);
        }
        LOG.info("Stream task close success.");
    }
}

