package com.zendesk.maxwell.replication;

import com.codahale.metrics.Timer;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.GtidEventData;
import com.zendesk.maxwell.monitoring.Metrics;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/zendesk/maxwell/replication/BinlogConnectorEventListener.class */
class BinlogConnectorEventListener implements BinaryLogClient.EventListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(BinlogConnectorEventListener.class);
    private final BlockingQueue<BinlogConnectorEvent> queue;
    private final Timer queueTimer;
    protected final AtomicBoolean mustStop = new AtomicBoolean(false);
    private final BinaryLogClient client;
    private long replicationLag;
    private String gtid;

    public BinlogConnectorEventListener(BinaryLogClient binaryLogClient, BlockingQueue<BinlogConnectorEvent> blockingQueue, Metrics metrics) {
        this.client = binaryLogClient;
        this.queue = blockingQueue;
        this.queueTimer = metrics.getRegistry().timer(metrics.metricName("replication", "queue", "time"));
        metrics.register(metrics.metricName("replication", "lag"), () -> {
            return Long.valueOf(this.replicationLag);
        });
    }

    public void stop() {
        this.mustStop.set(true);
    }

    public void onEvent(Event event) {
        long j = 0;
        boolean z = false;
        if (event.getHeader().getEventType() == EventType.GTID) {
            this.gtid = event.getData().getGtid();
        }
        BinlogConnectorEvent binlogConnectorEvent = new BinlogConnectorEvent(event, this.client.getBinlogFilename(), this.client.getGtidSet(), this.gtid);
        if (binlogConnectorEvent.isCommitEvent()) {
            z = true;
            j = System.currentTimeMillis();
            this.replicationLag = j - event.getHeader().getTimestamp();
        }
        while (!this.mustStop.get()) {
            try {
                if (this.queue.offer(binlogConnectorEvent, 100L, TimeUnit.MILLISECONDS)) {
                    break;
                }
            } catch (InterruptedException e) {
                return;
            }
        }
        if (z) {
            this.queueTimer.update(System.currentTimeMillis() - j, TimeUnit.MILLISECONDS);
        }
    }
}
