/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.worker.tasks;

import com.google.common.base.MoreObjects;
import io.ray.streaming.runtime.core.processor.Processor;
import io.ray.streaming.runtime.generated.RemoteCall;
import io.ray.streaming.runtime.serialization.CrossLangSerializer;
import io.ray.streaming.runtime.serialization.JavaSerializer;
import io.ray.streaming.runtime.serialization.Serializer;
import io.ray.streaming.runtime.transfer.channel.OffsetInfo;
import io.ray.streaming.runtime.transfer.exception.ChannelInterruptException;
import io.ray.streaming.runtime.transfer.message.BarrierMessage;
import io.ray.streaming.runtime.transfer.message.ChannelMessage;
import io.ray.streaming.runtime.transfer.message.DataMessage;
import io.ray.streaming.runtime.worker.JobWorker;
import io.ray.streaming.runtime.worker.tasks.StreamTask;
import java.util.Map;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class InputStreamTask
extends StreamTask {
    private static final Logger LOG = LoggerFactory.getLogger(InputStreamTask.class);
    private final Serializer javaSerializer;
    private final Serializer crossLangSerializer;
    private final long readTimeoutMillis;

    public InputStreamTask(Processor processor, JobWorker jobWorker, long lastCheckpointId) {
        super(processor, jobWorker, lastCheckpointId);
        this.readTimeoutMillis = jobWorker.getWorkerConfig().transferConfig.readerTimerIntervalMs();
        this.javaSerializer = new JavaSerializer();
        this.crossLangSerializer = new CrossLangSerializer();
    }

    @Override
    protected void init() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try {
            while (this.running) {
                ChannelMessage item;
                Object object = this.jobWorker.initialStateChangeLock;
                synchronized (object) {
                    item = this.reader.read(this.readTimeoutMillis);
                    if (item == null) {
                        continue;
                    }
                    this.isInitialState = false;
                }
                if (item instanceof DataMessage) {
                    DataMessage dataMessage = (DataMessage)item;
                    byte[] bytes = new byte[dataMessage.body().remaining() - 1];
                    byte typeId = dataMessage.body().get();
                    dataMessage.body().get(bytes);
                    Object obj = typeId == 1 ? this.javaSerializer.deserialize(bytes) : this.crossLangSerializer.deserialize(bytes);
                    this.processor.process(obj);
                    continue;
                }
                if (!(item instanceof BarrierMessage)) continue;
                BarrierMessage queueBarrier = (BarrierMessage)item;
                byte[] barrierData = new byte[queueBarrier.getData().remaining()];
                queueBarrier.getData().get(barrierData);
                RemoteCall.Barrier barrierPb = RemoteCall.Barrier.parseFrom(barrierData);
                long checkpointId = barrierPb.getId();
                LOG.info("Start to do checkpoint {}, worker name is {}.", (Object)checkpointId, (Object)this.jobWorker.getWorkerContext().getWorkerName());
                Map<String, OffsetInfo> inputPoints = queueBarrier.getInputOffsets();
                this.doCheckpoint(checkpointId, inputPoints);
                LOG.info("Do checkpoint {} success.", (Object)checkpointId);
            }
        }
        catch (Throwable throwable) {
            if (throwable instanceof ChannelInterruptException || ExceptionUtils.getRootCause((Throwable)throwable) instanceof ChannelInterruptException) {
                LOG.info("queue has stopped.");
            }
            LOG.error("Last success checkpointId={}, now occur error.", (Object)this.lastCheckpointId, (Object)throwable);
            this.requestRollback(ExceptionUtils.getStackTrace((Throwable)throwable));
        }
        LOG.info("Input stream task thread exit.");
        this.stopped = true;
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)this).add("processor", (Object)this.processor).toString();
    }
}

