package org.apache.omid.tso;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.Histogram;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.Timer;
import org.apache.omid.tso.PersistEvent;
import org.apache.omid.tso.PersistenceProcessorImpl;
import org.apache.phoenix.shaded.com.codahale.metrics.MetricRegistry;
import org.apache.phoenix.shaded.com.lmax.disruptor.WorkHandler;
import org.apache.phoenix.shaded.javax.inject.Inject;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.MoreObjects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/omid/tso/PersistenceProcessorHandler.class */
/**
 * Work handler for {@link PersistenceProcessorImpl.PersistBatchEvent}s.
 *
 * <p>For every batch it receives, the handler hands the COMMIT and FENCE entries to the commit
 * table writer, flushes the writer, passes COMMIT_RETRY entries to the retry processor (removing
 * them from the batch) and finally forwards the remaining batch to the reply processor.
 *
 * <p>Each instance draws its id from a counter shared by all instances; the id is embedded in the
 * names of the metrics the instance registers.
 */
public class PersistenceProcessorHandler implements WorkHandler<PersistenceProcessorImpl.PersistBatchEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessorHandler.class);

    /** Source of the per-instance ids; incremented once for every handler that is constructed. */
    @VisibleForTesting
    static final AtomicInteger consecutiveSequenceCreator = new AtomicInteger(0);

    private final String id = String.valueOf(consecutiveSequenceCreator.getAndIncrement());

    // Collaborators
    private final String tsoHostAndPort;
    private final LeaseManagement leaseManager;
    private final CommitTable.Writer writer;
    private final ReplyProcessor replyProcessor;
    private final RetryProcessor retryProcessor;
    final Panicker panicker;

    // Metrics
    private final Timer flushTimer;
    private final Histogram flushedCommitEventsHistogram;
    private final Histogram batchSizeHistogram;

    /**
     * Wires the handler to its collaborators, obtains a writer from the commit table and registers
     * the flush-latency timer and the two size histograms under names that include this
     * handler's id.
     *
     * <p>NOTE: the decompiler reported that this constructor was package-private in the original
     * class file.
     *
     * @param metricsRegistry registry used to create the timer and the histograms
     * @param str             host-and-port string of this TSO; only used in the lost-mastership panic message
     * @param leaseManagement consulted before and after each flush to check the lease is still held
     * @param commitTable     commit table from which the writer is obtained
     * @param replyProcessor  receives every batch once it has been processed here
     * @param retryProcessor  receives the COMMIT_RETRY events found in a batch
     * @param panicker        notified on flush I/O errors and when the lease has been lost
     */
    @Inject
    public PersistenceProcessorHandler(MetricsRegistry metricsRegistry, String str, LeaseManagement leaseManagement, CommitTable commitTable, ReplyProcessor replyProcessor, RetryProcessor retryProcessor, Panicker panicker) throws InterruptedException, ExecutionException, IOException {
        this.tsoHostAndPort = str;
        this.leaseManager = leaseManagement;
        this.writer = commitTable.getWriter();
        this.replyProcessor = replyProcessor;
        this.retryProcessor = retryProcessor;
        this.panicker = panicker;

        final String group = "tso";
        final String component = "persistence-processor-handler";
        this.flushTimer = metricsRegistry.timer(
                MetricRegistry.name(group, component, this.id, "flush", "latency"));
        this.flushedCommitEventsHistogram = metricsRegistry.histogram(
                MetricRegistry.name(group, component, this.id, "flushed", "commits", "size"));
        this.batchSizeHistogram = metricsRegistry.histogram(
                MetricRegistry.name(group, component, this.id, "batch", "size"));
    }

    /** @return the id assigned to this handler at construction time */
    public String getId() {
        return this.id;
    }

    /**
     * Processes one batch: records its size, buffers its COMMIT/FENCE entries in the writer,
     * flushes, filters out commit retries, switches the monitoring timers of the remaining events
     * to the reply phase and hands the batch to the reply processor.
     *
     * @param persistBatchEvent carrier of the batch and of its sequence number
     * @throws IllegalStateException if the batch contains an event type this handler does not accept
     */
    @Override
    public void onEvent(PersistenceProcessorImpl.PersistBatchEvent persistBatchEvent) throws Exception {
        final Batch batch = persistBatchEvent.getBatch();
        final int receivedEvents = batch.getNumEvents();
        this.batchSizeHistogram.update(receivedEvents);

        final int commitsToFlush = bufferCommitsAndStopTimers(batch, receivedEvents);

        // Retries must be taken out of the batch after the flush and before the reply step,
        // because the reply step rejects any COMMIT_RETRY it still finds.
        flush(commitsToFlush);
        filterAndDissambiguateClientRetries(batch);
        startReplyTimers(batch);

        this.replyProcessor.manageResponsesBatch(persistBatchEvent.getBatchSequence(), batch);
    }

    /**
     * First pass over the batch: COMMIT and FENCE events are added to the writer, while
     * TIMESTAMP, COMMIT_RETRY and ABORT events only get their persistence timer stopped.
     *
     * <p>Declared with the same broad {@code throws} clause as {@link #onEvent}, from which this
     * code was extracted.
     *
     * @param batch      batch being processed
     * @param eventCount number of leading events of the batch to visit
     * @return how many entries were added to the writer
     */
    private int bufferCommitsAndStopTimers(Batch batch, int eventCount) throws Exception {
        int buffered = 0;
        for (int idx = 0; idx < eventCount; idx++) {
            final PersistEvent event = batch.get(idx);
            switch (event.getType()) {
                case COMMIT:
                    this.writer.addCommittedTransaction(event.getStartTimestamp(), event.getCommitTimestamp());
                    buffered++;
                    break;
                case FENCE:
                    // A fence is written with its commit timestamp in both positions.
                    this.writer.addCommittedTransaction(event.getCommitTimestamp(), event.getCommitTimestamp());
                    buffered++;
                    break;
                case TIMESTAMP:
                    event.getMonCtx().timerStop("persistence.processor.timestamp.latency");
                    break;
                case COMMIT_RETRY:
                    event.getMonCtx().timerStop("persistence.processor.commit-retry.latency");
                    break;
                case ABORT:
                    event.getMonCtx().timerStop("persistence.processor.abort.latency");
                    break;
                default:
                    throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + event);
            }
        }
        return buffered;
    }

    /**
     * Second pass over the (already filtered) batch: stops the persistence timer of COMMIT and
     * FENCE events and starts the reply-processor timer of every event.
     *
     * <p>The batch size is re-read on every iteration, exactly as the inlined loop did.
     *
     * @param batch batch whose commit retries have already been removed
     * @throws IllegalStateException if a COMMIT_RETRY or an unsupported event type is still present
     */
    private void startReplyTimers(Batch batch) throws Exception {
        for (int idx = 0; idx < batch.getNumEvents(); idx++) {
            final PersistEvent event = batch.get(idx);
            switch (event.getType()) {
                case COMMIT:
                    event.getMonCtx().timerStop("persistence.processor.commit.latency");
                    event.getMonCtx().timerStart("reply.processor.commit.latency");
                    break;
                case FENCE:
                    event.getMonCtx().timerStop("persistence.processor.fence.latency");
                    event.getMonCtx().timerStart("reply.processor.fence.latency");
                    break;
                case TIMESTAMP:
                    event.getMonCtx().timerStart("reply.processor.timestamp.latency");
                    break;
                case ABORT:
                    event.getMonCtx().timerStart("reply.processor.abort.latency");
                    break;
                case COMMIT_RETRY:
                    throw new IllegalStateException("COMMIT_RETRY events must be filtered before this step: " + event);
                default:
                    throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + event);
            }
        }
    }

    /**
     * Flushes the writer when there is something to flush and records the elapsed time and the
     * number of flushed commit events. The lease is checked both before and after the flush; an
     * {@link IOException} raised while flushing is reported to the panicker instead of being
     * propagated.
     *
     * <p>NOTE: the decompiler reported that this method was package-private in the original
     * class file.
     *
     * @param i number of commit events buffered in the writer; the writer is only flushed when positive
     */
    public void flush(int i) {
        commitSuicideIfNotMaster();
        try {
            final long startedAtNs = System.nanoTime();
            if (i > 0) {
                this.writer.flush();
            }
            final long elapsedNs = System.nanoTime() - startedAtNs;
            this.flushTimer.update(elapsedNs);
            this.flushedCommitEventsHistogram.update(i);
        } catch (IOException e) {
            this.panicker.panic("Error persisting commit batch", e);
        }
        commitSuicideIfNotMaster();
    }

    /** Reports to the panicker when the lease manager says the lease period is over. */
    private void commitSuicideIfNotMaster() {
        if (!this.leaseManager.stillInLeasePeriod()) {
            this.panicker.panic("Replica " + this.tsoHostAndPort + " lost mastership whilst flushing data. Committing suicide");
        }
    }

    /**
     * Removes every COMMIT_RETRY event from the batch after handing it to the retry processor.
     *
     * <p>A retry found at the current position is swapped with the last live event and the batch
     * is shrunk by one; the position is not advanced in that case, so the event that was swapped
     * in gets examined next.
     *
     * <p>NOTE: the decompiler reported that this method was package-private in the original
     * class file.
     *
     * @param batch batch to filter in place
     */
    public void filterAndDissambiguateClientRetries(Batch batch) {
        int cursor = 0;
        while (cursor <= batch.getLastEventIdx()) {
            final PersistEvent candidate = batch.get(cursor);
            if (candidate.getType() != PersistEvent.Type.COMMIT_RETRY) {
                cursor++;
                continue;
            }
            this.retryProcessor.disambiguateRetryRequestHeuristically(
                    candidate.getStartTimestamp(), candidate.getChannel(), candidate.getMonCtx());
            swapBatchElements(batch, cursor, batch.getLastEventIdx());
            batch.decreaseNumEvents();
            if (batch.isEmpty()) {
                break; // nothing left to inspect
            }
        }
    }

    /** Exchanges the events stored at positions {@code i} and {@code i2} of the batch. */
    private void swapBatchElements(Batch batch, int i, int i2) {
        final PersistEvent first = batch.get(i);
        final PersistEvent second = batch.get(i2);
        batch.set(i, second);
        batch.set(i2, first);
    }

    /** @return a string representation that exposes the handler id */
    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("id", this.id)
                .toString();
    }
}
