package com.networknt.eventuate.cdc.mysql.binlog;

import ch.qos.logback.core.spi.AbstractComponentTracker;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.NullEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer;
import com.networknt.eventuate.server.common.BinLogEvent;
import com.networknt.eventuate.server.common.BinlogFileOffset;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/eventuate/cdc/mysql/binlog/MySqlBinaryLogClient.class */
/**
 * Reads a MySQL binary log through {@link BinaryLogClient} and hands every row-insert event on a
 * single source table to a caller-supplied consumer.
 *
 * <p>Only {@code TABLE_MAP}, {@code EXT_WRITE_ROWS} and {@code ROTATE} events are deserialized; all
 * other event types get a {@link NullEventDataDeserializer}. The binlog file name and position of
 * the most recently seen events are tracked and exposed via {@link #getCurrentBinlogFilename()}
 * and {@link #getCurrentOffset()}.
 *
 * @param <M> type produced by the {@link IWriteRowsEventDataParser} for each write-rows event
 */
public class MySqlBinaryLogClient<M extends BinLogEvent> {
    /** Value passed to {@code BinaryLogClient.setKeepAliveInterval}. */
    private static final long KEEP_ALIVE_INTERVAL_MILLIS = 10_000L;
    /** Timeout passed to each {@code BinaryLogClient.connect} call. */
    private static final long CONNECT_TIMEOUT_MILLIS = 10_000L;
    /** Number of connection attempts made by {@link #start} before it gives up. */
    private static final int MAX_CONNECT_ATTEMPTS = 4;
    /** Pause between two consecutive connection attempts. */
    private static final long CONNECT_RETRY_DELAY_MILLIS = 2_000L;

    private final String name;
    private BinaryLogClient client;
    private final long binlogClientUniqueId;
    private final String dbUserName;
    private final String dbPassword;
    private final String host;
    private final int port;
    private final IWriteRowsEventDataParser<M> writeRowsEventDataParser;
    private final String sourceTableName;
    // binlogFilename and offset are written by the event listener and read through the public
    // getters. The connector documents connect(long) as running the stream on a separate thread,
    // so both are volatile to make the latest values visible to callers of the getters.
    private volatile String binlogFilename;
    private volatile long offset;
    /** TABLE_MAP events seen for the source table, keyed by table id; shared with the rows deserializer. */
    private final Map<Long, TableMapEventData> tableMapEventByTableId = new HashMap<>();
    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Stores the connection settings; no connection is opened until {@link #start} is called.
     *
     * @param writeRowsEventDataParser converts a write-rows event into an {@code M}
     * @param dbUserName user name for the MySQL connection
     * @param dbPassword password for the MySQL connection
     * @param host MySQL host
     * @param port MySQL port
     * @param binlogClientUniqueId value set as the binlog client's server id
     * @param sourceTableName table whose inserts are forwarded (compared case-insensitively)
     * @param name name reported by {@link #getName()}
     */
    public MySqlBinaryLogClient(IWriteRowsEventDataParser<M> writeRowsEventDataParser, String dbUserName, String dbPassword, String host, int port, long binlogClientUniqueId, String sourceTableName, String name) {
        this.writeRowsEventDataParser = writeRowsEventDataParser;
        this.binlogClientUniqueId = binlogClientUniqueId;
        this.dbUserName = dbUserName;
        this.dbPassword = dbPassword;
        this.host = host;
        this.port = port;
        this.sourceTableName = sourceTableName;
        this.name = name;
    }

    /**
     * Creates the underlying {@link BinaryLogClient}, registers the event listener and connects,
     * retrying up to {@value #MAX_CONNECT_ATTEMPTS} times.
     *
     * @param binlogFileOffset position to resume from; when empty, an empty file name and
     *     position 4 are used
     * @param eventConsumer receives one parsed event per write-rows event on the source table
     * @throws IOException if the last connection attempt failed with an I/O error, or the thread
     *     was interrupted while waiting between attempts
     * @throws TimeoutException if the last connection attempt timed out
     */
    public void start(Optional<BinlogFileOffset> binlogFileOffset, Consumer<M> eventConsumer) throws IOException, TimeoutException {
        this.client = new BinaryLogClient(this.host, this.port, this.dbUserName, this.dbPassword);
        this.client.setServerId(this.binlogClientUniqueId);
        this.client.setKeepAliveInterval(KEEP_ALIVE_INTERVAL_MILLIS);
        BinlogFileOffset startingOffset = binlogFileOffset.orElseGet(() -> new BinlogFileOffset("", 4L));
        this.logger.debug("Starting with {}", startingOffset);
        this.client.setBinlogFilename(startingOffset.getBinlogFilename());
        this.client.setBinlogPosition(startingOffset.getOffset());
        this.client.setEventDeserializer(getEventDeserializer());
        this.client.registerEventListener(event -> {
            switch (event.getHeader().getEventType()) {
                case TABLE_MAP:
                    handleTableMap((TableMapEventData) event.getData());
                    break;
                case EXT_WRITE_ROWS:
                    this.logger.debug("Got binlog event {}", event);
                    handleWriteRows((EventHeaderV4) event.getHeader(), (WriteRowsEventData) event.getData(), eventConsumer);
                    break;
                case ROTATE:
                    handleRotate((RotateEventData) event.getData());
                    break;
                default:
                    break;
            }
        });
        connectWithRetries();
    }

    /** Remembers the table map of the source table so that later row events can be matched by table id. */
    private void handleTableMap(TableMapEventData tableMap) {
        if (tableMap.getTable().equalsIgnoreCase(this.sourceTableName)) {
            this.tableMapEventByTableId.put(tableMap.getTableId(), tableMap);
        }
    }

    /** Records the event position and, if the rows belong to the source table, parses and forwards them. */
    private void handleWriteRows(EventHeaderV4 header, WriteRowsEventData rows, Consumer<M> eventConsumer) {
        long position = header.getPosition();
        // The position is recorded for every write-rows event, including those on other tables.
        this.offset = position;
        if (!this.tableMapEventByTableId.containsKey(rows.getTableId())) {
            return;
        }
        try {
            eventConsumer.accept(this.writeRowsEventDataParser.parseEventData(rows, getCurrentBinlogFilename(), position));
        } catch (IOException e) {
            throw new RuntimeException("Event row parsing exception", e);
        }
    }

    /** Tracks the binlog file name announced by a ROTATE event; events without data are ignored. */
    private void handleRotate(RotateEventData rotate) {
        if (rotate != null) {
            this.binlogFilename = rotate.getBinlogFilename();
        }
    }

    /**
     * Connects the client, pausing between failed attempts. The failure of the final attempt is
     * rethrown so the caller can tell that no connection was established.
     */
    private void connectWithRetries() throws IOException, TimeoutException {
        for (int attempt = 1; ; attempt++) {
            try {
                this.client.connect(CONNECT_TIMEOUT_MILLIS);
                return;
            } catch (IOException | TimeoutException | RuntimeException e) {
                // Throwable as the last argument keeps the stack trace in the log.
                this.logger.error("mysql connection error (attempt {} of {})", attempt, MAX_CONNECT_ATTEMPTS, e);
                if (attempt >= MAX_CONNECT_ATTEMPTS) {
                    throw e;
                }
                pauseBeforeRetry(e);
            }
        }
    }

    /**
     * Sleeps before the next connection attempt. If the thread is interrupted, the interrupt flag
     * is restored and retrying is abandoned.
     */
    private void pauseBeforeRetry(Exception connectFailure) throws IOException {
        try {
            Thread.sleep(CONNECT_RETRY_DELAY_MILLIS);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            IOException aborted = new IOException("Interrupted while waiting to retry the MySQL binlog connection", interrupted);
            aborted.addSuppressed(connectFailure);
            throw aborted;
        }
    }

    /**
     * Builds a deserializer that decodes only TABLE_MAP, ROTATE and EXT_WRITE_ROWS events; every
     * other event type is given a {@link NullEventDataDeserializer}.
     */
    private EventDeserializer getEventDeserializer() {
        EventDeserializer eventDeserializer = new EventDeserializer();
        for (EventType eventType : EventType.values()) {
            boolean needed = eventType == EventType.EXT_WRITE_ROWS
                    || eventType == EventType.TABLE_MAP
                    || eventType == EventType.ROTATE;
            if (!needed) {
                eventDeserializer.setEventDataDeserializer(eventType, new NullEventDataDeserializer());
            }
        }
        // The rows deserializer shares tableMapEventByTableId with the listener.
        eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS, new WriteRowsEventDataDeserializer(this.tableMapEventByTableId).setMayContainExtraInformation(true));
        return eventDeserializer;
    }

    /** Disconnects the client if {@link #start} created one; an I/O failure is logged, not thrown. */
    public void stop() {
        try {
            if (this.client != null) {
                this.client.disconnect();
            }
        } catch (IOException e) {
            this.logger.error("Cannot stop the MySqlBinaryLogClient", (Throwable) e);
        }
    }

    /**
     * @return the binlog file name from the most recent ROTATE event, or {@code null} if none has
     *     been received yet
     */
    public String getCurrentBinlogFilename() {
        return this.binlogFilename;
    }

    /** @return the header position of the most recent write-rows event, or 0 if none has been seen */
    public long getCurrentOffset() {
        return this.offset;
    }

    /** @return the name given at construction */
    public String getName() {
        return this.name;
    }
}
