/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.worker;

import io.ray.api.Ray;
import io.ray.streaming.runtime.config.StreamingWorkerConfig;
import io.ray.streaming.runtime.config.types.TransferChannelType;
import io.ray.streaming.runtime.context.ContextBackend;
import io.ray.streaming.runtime.context.ContextBackendFactory;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.core.processor.OneInputProcessor;
import io.ray.streaming.runtime.core.processor.ProcessBuilder;
import io.ray.streaming.runtime.core.processor.SourceProcessor;
import io.ray.streaming.runtime.core.processor.StreamProcessor;
import io.ray.streaming.runtime.master.coordinator.command.WorkerRollbackRequest;
import io.ray.streaming.runtime.message.CallResult;
import io.ray.streaming.runtime.rpc.RemoteCallMaster;
import io.ray.streaming.runtime.transfer.TransferHandler;
import io.ray.streaming.runtime.transfer.channel.ChannelRecoverInfo;
import io.ray.streaming.runtime.util.CheckpointStateUtil;
import io.ray.streaming.runtime.util.EnvUtil;
import io.ray.streaming.runtime.util.Serializer;
import io.ray.streaming.runtime.worker.context.JobWorkerContext;
import io.ray.streaming.runtime.worker.tasks.OneInputStreamTask;
import io.ray.streaming.runtime.worker.tasks.SourceStreamTask;
import io.ray.streaming.runtime.worker.tasks.StreamTask;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobWorker
implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(JobWorker.class);
    private static final byte[] NOT_READY_FLAG = new byte[4];
    public final Object initialStateChangeLock = new Object();
    public AtomicBoolean isRecreate = new AtomicBoolean(false);
    public ContextBackend contextBackend;
    private JobWorkerContext workerContext;
    private ExecutionVertex executionVertex;
    private StreamingWorkerConfig workerConfig;
    private StreamTask task;
    private TransferHandler transferHandler;
    private boolean isNeedRollback = false;
    private int rollbackCount = 0;

    public JobWorker(ExecutionVertex executionVertex) {
        LOG.info("Creating job worker.");
        this.executionVertex = executionVertex;
        this.workerConfig = new StreamingWorkerConfig(executionVertex.getWorkerConfig());
        this.contextBackend = ContextBackendFactory.getContextBackend(this.workerConfig);
        LOG.info("Ray.getRuntimeContext().wasCurrentActorRestarted()={}", (Object)Ray.getRuntimeContext().wasCurrentActorRestarted());
        if (!Ray.getRuntimeContext().wasCurrentActorRestarted()) {
            this.saveContext();
            LOG.info("Job worker is fresh started, init success.");
            return;
        }
        LOG.info("Begin load job worker checkpoint state.");
        byte[] bytes = CheckpointStateUtil.get(this.contextBackend, this.getJobWorkerContextKey());
        if (bytes != null) {
            JobWorkerContext context = (JobWorkerContext)Serializer.decode(bytes);
            LOG.info("Worker recover from checkpoint state, byte len={}, context={}.", (Object)bytes.length, (Object)context);
            this.init(context);
            this.requestRollback("LoadCheckpoint request rollback in new actor.");
        } else {
            LOG.error("Worker is reconstructed, but can't load checkpoint. Check whether you checkpoint state is reliable. Current checkpoint state is {}.", (Object)this.contextBackend.getClass().getName());
        }
    }

    public synchronized void saveContext() {
        byte[] contextBytes = Serializer.encode(this.workerContext);
        String key = this.getJobWorkerContextKey();
        LOG.info("Saving context, worker context={}, serialized byte length={}, key={}.", new Object[]{this.workerContext, contextBytes.length, key});
        CheckpointStateUtil.put(this.contextBackend, key, contextBytes);
    }

    public Boolean init(JobWorkerContext workerContext) {
        LOG.info("Initiating job worker: {}. Worker context is: {}, pid={}.", new Object[]{workerContext.getWorkerName(), workerContext, EnvUtil.getJvmPid()});
        this.workerContext = workerContext;
        this.executionVertex = workerContext.getExecutionVertex();
        this.workerConfig = new StreamingWorkerConfig(this.executionVertex.getWorkerConfig());
        this.contextBackend = ContextBackendFactory.getContextBackend(this.workerConfig);
        LOG.info("Initiating job worker succeeded: {}.", (Object)workerContext.getWorkerName());
        this.saveContext();
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CallResult<ChannelRecoverInfo> rollback(Long checkpointId, Long startRollbackTs) {
        Object object = this.initialStateChangeLock;
        synchronized (object) {
            if (this.task != null && this.task.isAlive() && checkpointId == this.task.lastCheckpointId && this.task.isInitialState) {
                return CallResult.skipped("Task is already in initial state, skip this rollback.");
            }
        }
        long remoteCallCost = System.currentTimeMillis() - startRollbackTs;
        LOG.info("Start rollback[{}], checkpoint is {}, remote call cost {}ms.", new Object[]{this.executionVertex.getExecutionJobVertexName(), checkpointId, remoteCallCost});
        ++this.rollbackCount;
        if (this.rollbackCount > 1) {
            this.isRecreate.set(true);
        }
        try {
            TransferChannelType channelType = this.workerConfig.transferConfig.channelType();
            if (TransferChannelType.NATIVE_CHANNEL == channelType) {
                this.transferHandler = new TransferHandler();
            }
            if (this.task != null) {
                this.task.close();
                this.task = null;
            }
            this.task = this.createStreamTask(checkpointId);
            ChannelRecoverInfo channelRecoverInfo = this.task.recover(this.isRecreate.get());
            this.isNeedRollback = false;
            LOG.info("Rollback job worker success, checkpoint is {}, channelRecoverInfo is {}.", (Object)checkpointId, (Object)channelRecoverInfo);
            return CallResult.success(channelRecoverInfo);
        }
        catch (Exception e) {
            LOG.error("Rollback job worker has exception.", (Throwable)e);
            return CallResult.fail(ExceptionUtils.getStackTrace((Throwable)e));
        }
    }

    private StreamTask createStreamTask(long checkpointId) {
        StreamTask task;
        StreamProcessor streamProcessor = ProcessBuilder.buildProcessor(this.executionVertex.getStreamOperator());
        LOG.debug("Stream processor created: {}.", (Object)streamProcessor);
        if (streamProcessor instanceof SourceProcessor) {
            task = new SourceStreamTask(streamProcessor, this, checkpointId);
        } else if (streamProcessor instanceof OneInputProcessor) {
            task = new OneInputStreamTask(streamProcessor, this, checkpointId);
        } else {
            throw new RuntimeException("Unsupported processor type:" + streamProcessor);
        }
        LOG.info("Stream task created: {}.", (Object)task);
        return task;
    }

    public Boolean triggerCheckpoint(Long barrierId) {
        LOG.info("Receive trigger, barrierId is {}.", (Object)barrierId);
        if (this.task != null) {
            return this.task.triggerCheckpoint(barrierId);
        }
        return false;
    }

    public Boolean notifyCheckpointTimeout(Long checkpointId) {
        LOG.info("Notify checkpoint timeout, checkpoint id is {}.", (Object)checkpointId);
        if (this.task != null) {
            this.task.notifyCheckpointTimeout(checkpointId);
        }
        return true;
    }

    public Boolean clearExpiredCheckpoint(Long expiredStateCpId, Long expiredQueueCpId) {
        LOG.info("Clear expired checkpoint state, checkpoint id is {}; Clear expired queue msg, checkpoint id is {}", (Object)expiredStateCpId, (Object)expiredQueueCpId);
        if (this.task != null) {
            if (expiredStateCpId > 0L) {
                this.task.clearExpiredCpState(expiredStateCpId);
            }
            this.task.clearExpiredQueueMsg(expiredQueueCpId);
        }
        return true;
    }

    public void requestRollback(String exceptionMsg) {
        LOG.info("Request rollback.");
        this.isNeedRollback = true;
        this.isRecreate.set(true);
        boolean requestRet = RemoteCallMaster.requestJobWorkerRollback(this.workerContext.getMaster(), new WorkerRollbackRequest(this.workerContext.getWorkerActorId(), exceptionMsg, EnvUtil.getHostName(), EnvUtil.getJvmPid()));
        if (!requestRet) {
            LOG.warn("Job worker request rollback failed! exceptionMsg={}.", (Object)exceptionMsg);
        }
    }

    public Boolean checkIfNeedRollback(Long startCallTs) {
        long remoteCallCost = System.currentTimeMillis() - startCallTs;
        LOG.info("Finished checking if need to rollback with result: {}, rpc delay={}ms.", (Object)this.isNeedRollback, (Object)remoteCallCost);
        return this.isNeedRollback;
    }

    public StreamingWorkerConfig getWorkerConfig() {
        return this.workerConfig;
    }

    public JobWorkerContext getWorkerContext() {
        return this.workerContext;
    }

    public ExecutionVertex getExecutionVertex() {
        return this.executionVertex;
    }

    public StreamTask getTask() {
        return this.task;
    }

    private String getJobWorkerContextKey() {
        return this.workerConfig.checkpointConfig.jobWorkerContextCpPrefixKey() + this.workerConfig.commonConfig.jobName() + "_" + this.executionVertex.getExecutionVertexId();
    }

    public void onReaderMessage(byte[] buffer) {
        if (this.transferHandler != null) {
            this.transferHandler.onReaderMessage(buffer);
        }
    }

    public byte[] onReaderMessageSync(byte[] buffer) {
        if (this.transferHandler == null) {
            return NOT_READY_FLAG;
        }
        return this.transferHandler.onReaderMessageSync(buffer);
    }

    public void onWriterMessage(byte[] buffer) {
        if (this.transferHandler != null) {
            this.transferHandler.onWriterMessage(buffer);
        }
    }

    public byte[] onWriterMessageSync(byte[] buffer) {
        if (this.transferHandler == null) {
            return NOT_READY_FLAG;
        }
        return this.transferHandler.onWriterMessageSync(buffer);
    }

    static {
        EnvUtil.loadNativeLibraries();
    }
}

