package com.concurrentli;

import com.linkedin.batchable.BatchIterator;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;

/* loaded from: input_file:com/concurrentli/ParallelProcessor.class */
public abstract class ParallelProcessor<T, R> implements AutoCloseable, BatchIterator<R> {
    private final ArrayBlockingQueue<InputResultBatch<T, R>> _inputQueue;
    private final SequentialQueue<InputResultBatch<T, R>> _resultQueue;
    private InputResultBatch<T, R> _currentResult;
    private int _currentResultIndex;
    private final Thread _readerThread;
    private final Thread[] _processorThreads;
    private NextInputException _nextInputException;
    private boolean _started;
    private boolean _endOfResults;
    private final InputResultBatch<T, R>[] _batchPool;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:com/concurrentli/ParallelProcessor$Config.class */
    protected static class Config {
        public static final int DEFAUT_THREAD_COUNT = Runtime.getRuntime().availableProcessors();
        public static final int DEFAULT_BATCH_SIZE = 1000;
        private int _readerThreadPriority = 5;
        private int _processorThreads = DEFAUT_THREAD_COUNT;
        private int _processorThreadPriority = 5;
        private int _batchSize = 1000;
        private int _inputBufferBatchCount = DEFAUT_THREAD_COUNT * 2;
        private int _outputBufferBatchCount = DEFAUT_THREAD_COUNT * 2;

        public Config setReaderThreadPriority(int i) {
            this._readerThreadPriority = i;
            return this;
        }

        public Config setProcessorThreads(int i) {
            this._processorThreads = i;
            return this;
        }

        public Config setProcessorThreadPriority(int i) {
            this._processorThreadPriority = i;
            return this;
        }

        public Config setBatchSize(int i) {
            this._batchSize = i;
            return this;
        }

        public Config setInputBufferBatchCount(int i) {
            this._inputBufferBatchCount = i;
            return this;
        }

        public Config setOutputBufferBatchCount(int i) {
            this._outputBufferBatchCount = i;
            return this;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/concurrentli/ParallelProcessor$InputResultBatch.class */
    public static class InputResultBatch<T, R> {
        long _batchIndex = -1;
        int _length = 0;
        private final Object[] _data;

        InputResultBatch(int i) {
            this._data = new Object[i];
        }
    }

    /* loaded from: input_file:com/concurrentli/ParallelProcessor$NextInputException.class */
    public static class NextInputException extends RuntimeException {
        public NextInputException(String str, RuntimeException runtimeException) {
            super(str, runtimeException);
        }
    }

    /* loaded from: input_file:com/concurrentli/ParallelProcessor$ProcessInputException.class */
    public static class ProcessInputException extends RuntimeException {
        public ProcessInputException(String str, RuntimeException runtimeException) {
            super(str, runtimeException);
        }
    }

    protected abstract T nextInput();

    protected int readInputs(T[] tArr) {
        for (int i = 0; i < tArr.length; i++) {
            T nextInput = nextInput();
            if (nextInput == null) {
                return i;
            }
            tArr[i] = nextInput;
        }
        return tArr.length;
    }

    protected abstract R processInput(T t);

    /* JADX WARN: Multi-variable type inference failed */
    protected void processInputsUnsafe(int i, Object[] objArr) {
        for (int i2 = 0; i2 < i; i2++) {
            objArr[i2] = processInput(objArr[i2]);
        }
    }

    public ParallelProcessor(Config config) {
        this(config._readerThreadPriority, config._processorThreads, config._processorThreadPriority, config._batchSize, config._inputBufferBatchCount, config._outputBufferBatchCount);
    }

    private ParallelProcessor(int i, int i2, int i3, int i4, int i5, int i6) {
        this._currentResult = null;
        this._currentResultIndex = 0;
        this._nextInputException = null;
        this._started = false;
        this._endOfResults = false;
        this._inputQueue = new ArrayBlockingQueue<>(i5);
        this._resultQueue = new SequentialQueue<>(i6);
        this._readerThread = new Thread(Interrupted.ignored(this::readerThread));
        this._readerThread.setDaemon(true);
        this._readerThread.setPriority(i);
        this._batchPool = new InputResultBatch[i5 + i6 + i2 + 2];
        for (int i7 = 0; i7 < this._batchPool.length; i7++) {
            this._batchPool[i7] = new InputResultBatch<>(i4);
        }
        this._processorThreads = new Thread[i2];
        for (int i8 = 0; i8 < i2; i8++) {
            this._processorThreads[i8] = new Thread(Interrupted.ignored(this::processorThread));
            this._processorThreads[i8].setDaemon(true);
            this._processorThreads[i8].setPriority(i3);
        }
    }

    private boolean ensureBuffer() throws InterruptedException {
        start();
        if (this._nextInputException != null) {
            throw this._nextInputException;
        }
        if (this._endOfResults) {
            return false;
        }
        if (this._currentResult == null || this._currentResult._length == this._currentResultIndex) {
            try {
                this._currentResult = this._resultQueue.dequeue();
                this._currentResultIndex = 0;
            } catch (NextInputException e) {
                this._nextInputException = e;
                throw e;
            }
        }
        if (this._currentResult != null) {
            return true;
        }
        this._endOfResults = true;
        return false;
    }

    public R nextInterruptable() throws InterruptedException {
        if (!ensureBuffer()) {
            return null;
        }
        R r = (R) ((InputResultBatch) this._currentResult)._data[this._currentResultIndex];
        ((InputResultBatch) this._currentResult)._data[this._currentResultIndex] = null;
        this._currentResultIndex++;
        return r;
    }

    @Override // com.linkedin.batchable.BatchIterator, java.util.Iterator
    public boolean hasNext() {
        try {
            return ensureBuffer();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new UncheckedInterruptedException(e);
        }
    }

    @Override // java.util.Iterator
    public R next() {
        try {
            return nextInterruptable();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new UncheckedInterruptedException(e);
        }
    }

    @Override // com.linkedin.batchable.BatchIterator
    public int next(R[] rArr, int i, int i2) {
        try {
            if (!ensureBuffer()) {
                return 0;
            }
            int min = Math.min(i2, this._currentResult._length - this._currentResultIndex);
            System.arraycopy(((InputResultBatch) this._currentResult)._data, this._currentResultIndex, rArr, i, min);
            Arrays.fill(((InputResultBatch) this._currentResult)._data, this._currentResultIndex, this._currentResultIndex + min, (Object) null);
            this._currentResultIndex += min;
            return min;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new UncheckedInterruptedException(e);
        }
    }

    @Override // com.linkedin.batchable.BatchIterator
    public long skip(long j) {
        long j2 = 0;
        while (true) {
            long j3 = j2;
            if (j3 >= j) {
                if ($assertionsDisabled || j3 == j) {
                    return j3;
                }
                throw new AssertionError();
            }
            try {
                if (!ensureBuffer()) {
                    return j3;
                }
                int min = (int) Math.min(j - j3, this._currentResult._length - this._currentResultIndex);
                Arrays.fill(((InputResultBatch) this._currentResult)._data, this._currentResultIndex, this._currentResultIndex + min, (Object) null);
                this._currentResultIndex += min;
                j2 = j3 + min;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new UncheckedInterruptedException(e);
            }
        }
    }

    public BlockingStatus getBlockingStatus() {
        if (this._endOfResults) {
            return BlockingStatus.NOT_BLOCKING;
        }
        if ((this._currentResult == null || this._currentResult._length == this._currentResultIndex) && !this._resultQueue.isNextAvailable()) {
            return BlockingStatus.TEMPORARILY_BLOCKING;
        }
        return BlockingStatus.NOT_BLOCKING;
    }

    @Override // com.linkedin.batchable.BatchIterator
    public long available() {
        if (this._endOfResults) {
            return 0L;
        }
        try {
            if (this._currentResult != null && this._currentResult._length != this._currentResultIndex) {
                return this._currentResult._length - this._currentResultIndex;
            }
            if (this._resultQueue.isNextAvailable()) {
                return this._resultQueue.peek()._length;
            }
            return 0L;
        } catch (Exception e) {
            return 0L;
        }
    }

    @Override // java.lang.AutoCloseable, com.linkedin.batchable.BatchIterator
    public void close() {
        this._endOfResults = true;
        stopNow();
    }

    public void start() {
        if (this._started) {
            return;
        }
        this._started = true;
        this._readerThread.start();
        for (Thread thread : this._processorThreads) {
            thread.start();
        }
    }

    private void stopNow() {
        for (Thread thread : this._processorThreads) {
            thread.interrupt();
        }
        this._readerThread.interrupt();
    }

    private void enqueueException(long j, RuntimeException runtimeException) throws InterruptedException {
        this._resultQueue.enqueueException(j, runtimeException);
    }

    private void enqueueInputTerminators(long j) throws InterruptedException {
        for (int i = 0; i < this._processorThreads.length; i++) {
            InputResultBatch<T, R> inputResultBatch = this._batchPool[(int) (j % this._batchPool.length)];
            inputResultBatch._batchIndex = -1L;
            this._inputQueue.put(inputResultBatch);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void readerThread() throws InterruptedException {
        long j = 0;
        while (true) {
            try {
                InputResultBatch<T, R> inputResultBatch = this._batchPool[(int) (j % this._batchPool.length)];
                inputResultBatch._batchIndex = j;
                inputResultBatch._length = readInputs(((InputResultBatch) inputResultBatch)._data);
                if (inputResultBatch._length == 0) {
                    this._resultQueue.enqueue(j, null);
                    enqueueInputTerminators(j);
                    return;
                } else {
                    this._inputQueue.put(inputResultBatch);
                    j++;
                }
            } catch (RuntimeException e) {
                enqueueException(j, new NextInputException("An exception occurred while getting input batch " + j, e));
                enqueueInputTerminators(j);
                return;
            }
        }
    }

    private void processorThread() throws InterruptedException {
        while (true) {
            InputResultBatch<T, R> take = this._inputQueue.take();
            try {
            } catch (RuntimeException e) {
                enqueueException(take._batchIndex, new ProcessInputException("An exception occurred while processing input batch " + take._batchIndex, e));
            }
            if (take._batchIndex < 0) {
                return;
            }
            processInputsUnsafe(take._length, ((InputResultBatch) take)._data);
            this._resultQueue.enqueue(take._batchIndex, take);
        }
    }

    static {
        $assertionsDisabled = !ParallelProcessor.class.desiredAssertionStatus();
    }
}
