/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.mantis.network.push;

import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.reactivex.mantis.network.push.ChunkProcessor;
import io.reactivex.mantis.network.push.ConnectionManager;
import io.reactivex.mantis.network.push.MonitoredQueue;
import io.reactivex.mantis.network.push.NamedThreadFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
 * Batches items taken from a {@link MonitoredQueue} and hands them to a
 * {@link ChunkProcessor} in chunks.
 *
 * <p>A chunk is flushed when either of the following happens:
 * <ul>
 *   <li>the internal buffer reaches {@code maxBufferLength} items, or</li>
 *   <li>{@code maxTimeMSec} milliseconds elapse (a periodic task flushes
 *       whatever has accumulated).</li>
 * </ul>
 *
 * <p>Two threads touch the internal buffer: the thread running {@link #call()}
 * and the scheduler thread running the periodic flush. Every read and write of
 * the buffer is therefore done while holding the buffer's monitor. The
 * processor itself is invoked outside the monitor so a slow consumer does not
 * block the producer side.
 *
 * @param <T> type of the items being chunked
 */
public class TimedChunker<T>
implements Callable<Void> {
    private static final ThreadFactory namedFactory = new NamedThreadFactory("TimedChunkerGroup");
    private final MonitoredQueue<T> buffer;
    private final ChunkProcessor<T> processor;
    private final int maxBufferLength;
    private final int maxTimeMSec;
    private final ConnectionManager<T> connectionManager;
    /** Pending items; guarded by its own monitor. */
    private final List<T> internalBuffer;
    private final Counter interrupted;
    private final Counter numEventsDrained;

    /**
     * Creates a chunker and registers its {@code TimedChunker} metrics group
     * (counters {@code interrupted} and {@code numEventsDrained}).
     *
     * @param buffer            queue the items are taken from
     * @param maxBufferLength   number of buffered items that triggers a flush;
     *                          also used as the initial buffer capacity
     * @param maxTimeMSec       initial delay and period, in milliseconds, of the
     *                          time-based flush
     * @param processor         receives each flushed chunk
     * @param connectionManager passed through to the processor with every chunk
     */
    public TimedChunker(MonitoredQueue<T> buffer, int maxBufferLength, int maxTimeMSec, ChunkProcessor<T> processor, ConnectionManager<T> connectionManager) {
        this.maxBufferLength = maxBufferLength;
        this.maxTimeMSec = maxTimeMSec;
        this.buffer = buffer;
        this.processor = processor;
        this.connectionManager = connectionManager;
        this.internalBuffer = new ArrayList<T>(maxBufferLength);
        MetricGroupId metricsGroup = new MetricGroupId("TimedChunker");
        Metrics metrics = new Metrics.Builder().id(metricsGroup).addCounter("interrupted").addCounter("numEventsDrained").build();
        this.interrupted = metrics.getCounter("interrupted");
        this.numEventsDrained = metrics.getCounter("numEventsDrained");
        MetricsRegistry.getInstance().registerAndGet(metrics);
    }

    /**
     * Runs the chunking loop until the calling thread is interrupted.
     *
     * <p>A single-thread scheduler is created for the duration of this call to
     * run the time-based flush. It is always shut down before the method
     * returns or throws, so no scheduler thread and no periodic task is left
     * behind. After the loop ends, whatever is still buffered is flushed once
     * more on the calling thread.
     *
     * <p>Note: per the {@code scheduleAtFixedRate} contract, if a periodic
     * flush throws, subsequent periodic flushes are suppressed; size-based
     * flushes on the calling thread continue to work.
     *
     * @return always {@code null}
     * @throws Exception declared by {@link Callable}; runtime failures from the
     *                   processor propagate to the caller
     */
    @Override
    public Void call() throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(namedFactory);
        try {
            scheduler.scheduleAtFixedRate(this::drain, this.maxTimeMSec, this.maxTimeMSec, TimeUnit.MILLISECONDS);
            while (!this.stopCondition()) {
                try {
                    T data = this.buffer.get();
                    boolean full;
                    synchronized (this.internalBuffer) {
                        this.internalBuffer.add(data);
                        // Read the size under the same lock as the write; the
                        // scheduler thread may be clearing the list concurrently.
                        full = this.internalBuffer.size() >= this.maxBufferLength;
                    }
                    if (full) {
                        this.drain();
                    }
                }
                catch (InterruptedException ex) {
                    // Restore the flag so stopCondition() ends the loop.
                    Thread.currentThread().interrupt();
                    this.interrupted.increment();
                }
            }
        }
        finally {
            // Cancels the periodic flush (interrupting it if it is running) and
            // releases the scheduler thread on every exit path, not only when
            // the queue read was interrupted.
            scheduler.shutdownNow();
        }
        this.drain();
        return null;
    }

    /** Returns {@code true} once the current thread's interrupt flag is set. */
    private boolean stopCondition() {
        return Thread.currentThread().isInterrupted();
    }

    /**
     * Atomically takes everything currently buffered and, if there was
     * anything, passes it to the processor and adds its size to the
     * {@code numEventsDrained} counter. Does nothing when the buffer is empty.
     */
    private void drain() {
        List<T> chunk;
        synchronized (this.internalBuffer) {
            if (this.internalBuffer.isEmpty()) {
                return;
            }
            chunk = new ArrayList<T>(this.internalBuffer);
            this.internalBuffer.clear();
        }
        // Processing happens outside the lock so producers are not stalled.
        this.processor.process(this.connectionManager, chunk);
        this.numEventsDrained.increment((long)chunk.size());
    }
}

