/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.worker.tasks;

import com.google.common.base.MoreObjects;
import io.ray.streaming.runtime.core.processor.Processor;
import io.ray.streaming.runtime.serialization.CrossLangSerializer;
import io.ray.streaming.runtime.serialization.JavaSerializer;
import io.ray.streaming.runtime.serialization.Serializer;
import io.ray.streaming.runtime.transfer.DataMessage;
import io.ray.streaming.runtime.worker.JobWorker;
import io.ray.streaming.runtime.worker.tasks.StreamTask;

/**
 * Stream task that repeatedly pulls {@link DataMessage}s from the reader, deserializes each
 * message body and passes the resulting object to the processor.
 *
 * <p>The loop in {@link #run()} is controlled by two volatile flags: {@code running} is cleared
 * by {@link #cancelTask()} to request termination, and {@code stopped} is set by {@link #run()}
 * once the loop has been left, which is what {@link #cancelTask()} waits for.
 *
 * <p>NOTE(review): {@code reader}, {@code processor} and {@code taskId} are not declared in this
 * class; they are presumably inherited from {@link StreamTask} - confirm against the base class.
 */
public abstract class InputStreamTask extends StreamTask {
    /** Leading type-id byte of a message body that selects the Java serializer. */
    private static final byte JAVA_TYPE_ID = 1;
    /** Pause between two checks of the {@code stopped} flag while waiting in {@link #cancelTask()}. */
    private static final long CANCEL_POLL_INTERVAL_MILLIS = 1L;

    /** Loop condition of {@link #run()}; set to {@code false} to request termination. */
    private volatile boolean running = true;
    /** Set to {@code true} once {@link #run()} has left its loop, normally or exceptionally. */
    private volatile boolean stopped = false;
    /** Timeout passed to every {@code reader.read(...)} call. */
    private final long readTimeoutMillis;
    private final Serializer javaSerializer;
    private final Serializer crossLangSerializer;

    /**
     * Creates the task and takes the read timeout from the worker's transfer configuration
     * ({@code readerTimerIntervalMs}).
     *
     * @param taskId forwarded to the {@link StreamTask} constructor
     * @param processor forwarded to the {@link StreamTask} constructor
     * @param jobWorker forwarded to the {@link StreamTask} constructor; also the source of the
     *     worker configuration the read timeout is taken from
     */
    public InputStreamTask(int taskId, Processor processor, JobWorker jobWorker) {
        super(taskId, processor, jobWorker);
        this.readTimeoutMillis = jobWorker.getWorkerConfig().transferConfig.readerTimerIntervalMs();
        this.javaSerializer = new JavaSerializer();
        this.crossLangSerializer = new CrossLangSerializer();
    }

    /** Intentionally empty: this class performs no initialization of its own here. */
    @Override
    protected void init() {
    }

    /**
     * Reads, deserializes and processes messages until {@link #cancelTask()} clears the
     * {@code running} flag. A {@code null} result from the reader is skipped and the read is
     * retried.
     *
     * <p>The {@code stopped} flag is set in a {@code finally} block so that a thread waiting in
     * {@link #cancelTask()} is released even when the loop is left because of an exception; the
     * exception itself still propagates to the caller.
     */
    @Override
    public void run() {
        try {
            while (this.running) {
                DataMessage item = this.reader.read(this.readTimeoutMillis);
                if (item == null) {
                    continue;
                }
                this.processor.process(this.deserialize(item));
            }
        } finally {
            // Must happen on every exit path, otherwise cancelTask() would wait forever.
            this.stopped = true;
        }
    }

    /**
     * Decodes a message body laid out as one type-id byte followed by the serialized payload.
     *
     * @param item the message whose body is consumed
     * @return the object produced by the Java serializer when the type id equals
     *     {@link #JAVA_TYPE_ID}, otherwise the object produced by the cross-language serializer
     */
    private Object deserialize(DataMessage item) {
        // Size the payload before consuming the type-id byte: everything after that byte.
        byte[] payload = new byte[item.body().remaining() - 1];
        byte typeId = item.body().get();
        item.body().get(payload);
        if (typeId == JAVA_TYPE_ID) {
            return this.javaSerializer.deserialize(payload);
        }
        return this.crossLangSerializer.deserialize(payload);
    }

    /**
     * Requests termination of {@link #run()} and blocks until its loop has been left.
     *
     * <p>The wait polls the {@code stopped} flag with a short sleep between checks instead of
     * spinning, so the waiting thread does not occupy a CPU core.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    protected void cancelTask() throws Exception {
        this.running = false;
        while (!this.stopped) {
            Thread.sleep(CANCEL_POLL_INTERVAL_MILLIS);
        }
    }

    /** Returns a string containing the task id and the processor. */
    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
            .add("taskId", this.taskId)
            .add("processor", this.processor)
            .toString();
    }
}

