/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.worker.tasks;

import io.ray.streaming.runtime.core.processor.Processor;
import io.ray.streaming.runtime.core.processor.SourceProcessor;
import io.ray.streaming.runtime.transfer.exception.ChannelInterruptException;
import io.ray.streaming.runtime.worker.JobWorker;
import io.ray.streaming.runtime.worker.tasks.StreamTask;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SourceStreamTask
extends StreamTask {
    private static final Logger LOG = LoggerFactory.getLogger(SourceStreamTask.class);
    private final SourceProcessor sourceProcessor;
    private final AtomicReference<Long> pendingBarrier = new AtomicReference();
    private long lastCheckpointId = 0L;

    public SourceStreamTask(Processor sourceProcessor, JobWorker jobWorker, long lastCheckpointId) {
        super(sourceProcessor, jobWorker, lastCheckpointId);
        this.sourceProcessor = (SourceProcessor)this.processor;
    }

    @Override
    protected void init() {
    }

    @Override
    public void run() {
        LOG.info("Source stream task thread start.");
        try {
            while (this.running) {
                this.isInitialState = false;
                Long barrierId = this.pendingBarrier.get();
                if (barrierId != null) {
                    if (this.pendingBarrier.compareAndSet(barrierId, null)) {
                        LOG.info("Start to do checkpoint {}, worker name is {}.", (Object)barrierId, (Object)this.jobWorker.getWorkerContext().getWorkerName());
                        this.doCheckpoint(barrierId, null);
                        LOG.info("Finish to do checkpoint {}.", (Object)barrierId);
                    } else {
                        LOG.warn("Pending checkpointId modify unexpected, expect={}, now={}.", (Object)barrierId, (Object)this.pendingBarrier.get());
                    }
                }
                this.sourceProcessor.fetch();
            }
        }
        catch (Throwable e) {
            if (e instanceof ChannelInterruptException || ExceptionUtils.getRootCause((Throwable)e) instanceof ChannelInterruptException) {
                LOG.info("queue has stopped.");
            }
            LOG.error("Last success checkpointId={}, now occur error.", (Object)this.lastCheckpointId, (Object)e);
            this.requestRollback(ExceptionUtils.getStackTrace((Throwable)e));
        }
        LOG.info("Source stream task thread exit.");
    }

    @Override
    public boolean triggerCheckpoint(Long barrierId) {
        return this.pendingBarrier.compareAndSet(null, barrierId);
    }
}

