/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.worker;

import io.ray.streaming.runtime.config.StreamingWorkerConfig;
import io.ray.streaming.runtime.config.types.TransferChannelType;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.core.processor.OneInputProcessor;
import io.ray.streaming.runtime.core.processor.ProcessBuilder;
import io.ray.streaming.runtime.core.processor.SourceProcessor;
import io.ray.streaming.runtime.core.processor.StreamProcessor;
import io.ray.streaming.runtime.transfer.TransferHandler;
import io.ray.streaming.runtime.util.EnvUtil;
import io.ray.streaming.runtime.worker.context.JobWorkerContext;
import io.ray.streaming.runtime.worker.tasks.OneInputStreamTask;
import io.ray.streaming.runtime.worker.tasks.SourceStreamTask;
import io.ray.streaming.runtime.worker.tasks.StreamTask;
import java.io.Serializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobWorker
implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(JobWorker.class);
    private static final byte[] NOT_READY_FLAG = new byte[4];
    private JobWorkerContext workerContext;
    private ExecutionVertex executionVertex;
    private StreamingWorkerConfig workerConfig;
    private StreamTask task;
    private TransferHandler transferHandler;

    public JobWorker() {
        LOG.info("Creating job worker succeeded.");
    }

    public Boolean init(JobWorkerContext workerContext) {
        LOG.info("Initiating job worker: {}. Worker context is: {}.", (Object)workerContext.getWorkerName(), (Object)workerContext);
        try {
            this.workerContext = workerContext;
            this.executionVertex = workerContext.getExecutionVertex();
            this.workerConfig = new StreamingWorkerConfig(this.executionVertex.getWorkerConfig());
            TransferChannelType channelType = this.workerConfig.transferConfig.channelType();
            if (TransferChannelType.NATIVE_CHANNEL == channelType) {
                this.transferHandler = new TransferHandler();
            }
            this.task = this.createStreamTask();
            if (this.task == null) {
                return false;
            }
        }
        catch (Exception e) {
            LOG.error("Failed to initiate job worker.", (Throwable)e);
            return false;
        }
        LOG.info("Initiating job worker succeeded: {}.", (Object)workerContext.getWorkerName());
        return true;
    }

    public Boolean start() {
        try {
            this.task.start();
        }
        catch (Exception e) {
            LOG.error("Start worker [{}] occur error.", (Object)this.executionVertex.getExecutionVertexName(), (Object)e);
            return false;
        }
        return true;
    }

    private StreamTask createStreamTask() {
        StreamTask task;
        block4: {
            task = null;
            StreamProcessor streamProcessor = ProcessBuilder.buildProcessor(this.executionVertex.getStreamOperator());
            LOG.debug("Stream processor created: {}.", (Object)streamProcessor);
            try {
                if (streamProcessor instanceof SourceProcessor) {
                    task = new SourceStreamTask(this.getTaskId(), streamProcessor, this);
                    break block4;
                }
                if (streamProcessor instanceof OneInputProcessor) {
                    task = new OneInputStreamTask(this.getTaskId(), streamProcessor, this);
                    break block4;
                }
                throw new RuntimeException("Unsupported processor type:" + streamProcessor);
            }
            catch (Exception e) {
                LOG.info("Failed to create stream task.", (Throwable)e);
                return task;
            }
        }
        LOG.info("Stream task created: {}.", (Object)task);
        return task;
    }

    public int getTaskId() {
        return this.executionVertex.getExecutionVertexId();
    }

    public StreamingWorkerConfig getWorkerConfig() {
        return this.workerConfig;
    }

    public JobWorkerContext getWorkerContext() {
        return this.workerContext;
    }

    public ExecutionVertex getExecutionVertex() {
        return this.executionVertex;
    }

    public StreamTask getTask() {
        return this.task;
    }

    public void onReaderMessage(byte[] buffer) {
        this.transferHandler.onReaderMessage(buffer);
    }

    public byte[] onReaderMessageSync(byte[] buffer) {
        if (this.transferHandler == null) {
            return NOT_READY_FLAG;
        }
        return this.transferHandler.onReaderMessageSync(buffer);
    }

    public void onWriterMessage(byte[] buffer) {
        this.transferHandler.onWriterMessage(buffer);
    }

    public byte[] onWriterMessageSync(byte[] buffer) {
        if (this.transferHandler == null) {
            return NOT_READY_FLAG;
        }
        return this.transferHandler.onWriterMessageSync(buffer);
    }

    static {
        EnvUtil.loadNativeLibraries();
    }
}

