package org.apache.streams.local.tasks;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.DatumStatusCountable;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.core.util.DatumUtils;
import org.apache.streams.local.counters.StreamsTaskCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/local/tasks/StreamsProcessorTask.class */
public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatusCountable {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamsProcessorTask.class);
    private StreamsProcessor processor;
    private AtomicBoolean keepRunning;
    private StreamsConfiguration streamConfig;
    private BlockingQueue<StreamsDatum> inQueue;
    private AtomicBoolean isRunning;
    private AtomicBoolean blocked;
    private StreamsTaskCounter counter;
    private DatumStatusCounter statusCounter;

    public DatumStatusCounter getDatumStatusCounter() {
        return this.statusCounter;
    }

    public StreamsProcessorTask(StreamsProcessor streamsProcessor) {
        this(streamsProcessor, StreamsConfigurator.detectConfiguration());
    }

    public StreamsProcessorTask(StreamsProcessor streamsProcessor, StreamsConfiguration streamsConfiguration) {
        super(streamsConfiguration);
        this.statusCounter = new DatumStatusCounter();
        this.streamConfig = super.streamConfig;
        this.processor = streamsProcessor;
        this.keepRunning = new AtomicBoolean(true);
        this.isRunning = new AtomicBoolean(true);
        this.blocked = new AtomicBoolean(true);
    }

    @Override // org.apache.streams.local.tasks.BaseStreamsTask, org.apache.streams.local.tasks.StreamsTask
    public boolean isWaiting() {
        return this.inQueue.isEmpty() && this.blocked.get();
    }

    @Override // org.apache.streams.local.tasks.StreamsTask
    public void stopTask() {
        this.keepRunning.set(false);
    }

    @Override // org.apache.streams.local.tasks.StreamsTask
    public void setStreamConfig(StreamsConfiguration streamsConfiguration) {
        this.streamConfig = streamsConfiguration;
    }

    @Override // org.apache.streams.local.tasks.BaseStreamsTask, org.apache.streams.local.tasks.StreamsTask
    public void addInputQueue(BlockingQueue<StreamsDatum> blockingQueue) {
        this.inQueue = blockingQueue;
    }

    @Override // org.apache.streams.local.tasks.StreamsTask
    public boolean isRunning() {
        return this.isRunning.get();
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            try {
                this.processor.prepare(this.streamConfig);
                if (this.counter == null) {
                    this.counter = new StreamsTaskCounter(this.processor.getClass().getName() + UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt());
                }
                while (this.keepRunning.get()) {
                    StreamsDatum streamsDatum = null;
                    try {
                        try {
                            this.blocked.set(true);
                            streamsDatum = this.inQueue.poll(this.streamConfig.getBatchFrequencyMs().longValue(), TimeUnit.MILLISECONDS);
                            this.blocked.set(false);
                        } catch (Throwable th) {
                            this.blocked.set(false);
                            throw th;
                        }
                    } catch (InterruptedException e) {
                        LOGGER.debug("Received InteruptedException, shutting down and re-applying interrupt status.");
                        this.keepRunning.set(false);
                        if (!this.inQueue.isEmpty()) {
                            LOGGER.error("Received InteruptedException and input queue still has data, count={}, processor={}", Integer.valueOf(this.inQueue.size()), this.processor.getClass().getName());
                        }
                        Thread.currentThread().interrupt();
                        this.blocked.set(false);
                    }
                    if (streamsDatum != null) {
                        this.counter.incrementReceivedCount();
                        try {
                            long currentTimeMillis = System.currentTimeMillis();
                            List process = this.processor.process(streamsDatum);
                            this.counter.addTime(System.currentTimeMillis() - currentTimeMillis);
                            if (process != null) {
                                Iterator it = process.iterator();
                                while (it.hasNext()) {
                                    super.addToOutgoingQueue((StreamsDatum) it.next());
                                    this.counter.incrementEmittedCount();
                                    this.statusCounter.incrementStatus(DatumStatus.SUCCESS);
                                }
                            }
                        } catch (InterruptedException e2) {
                            LOGGER.warn("Received InterruptedException, shutting down and re-applying interrupt status.");
                            this.keepRunning.set(false);
                            Thread.currentThread().interrupt();
                        } catch (Throwable th2) {
                            this.counter.incrementErrorCount();
                            LOGGER.warn("Caught Throwable in processor, {} : {}", this.processor.getClass().getName(), th2);
                            this.statusCounter.incrementStatus(DatumStatus.FAIL);
                            DatumUtils.addErrorToMetadata(streamsDatum, th2, this.processor.getClass());
                        }
                    } else {
                        LOGGER.trace("Removed NULL datum from queue at processor : {}", this.processor.getClass().getName());
                    }
                }
            } catch (Throwable th3) {
                LOGGER.error("Caught Throwable in Processor {}", this.processor.getClass().getSimpleName(), th3);
                this.isRunning.set(false);
                this.processor.cleanUp();
            }
        } finally {
            this.isRunning.set(false);
            this.processor.cleanUp();
        }
    }

    @Override // org.apache.streams.local.tasks.BaseStreamsTask, org.apache.streams.local.tasks.StreamsTask
    public List<BlockingQueue<StreamsDatum>> getInputQueues() {
        LinkedList linkedList = new LinkedList();
        linkedList.add(this.inQueue);
        return linkedList;
    }

    @Override // org.apache.streams.local.tasks.StreamsTask
    public void setStreamsTaskCounter(StreamsTaskCounter streamsTaskCounter) {
        this.counter = streamsTaskCounter;
    }
}
