package com.gs.fw.common.mithra.util;

import com.gs.fw.common.mithra.MithraList;
import com.gs.fw.common.mithra.finder.Operation;
import com.gs.fw.common.mithra.finder.RelatedFinder;
import com.gs.fw.finder.Navigation;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.collections.impl.list.mutable.FastList;
import org.eclipse.collections.impl.map.mutable.UnifiedMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/gs/fw/common/mithra/util/MultiThreadedBatchProcessor.class */
public class MultiThreadedBatchProcessor<T, TL extends MithraList<T>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MultiThreadedBatchProcessor.class);

    /** Shard (source) values to read from; {@code null} means a single, unsharded read. */
    private final Set<Object> shards;
    /** Finder used to run the retrieval query and to construct the lists that get queued. */
    private final RelatedFinder<T> finderInstance;
    /** Base operation applied to every retrieval, before any per-shard additions. */
    private final Operation mainOperation;
    /** Consumer driven by {@link #process()}. */
    private final Consumer<T, TL> consumer;
    /** Navigations applied as deep fetches to every batch before it is consumed. */
    private final List<Navigation<T>> deepFetches;
    /** Optional extra operation per shard value, and-ed onto the main operation. */
    private Map<Object, Operation> additionalPerShardRetrievalOperations = UnifiedMap.newMap();
    /** Receives anything thrown by {@link Consumer#consume}; the default rethrows. */
    private ErrorHandler<T, TL> errorHandler = new DefaultErrorHandler<T, TL>();
    /** Number of objects accumulated before a batch is queued. */
    private int batchSize = 2000;
    /** Number of retrieval threads; -1 means "derive from the number of shards". */
    private int retrievalThreads = -1;
    /** Running total of objects placed on the queue. */
    private final AtomicLong totalQueued = new AtomicLong();
    /** Running total of the durations returned by {@link #deepFetchBatch}. */
    private final AtomicLong totalDeepFetchedTime = new AtomicLong();
    /** Running total of objects that have been deep fetched. */
    private final AtomicLong totalDeepFetched = new AtomicLong();

    /* loaded from: input_file:com/gs/fw/common/mithra/util/MultiThreadedBatchProcessor$Consumer.class */
    /**
     * Callback that receives the batches produced by a {@link MultiThreadedBatchProcessor}.
     *
     * <p>{@code load} submits several processing runnables that all share one consumer
     * instance, so implementations should expect {@link #consume} to be called from more
     * than one thread.
     */
    public interface Consumer<T, TL extends MithraList<T>> {
        /**
         * Called by {@code process()} before any loading starts.
         *
         * @param multiThreadedBatchProcessor the processor about to start loading
         */
        void startConsumption(MultiThreadedBatchProcessor<T, TL> multiThreadedBatchProcessor);

        /**
         * Called once per batch, after the batch has gone through {@code deepFetchBatch}.
         * Anything thrown from here is handed to the processor's {@link ErrorHandler}.
         *
         * @param tl the batch to consume
         * @throws Exception if the batch cannot be consumed
         */
        void consume(TL tl) throws Exception;

        /**
         * Called by {@code process()} after {@code load} has returned normally.
         *
         * @param multiThreadedBatchProcessor the processor that finished loading
         */
        void endConsumption(MultiThreadedBatchProcessor<T, TL> multiThreadedBatchProcessor);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/gs/fw/common/mithra/util/MultiThreadedBatchProcessor$DeepFetchAndBatchProcessorRunnable.class */
    /**
     * Worker that takes queued batches, deep fetches them and hands them to the consumer.
     * It keeps polling while loaders are still running, then drains whatever is left on
     * the queue before signalling its own completion latch.
     */
    public class DeepFetchAndBatchProcessorRunnable implements Runnable {
        private final CountDownLatch loadLatch;
        private final LinkedBlockingQueue<TL> listBeforeDeepFetchesQueue;
        private final AutoShutdownThreadExecutor loadExecutor;
        private final AutoShutdownThreadExecutor deepFetchExecutor;
        private final CountDownLatch deepFetchLatch;
        private final Consumer<T, TL> consumer;
        private final ErrorHandler<T, TL> errorHandler;

        /**
         * @param countDownLatch latch that reaches zero once every loader has finished
         * @param linkedBlockingQueue queue of batches waiting to be deep fetched
         * @param autoShutdownThreadExecutor executor running the loaders
         * @param autoShutdownThreadExecutor2 executor running these processing runnables
         * @param countDownLatch2 latch counted down when this runnable finishes
         * @param consumer receiver of each deep-fetched batch
         * @param errorHandler receiver of anything thrown by the consumer
         */
        public DeepFetchAndBatchProcessorRunnable(CountDownLatch countDownLatch, LinkedBlockingQueue<TL> linkedBlockingQueue, AutoShutdownThreadExecutor autoShutdownThreadExecutor, AutoShutdownThreadExecutor autoShutdownThreadExecutor2, CountDownLatch countDownLatch2, Consumer<T, TL> consumer, ErrorHandler<T, TL> errorHandler) {
            this.consumer = consumer;
            this.errorHandler = errorHandler;
            this.listBeforeDeepFetchesQueue = linkedBlockingQueue;
            this.loadExecutor = autoShutdownThreadExecutor;
            this.loadLatch = countDownLatch;
            this.deepFetchExecutor = autoShutdownThreadExecutor2;
            this.deepFetchLatch = countDownLatch2;
        }

        /**
         * Waits up to one second for a batch; if one arrives, deep fetches it, updates the
         * processor's counters and passes it to the consumer. Anything the consumer throws
         * goes to the error handler.
         */
        public void processDeepFetchQueue() {
            TL batch;
            try {
                batch = this.listBeforeDeepFetchesQueue.poll(1L, TimeUnit.SECONDS);
            } catch (InterruptedException ignored) {
                // An interrupted wait is treated exactly like a timed-out poll: no batch this round.
                batch = null;
            }
            if (batch == null) {
                return;
            }
            final MultiThreadedBatchProcessor<T, TL> owner = MultiThreadedBatchProcessor.this;
            final long elapsed = owner.deepFetchBatch(batch);
            owner.totalDeepFetchedTime.addAndGet(elapsed);
            owner.totalDeepFetched.addAndGet(batch.size());
            try {
                this.consumer.consume(batch);
            } catch (Throwable failure) {
                this.errorHandler.handleError(failure, owner, batch);
            }
        }

        /**
         * Processes batches until the load latch reaches zero or either executor reports
         * an abort (in which case the processing executor is shut down immediately), then
         * drains the remaining queue and counts down the completion latch.
         */
        @Override
        public void run() {
            boolean abortSeen = false;
            while (!abortSeen && this.loadLatch.getCount() != 0) {
                abortSeen = this.loadExecutor.isAborted() || this.deepFetchExecutor.isAborted();
                if (abortSeen) {
                    this.deepFetchExecutor.shutdownNow();
                } else {
                    processDeepFetchQueue();
                }
            }
            // Loaders are done (or an abort was seen): work through whatever is still queued.
            while (!this.listBeforeDeepFetchesQueue.isEmpty()) {
                processDeepFetchQueue();
            }
            this.deepFetchLatch.countDown();
        }
    }

    /* loaded from: input_file:com/gs/fw/common/mithra/util/MultiThreadedBatchProcessor$DefaultErrorHandler.class */
    /**
     * Error handler used when none is configured: it never recovers, it always throws.
     */
    private static class DefaultErrorHandler<T, TL extends MithraList<T>> implements ErrorHandler<T, TL> {
        private DefaultErrorHandler() {
        }

        /**
         * Rethrows {@code th} as-is when it is a {@link RuntimeException}; otherwise wraps
         * it in a new {@link RuntimeException} with {@code th} as the cause.
         */
        @Override
        public void handleError(Throwable th, MultiThreadedBatchProcessor<T, TL> multiThreadedBatchProcessor, TL tl) {
            if (th instanceof RuntimeException) {
                throw (RuntimeException) th;
            }
            throw new RuntimeException("Unhandled exception", th);
        }
    }

    /* loaded from: input_file:com/gs/fw/common/mithra/util/MultiThreadedBatchProcessor$ErrorHandler.class */
    /**
     * Strategy invoked when {@link Consumer#consume} throws while a batch is being processed.
     */
    public interface ErrorHandler<T, TL extends MithraList<T>> {
        /**
         * Handles a failure raised while consuming a batch.
         *
         * @param th what the consumer threw
         * @param multiThreadedBatchProcessor the processor that was running the consumer
         * @param tl the batch that was being consumed
         */
        void handleError(Throwable th, MultiThreadedBatchProcessor<T, TL> multiThreadedBatchProcessor, TL tl);
    }

    /**
     * Creates a processor; nothing is read until {@link #process()} or {@link #load} is called.
     *
     * @param relatedFinder finder used to query and to build the queued lists
     * @param operation base operation applied to every retrieval
     * @param list navigations to deep fetch on every batch
     * @param consumer consumer driven by {@link #process()}
     * @param set shard (source) values to read, or {@code null} for a single unsharded read
     */
    public MultiThreadedBatchProcessor(RelatedFinder<T> relatedFinder, Operation operation, List<Navigation<T>> list, Consumer<T, TL> consumer, Set<Object> set) {
        this.shards = set;
        this.finderInstance = relatedFinder;
        this.mainOperation = operation;
        this.consumer = consumer;
        this.deepFetches = list;
    }

    /**
     * Sets how many objects are accumulated before a batch is queued (default 2000).
     *
     * @param i the batch size
     */
    public void setBatchSize(int i) {
        this.batchSize = i;
    }

    /**
     * Replaces the handler that receives anything thrown by {@link Consumer#consume}.
     * The handler is read when {@code load} creates its processing runnables, so it
     * has to be set before loading starts to take effect.
     *
     * @param errorHandler the handler to use
     */
    public void setErrorHandler(ErrorHandler<T, TL> errorHandler) {
        this.errorHandler = errorHandler;
    }

    /**
     * Sets the number of retrieval threads. The default of -1 makes {@code load} use one
     * thread per shard (or one thread when there are no shards). The processing executor
     * is sized at three times this number.
     *
     * @param i the number of retrieval threads, or -1 to derive it from the shards
     */
    public void setRetrievalThreads(int i) {
        this.retrievalThreads = i;
    }

    /**
     * Sets extra operations, keyed by shard value, that are and-ed onto the main operation
     * when that shard is read.
     *
     * <p>A {@code null} argument is treated as "no additional operations" rather than being
     * stored, because the map is dereferenced unconditionally on the loader threads.
     *
     * @param map additional operation per shard value; may be {@code null}
     */
    public void setAdditionalPerShardRetrievalOperations(Map<Object, Operation> map) {
        if (map == null) {
            this.additionalPerShardRetrievalOperations = UnifiedMap.newMap();
        } else {
            this.additionalPerShardRetrievalOperations = map;
        }
    }

    /**
     * @return the number of objects that have been through deep fetching so far
     *         (the sum of the sizes of all processed batches)
     */
    public long getTotalDeepFetched() {
        return this.totalDeepFetched.get();
    }

    /**
     * @return the sum of the durations returned by {@code deepFetchBatch} for all
     *         processed batches, across all processing threads
     */
    public long getTotalDeepFetchedTime() {
        return this.totalDeepFetchedTime.get();
    }

    /**
     * @return the number of objects that have been placed on the processing queue so far
     */
    public long getTotalQueued() {
        return this.totalQueued.get();
    }

    /**
     * Runs a full cycle against the configured consumer: start notification, load, end
     * notification. If loading throws, the end notification is not sent.
     */
    public void process() {
        final Consumer<T, TL> target = this.consumer;
        target.startConsumption(this);
        load(target);
        target.endConsumption(this);
    }

    /**
     * Reads all matching objects and feeds them, batch by batch, to {@code consumer}.
     *
     * <p>One loader task is submitted per shard (or a single task when there are no shards).
     * Loaders put batches on a bounded queue; three processing runnables per retrieval
     * thread take batches off the queue, deep fetch them and call the consumer. The method
     * returns once both executors have finished.
     *
     * @param consumer receiver of each deep-fetched batch
     * @throws RuntimeException if either executor reports that it was aborted
     */
    public void load(Consumer<T, TL> consumer) {
        final int shardCount = this.shards == null ? 1 : this.shards.size();
        final int loadThreads = this.retrievalThreads == -1 ? shardCount : this.retrievalThreads;
        final int processThreads = loadThreads * 3;

        AutoShutdownThreadExecutor loadExecutor = new AutoShutdownThreadExecutor(loadThreads, "MTBP load");
        loadExecutor.setTimeoutInMilliseconds(10);
        AutoShutdownThreadExecutor processExecutor = new AutoShutdownThreadExecutor(processThreads, "MTBP process");
        processExecutor.setTimeoutInMilliseconds(10);

        // Bounded so that loaders block instead of buffering without limit when processing lags.
        final LinkedBlockingQueue<TL> queue = new LinkedBlockingQueue<TL>(processThreads + (processThreads / 10) + 10);
        final CountDownLatch loadLatch = new CountDownLatch(shardCount);
        final CountDownLatch deepFetchLatch = new CountDownLatch(processThreads);

        for (int i = 0; i < processThreads; i++) {
            processExecutor.submit(new DeepFetchAndBatchProcessorRunnable(loadLatch, queue, loadExecutor, processExecutor, deepFetchLatch, consumer, this.errorHandler));
        }
        processExecutor.shutdown();

        if (this.shards != null) {
            for (Object shard : this.shards) {
                loadExecutor.submit(newLoadTask(shard, queue, loadLatch));
            }
        } else {
            loadExecutor.submit(newLoadTask(null, queue, loadLatch));
        }

        loadExecutor.shutdownAndWaitUntilDone();
        if (loadExecutor.isAborted()) {
            processExecutor.shutdownNow();
            throw new RuntimeException("unrecoverable error while processing. See logs above");
        }
        processExecutor.shutdownAndWaitUntilDone();
        if (processExecutor.isAborted()) {
            throw new RuntimeException("unrecoverable error while processing. See logs above.");
        }
        LOGGER.info("Total read from DB: {}", this.totalQueued);
    }

    /** Builds the loader task for one shard: queue everything for that shard, then count the latch down. */
    private Runnable newLoadTask(final Object shard, final LinkedBlockingQueue<TL> queue, final CountDownLatch loadLatch) {
        return new Runnable() {
            @Override
            public void run() {
                MultiThreadedBatchProcessor.this.queueForDeepFetchAndProcessing(shard, queue);
                loadLatch.countDown();
            }
        };
    }

    /**
     * Applies the configured deep fetches to a batch and forces it to resolve.
     *
     * <p>Elapsed time is measured with the monotonic {@link System#nanoTime()} clock so that
     * wall-clock adjustments cannot produce negative or inflated durations.
     *
     * @param tl the batch to deep fetch
     * @return the time spent, in milliseconds
     */
    protected long deepFetchBatch(TL tl) {
        final long startNanos = System.nanoTime();
        addDeepFetches(tl);
        tl.forceResolve();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    }

    /**
     * Queues every batch for one shard and then logs how many objects were queued.
     *
     * @param obj the shard (source) value, or {@code null} for an unsharded read
     * @param linkedBlockingQueue the queue that batches are placed on
     */
    protected void queueForDeepFetchAndProcessing(Object obj, LinkedBlockingQueue<TL> linkedBlockingQueue) {
        final AtomicLong queuedCount = new AtomicLong();
        queueWithOp(obj, linkedBlockingQueue, queuedCount);

        final StringBuilder message = new StringBuilder();
        if (obj != null) {
            message.append("Source ").append(obj).append(" ");
        }
        message.append("finished reading. ").append(queuedCount.get()).append(" queued for output.");
        LOGGER.info(message.toString());
    }

    /**
     * Runs the retrieval for one shard with a cursor and queues the results in batches of
     * {@code batchSize}; a final partial batch is queued if anything is left over.
     *
     * <p>The query is the main operation, and-ed with an equality on the source attribute
     * when {@code obj} is non-null, and and-ed with the additional per-shard operation
     * registered for {@code obj}, if any.
     *
     * @param obj the shard (source) value, or {@code null} for an unsharded read
     * @param linkedBlockingQueue the queue that batches are placed on
     * @param atomicLong counter incremented by the size of every batch queued here
     */
    protected void queueWithOp(final Object obj, final LinkedBlockingQueue<TL> linkedBlockingQueue, final AtomicLong atomicLong) {
        Operation operation = this.mainOperation;
        if (obj != null) {
            operation = operation.and((com.gs.fw.finder.Operation) this.finderInstance.getSourceAttribute().nonPrimitiveEq(obj));
        }
        Operation shardOperation = this.additionalPerShardRetrievalOperations.get(obj);
        if (shardOperation != null) {
            operation = operation.and((com.gs.fw.finder.Operation) shardOperation);
        }
        final List<T> accumulator = FastList.newList(this.batchSize);
        this.finderInstance.findMany((com.gs.fw.finder.Operation) operation).forEachWithCursor(new DoWhileProcedure() {
            @Override
            @SuppressWarnings("unchecked") // the cursor only yields objects of the finder's result type
            public boolean execute(Object each) {
                accumulator.add((T) each);
                if (accumulator.size() == MultiThreadedBatchProcessor.this.batchSize) {
                    MultiThreadedBatchProcessor.this.queueResultsWithoutDeepFetch(accumulator, linkedBlockingQueue, obj);
                    atomicLong.addAndGet(accumulator.size());
                    // Safe to reuse: queueResultsWithoutDeepFetch copies the contents into a new list.
                    accumulator.clear();
                }
                return true; // always continue to the next row
            }
        });
        if (!accumulator.isEmpty()) {
            queueResultsWithoutDeepFetch(accumulator, linkedBlockingQueue, obj);
            atomicLong.addAndGet(accumulator.size());
        }
    }

    /**
     * Copies the accumulated objects into a newly constructed list and puts that list on
     * the queue, blocking while the queue is full. On success it logs the batch and adds
     * its size to the total-queued counter.
     *
     * @param list the objects to queue; copied, so the caller may reuse it afterwards
     * @param linkedBlockingQueue the queue the new list is placed on
     * @param obj the shard (source) value used in the log message, or {@code null}
     * @throws RuntimeException if the thread is interrupted while waiting for queue space;
     *         the thread's interrupt status is restored before throwing
     */
    @SuppressWarnings("unchecked") // the finder's empty list is the TL this processor was parameterized with
    protected void queueResultsWithoutDeepFetch(List<T> list, LinkedBlockingQueue<TL> linkedBlockingQueue, Object obj) {
        TL batch = (TL) this.finderInstance.constructEmptyList();
        batch.addAll(list);
        try {
            linkedBlockingQueue.put(batch);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Unexpected exception", e);
        }
        LOGGER.info("queued " + list.size() + (obj != null ? " for source " + obj : ""));
        this.totalQueued.addAndGet(list.size());
    }

    /**
     * Registers every configured navigation as a deep fetch on the given batch, in list order.
     *
     * @param tl the batch to configure
     */
    protected void addDeepFetches(TL tl) {
        for (Navigation<T> navigation : this.deepFetches) {
            tl.deepFetch(navigation);
        }
    }
}
