/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.DebeziumException;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlDatabaseSchema;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlSnapshotChangeEventSourceMetrics;
import io.debezium.connector.mysql.MySqlValueConverters;
import io.debezium.connector.mysql.legacy.MySqlJdbcContext;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Column;
import io.debezium.relational.HistorizedRelationalDatabaseSchema;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlSnapshotChangeEventSource
extends RelationalSnapshotChangeEventSource {
    /** Logger used for all progress and diagnostic output of this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(MySqlSnapshotChangeEventSource.class);
    /** Connector configuration; read for snapshot mode, locking mode, lock timeout, large-table threshold and fetch size. */
    private final MySqlConnectorConfig connectorConfig;
    /** Connection on which every statement issued by this class is executed. */
    private final MySqlConnection connection;
    /** Clock reading (millis) taken when the global read lock was obtained; {@code -1} while no global lock is recorded. */
    private long globalLockAcquiredAt = -1L;
    /** Table filters obtained from the connector configuration; this class only uses the database filter. */
    private final RelationalTableFilters filters;
    /** Metrics object notified when the global lock is acquired and released. */
    private final MySqlSnapshotChangeEventSourceMetrics metrics;
    /** Offset passed to the constructor; when {@code null} a new offset is created from the binlog position. */
    private final MySqlOffsetContext previousOffset;
    /** Database schema; asked whether only monitored tables' schema should be stored. */
    private final MySqlDatabaseSchema databaseSchema;

    /**
     * Creates the snapshot source, forwarding the shared collaborators to the relational base
     * class and keeping MySQL-typed references for use in this class.
     *
     * @param connectorConfig connector configuration; also the source of the table filters
     * @param previousOffset  offset from an earlier run, or {@code null} if there is none
     * @param connection      connection used for all snapshot statements
     * @param schema          database schema passed to the base class and kept locally
     * @param dispatcher      event dispatcher, passed to the base class only
     * @param clock           clock, passed to the base class only
     * @param metrics         snapshot metrics, also registered with the base class as progress listener
     */
    public MySqlSnapshotChangeEventSource(
            MySqlConnectorConfig connectorConfig,
            MySqlOffsetContext previousOffset,
            MySqlConnection connection,
            MySqlDatabaseSchema schema,
            EventDispatcher<TableId> dispatcher,
            Clock clock,
            MySqlSnapshotChangeEventSourceMetrics metrics) {
        super(
                (RelationalDatabaseConnectorConfig) connectorConfig,
                (OffsetContext) previousOffset,
                (JdbcConnection) connection,
                (HistorizedRelationalDatabaseSchema) schema,
                dispatcher,
                clock,
                (SnapshotProgressListener) metrics);
        this.connection = connection;
        this.databaseSchema = schema;
        this.metrics = metrics;
        this.previousOffset = previousOffset;
        this.connectorConfig = connectorConfig;
        this.filters = connectorConfig.getTableFilters();
    }

    /**
     * Decides whether schema and/or data have to be snapshotted.
     *
     * <p>If an offset exists and its snapshot is no longer running, nothing is snapshotted.
     * In every other case the configured snapshot mode decides, separately, whether data and
     * schema are included.
     *
     * @param previousOffset offset from an earlier run; may be {@code null}
     * @return the task describing which parts of the snapshot to perform
     */
    protected AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
        final boolean snapshotAlreadyCompleted = previousOffset != null && !previousOffset.isSnapshotRunning();
        if (snapshotAlreadyCompleted) {
            LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
            return new AbstractSnapshotChangeEventSource.SnapshottingTask(false, false);
        }

        // Note: this branch is also taken when an offset exists but its snapshot is still marked as running.
        LOGGER.info("No previous offset has been found");
        final boolean withData = this.connectorConfig.getSnapshotMode().includeData();
        LOGGER.info(withData
                ? "According to the connector configuration both schema and data will be snapshotted"
                : "According to the connector configuration only schema will be snapshotted");
        final boolean withSchema = this.connectorConfig.getSnapshotMode().includeSchema();
        return new AbstractSnapshotChangeEventSource.SnapshottingTask(withSchema, withData);
    }

    /**
     * Supplies the context object for a snapshot run.
     *
     * @param context change event source context; not used by this implementation
     * @return a newly created {@code MySqlSnapshotContext}
     * @throws Exception if the context cannot be created
     */
    protected AbstractSnapshotChangeEventSource.SnapshotContext prepare(ChangeEventSource.ChangeEventSourceContext context) throws Exception {
        final MySqlSnapshotContext freshContext = new MySqlSnapshotContext();
        return freshContext;
    }

    /**
     * Connection-created hook; this implementation performs no work.
     * NOTE(review): presumably a base-class callback invoked once the snapshot connection exists — confirm.
     *
     * @param snapshotContext the current snapshot context; not used
     * @throws Exception declared by the signature; never thrown by this implementation
     */
    protected void connectionCreated(RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext) throws Exception {
    }

    /**
     * Lists every base table of every database that can be read on the connection.
     *
     * <p>Database names come from {@code SHOW DATABASES}; for each one the base tables are read
     * with {@code SHOW FULL TABLES}. A database whose table listing fails with an
     * {@link SQLException} is skipped with a warning instead of aborting the snapshot.
     *
     * <p>The database filter is applied only to produce the final log line; the returned set is
     * not reduced by it.
     *
     * @param ctx the current snapshot context; not used by this implementation
     * @return ids (catalog = database name, no schema, table name) of all tables found
     * @throws Exception if listing the databases fails
     */
    protected Set<TableId> getAllTableIds(RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx) throws Exception {
        LOGGER.info("Read list of available databases");
        final List<String> databaseNames = new ArrayList<>();
        this.connection.query("SHOW DATABASES", rs -> {
            while (rs.next()) {
                databaseNames.add(rs.getString(1));
            }
        });
        LOGGER.info("\t list of available databases is: {}", databaseNames);

        LOGGER.info("Read list of available tables in each database");
        final Set<TableId> tableIds = new HashSet<>();
        final Set<String> readableDatabaseNames = new HashSet<>();
        for (String dbName : databaseNames) {
            try {
                this.connection.query("SHOW FULL TABLES IN " + this.quote(dbName) + " where Table_Type = 'BASE TABLE'", rs -> {
                    while (rs.next()) {
                        tableIds.add(new TableId(dbName, null, rs.getString(1)));
                    }
                });
                readableDatabaseNames.add(dbName);
            }
            catch (SQLException e) {
                // Best effort: an unreadable database is skipped, the remaining ones are still processed.
                LOGGER.warn("\t skipping database '{}' due to error reading tables: {}", dbName, e.getMessage());
            }
        }

        final Set<String> includedDatabaseNames = readableDatabaseNames.stream()
                .filter(this.filters.databaseFilter())
                .collect(Collectors.toSet());
        LOGGER.info("\tsnapshot continuing with database(s): {}", includedDatabaseNames);
        return tableIds;
    }

    /**
     * Prepares the session for a consistent schema read and tries to take the global read lock.
     *
     * <p>Steps: set the JDBC isolation level to REPEATABLE READ, apply the configured snapshot
     * lock timeout to {@code lock_wait_timeout} and (best effort) {@code innodb_lock_wait_timeout},
     * then, unless the locking mode is {@code NONE}, attempt {@code FLUSH TABLES WITH READ LOCK}.
     * A failure to obtain the global lock is logged and tolerated.
     *
     * @param sourceContext   change event source context; not used by this implementation
     * @param snapshotContext the current snapshot context; not used by this implementation
     * @throws SQLException         if setting the isolation level or {@code lock_wait_timeout} fails
     * @throws InterruptedException declared by the signature; not thrown by this implementation
     */
    protected void lockTablesForSchemaSnapshot(ChangeEventSource.ChangeEventSourceContext sourceContext, RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext) throws SQLException, InterruptedException {
        this.connection.connection().setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
        this.connection.executeWithoutCommitting(new String[]{"SET SESSION lock_wait_timeout=" + this.connectorConfig.snapshotLockTimeout().getSeconds()});
        try {
            this.connection.executeWithoutCommitting(new String[]{"SET SESSION innodb_lock_wait_timeout=" + this.connectorConfig.snapshotLockTimeout().getSeconds()});
        }
        catch (SQLException e) {
            // Not fatal: the snapshot proceeds with whatever InnoDB timeout the session already has.
            LOGGER.warn("Unable to set innodb_lock_wait_timeout", e);
        }

        if (this.connectorConfig.getSnapshotLockingMode() != MySqlConnectorConfig.SnapshotLockingMode.NONE) {
            try {
                this.globalLock();
                this.metrics.globalLockAcquired();
            }
            catch (SQLException e) {
                LOGGER.info("Unable to flush and acquire global read lock, will use table read locks after reading table names");
                // globalLock() records its timestamp only after the FLUSH statement succeeded,
                // so no global lock may be marked as held at this point.
                assert !this.isGloballyLocked();
            }
            // NOTE(review): presumably re-asserted because FLUSH TABLES resets the transaction state — confirm.
            this.connection.executeWithoutCommitting(new String[]{"SET TRANSACTION ISOLATION LEVEL REPEATABLE READ"});
        }
    }

    /**
     * Releases the global read lock once the schema has been read, but only when the locking
     * mode is {@code MINIMAL} and a global lock is currently recorded as held.
     *
     * @param snapshotContext the current snapshot context; not used by this implementation
     * @throws SQLException if unlocking fails
     */
    protected void releaseSchemaSnapshotLocks(RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext) throws SQLException {
        final boolean minimalLocking = this.connectorConfig.getSnapshotLockingMode() == MySqlConnectorConfig.SnapshotLockingMode.MINIMAL;
        if (!minimalLocking || !isGloballyLocked()) {
            return;
        }
        globalUnlock();
    }

    /**
     * Establishes the offset the snapshot is taken at and then starts the snapshot.
     *
     * <p>An offset supplied at construction time is reused unchanged. Otherwise an initial offset
     * is created and filled from {@code SHOW MASTER STATUS}: binlog file name (column 1), binlog
     * position (column 2) and, when the result has more than four columns, the GTID set (column 5).
     *
     * @param ctx snapshot context whose {@code offset} field is assigned
     * @throws Exception if the query fails; a {@link DebeziumException} is raised when the
     *                   status query returns no row
     */
    protected void determineSnapshotOffset(RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx) throws Exception {
        if (this.previousOffset == null) {
            final MySqlOffsetContext freshOffset = MySqlOffsetContext.initial(this.connectorConfig);
            ctx.offset = freshOffset;
            LOGGER.info("Read binlog position of MySQL primary server");
            final String statusQuery = "SHOW MASTER STATUS";
            this.connection.query(statusQuery, rs -> {
                if (!rs.next()) {
                    throw new DebeziumException("Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured");
                }
                final String file = rs.getString(1);
                final long position = rs.getLong(2);
                freshOffset.getSource().setBinlogStartPoint(file, position);
                final boolean hasGtidColumn = rs.getMetaData().getColumnCount() > 4;
                if (!hasGtidColumn) {
                    LOGGER.info("\t using binlog '{}' at position '{}'", (Object) file, (Object) position);
                } else {
                    final String gtids = rs.getString(5);
                    freshOffset.getSource().setCompletedGtidSet(gtids);
                    LOGGER.info("\t using binlog '{}' at position '{}' and gtid '{}'", new Object[]{file, position, gtids});
                }
            });
        } else {
            ctx.offset = this.previousOffset;
        }
        this.tryStartingSnapshot(ctx);
    }

    /**
     * Wraps a DDL statement in a snapshot {@link SchemaChangeEvent} stamped with the current
     * snapshot offset and registers it via {@code addRawSchemaChangeEvent}.
     *
     * @param snapshotContext context providing the offset (partition, offset and source info)
     * @param database        database the DDL applies to; the empty string is used for server-wide statements
     * @param ddl             the DDL text
     */
    private void addRawSchemaEvent(RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext, String database, String ddl) {
        final SchemaChangeEvent rawDdlEvent = new SchemaChangeEvent(
                snapshotContext.offset.getPartition(),
                snapshotContext.offset.getOffset(),
                snapshotContext.offset.getSourceInfo(),
                database,
                null,
                ddl,
                true);
        this.addRawSchemaChangeEvent(rawDdlEvent);
    }

    /**
     * Emits the raw DDL events that describe the structure of the captured tables.
     *
     * <p>Order of emitted statements: the charset system variable {@code SET} statement, one
     * {@code DROP TABLE IF EXISTS} per table, then for each database (in first-seen order)
     * {@code DROP DATABASE IF EXISTS}, {@code CREATE DATABASE} (with the database's default
     * locales when known), {@code USE}, and the {@code SHOW CREATE TABLE} output of each of its tables.
     *
     * <p>The table set is {@code capturedTables} when the schema stores only monitored tables,
     * otherwise {@code capturedSchemaTables}.
     *
     * @param sourceContext   polled via {@code isRunning()} so the work can be abandoned between steps
     * @param snapshotContext context providing the table sets and the offset for the events
     * @throws SQLException         if any metadata query fails
     * @throws InterruptedException if the source context stops running while events are being emitted
     */
    protected void readTableStructure(ChangeEventSource.ChangeEventSourceContext sourceContext, RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext) throws SQLException, InterruptedException {
        final Set<TableId> capturedSchemaTables;
        if (this.databaseSchema.storeOnlyMonitoredTables()) {
            capturedSchemaTables = snapshotContext.capturedTables;
            LOGGER.info("Only monitored tables schema should be captured, capturing: {}", capturedSchemaTables);
        } else {
            capturedSchemaTables = snapshotContext.capturedSchemaTables;
            LOGGER.info("All eligible tables schema should be captured, capturing: {}", capturedSchemaTables);
        }

        // LinkedHashMap keeps the databases in the order in which their first table is encountered.
        final Map<String, List<TableId>> tablesToRead = capturedSchemaTables.stream()
                .collect(Collectors.groupingBy(TableId::catalog, LinkedHashMap::new, Collectors.toList()));
        final Set<String> databases = tablesToRead.keySet();

        this.addRawSchemaEvent(snapshotContext, "", this.connection.setStatementFor(this.connection.readMySqlCharsetSystemVariables()));

        for (TableId tableId : capturedSchemaTables) {
            if (!sourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while emitting initial DROP TABLE events");
            }
            this.addRawSchemaEvent(snapshotContext, tableId.catalog(), "DROP TABLE IF EXISTS " + this.quote(tableId));
        }

        final Map<String, MySqlJdbcContext.DatabaseLocales> databaseCharsets = this.connection.readDatabaseCollations();
        for (String database : databases) {
            if (!sourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while reading structure of schema " + databases);
            }
            LOGGER.info("Reading structure of database '{}'", database);
            this.addRawSchemaEvent(snapshotContext, database, "DROP DATABASE IF EXISTS " + this.quote(database));

            final StringBuilder createDatabaseDdl = new StringBuilder("CREATE DATABASE " + this.quote(database));
            final MySqlJdbcContext.DatabaseLocales defaultDatabaseLocales = databaseCharsets.get(database);
            if (defaultDatabaseLocales != null) {
                defaultDatabaseLocales.appendToDdlStatement(database, createDatabaseDdl);
            }
            this.addRawSchemaEvent(snapshotContext, database, createDatabaseDdl.toString());
            this.addRawSchemaEvent(snapshotContext, database, "USE " + this.quote(database));

            for (TableId tableId : tablesToRead.get(database)) {
                this.connection.query("SHOW CREATE TABLE " + this.quote(tableId), rs -> {
                    if (rs.next()) {
                        // Column 2 of SHOW CREATE TABLE carries the CREATE statement text.
                        this.addRawSchemaEvent(snapshotContext, database, rs.getString(2));
                    }
                });
            }
        }
    }

    /**
     * Builds the snapshot {@code CREATE} schema change event for a table, stamped with the
     * current snapshot offset. No DDL text is attached (the DDL argument is {@code null}).
     *
     * @param snapshotContext context providing the offset and the catalog name
     * @param table           the table the event describes
     * @return the schema change event
     * @throws SQLException declared by the signature; not thrown by this implementation
     */
    protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext, Table table) throws SQLException {
        final SchemaChangeEvent createEvent = new SchemaChangeEvent(
                snapshotContext.offset.getPartition(),
                snapshotContext.offset.getOffset(),
                snapshotContext.offset.getSourceInfo(),
                snapshotContext.catalogName,
                table.id().schema(),
                null,
                table,
                SchemaChangeEvent.SchemaChangeEventType.CREATE,
                true);
        return createEvent;
    }

    /**
     * Completion hook; this implementation performs no work.
     *
     * @param snapshotContext the snapshot context of the finished run; not used
     */
    protected void complete(AbstractSnapshotChangeEventSource.SnapshotContext snapshotContext) {
    }

    /**
     * Builds the statement used to read all rows of a table during the data snapshot.
     *
     * <p>The table reference is produced by the same {@code quote(TableId)} helper used for the
     * other statements in this class, so identifier quoting is defined in one place.
     *
     * @param snapshotContext the current snapshot context; not used by this implementation
     * @param tableId         the table to read; catalog and table name are quoted with backticks
     * @return an always-present {@code SELECT *} statement for the table
     */
    protected Optional<String> getSnapshotSelect(RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext, TableId tableId) {
        return Optional.of("SELECT * FROM " + this.quote(tableId));
    }

    /**
     * Reads one column value from the current result set row, with type-specific handling.
     *
     * <ul>
     * <li>{@code TIME}, {@code DATE} and {@code TIMESTAMP} columns go through the dedicated
     * readers, which work from the raw textual value.</li>
     * <li>{@code TINYINT} and {@code SMALLINT} columns are returned as {@link Integer}, or
     * {@code null} when {@code getObject} reports no value.</li>
     * <li>Columns whose type name is {@code CHAR}, {@code VARCHAR} or {@code TEXT} are returned
     * as raw bytes.</li>
     * <li>Everything else is returned as {@code getObject} delivers it.</li>
     * </ul>
     *
     * @param rs          result set positioned on the row to read
     * @param columnIndex 1-based index of the column in the result set
     * @param column      metadata of the column
     * @param table       table the column belongs to
     * @return the column value, possibly {@code null}
     * @throws SQLException if the value cannot be read
     */
    protected Object getColumnValue(ResultSet rs, int columnIndex, Column column, Table table) throws SQLException {
        switch (column.jdbcType()) {
            case Types.TIME:
                return this.readTimeField(rs, columnIndex);
            case Types.DATE:
                return this.readDateField(rs, columnIndex, column, table);
            case Types.TIMESTAMP:
                return this.readTimestampField(rs, columnIndex, column, table);
            case Types.TINYINT:
            case Types.SMALLINT:
                // getObject() is used for the null check; the value itself is re-read as an int.
                return rs.getObject(columnIndex) == null ? null : Integer.valueOf(rs.getInt(columnIndex));
            default:
                break;
        }

        final String typeName = column.typeName();
        if ("CHAR".equals(typeName) || "VARCHAR".equals(typeName) || "TEXT".equals(typeName)) {
            // NOTE(review): presumably bytes are kept so charset decoding happens downstream — confirm.
            return rs.getBytes(columnIndex);
        }
        return rs.getObject(columnIndex);
    }

    /**
     * Reads a {@code TIME} column from its raw textual form and converts it with
     * {@code MySqlValueConverters.stringToDuration}.
     *
     * <p>The raw bytes are decoded with {@link StandardCharsets#UTF_8}, which cannot fail with an
     * unsupported-encoding error, so no exception translation is needed.
     *
     * @param rs      result set positioned on the row to read
     * @param fieldNo 1-based column index
     * @return the converted value, or {@code null} when the column holds no value
     * @throws SQLException if the column cannot be read
     */
    private Object readTimeField(ResultSet rs, int fieldNo) throws SQLException {
        final Blob b = rs.getBlob(fieldNo);
        if (b == null) {
            return null;
        }
        // Blob positions are 1-based.
        final String rawValue = new String(b.getBytes(1L, (int) b.length()), StandardCharsets.UTF_8);
        return MySqlValueConverters.stringToDuration(rawValue);
    }

    /**
     * Reads a {@code DATE} column from its raw textual form and converts it with
     * {@code MySqlValueConverters.stringToLocalDate}.
     *
     * <p>The raw bytes are decoded with {@link StandardCharsets#UTF_8}, which cannot fail with an
     * unsupported-encoding error, so no exception translation is needed.
     *
     * @param rs      result set positioned on the row to read
     * @param fieldNo 1-based column index
     * @param column  metadata of the column, passed to the converter
     * @param table   table the column belongs to, passed to the converter
     * @return the converted value, or {@code null} when the column holds no value
     * @throws SQLException if the column cannot be read
     */
    private Object readDateField(ResultSet rs, int fieldNo, Column column, Table table) throws SQLException {
        final Blob b = rs.getBlob(fieldNo);
        if (b == null) {
            return null;
        }
        // Blob positions are 1-based.
        final String rawValue = new String(b.getBytes(1L, (int) b.length()), StandardCharsets.UTF_8);
        return MySqlValueConverters.stringToLocalDate(rawValue, column, table);
    }

    /**
     * Reads a {@code TIMESTAMP} column, returning {@code null} when its raw textual form
     * contains zero values in the date part (as judged by
     * {@code MySqlValueConverters.containsZeroValuesInDatePart}); otherwise the value is read
     * with {@code getTimestamp} using a default {@link Calendar}.
     *
     * <p>The raw bytes are decoded with {@link StandardCharsets#UTF_8}, which cannot fail with an
     * unsupported-encoding error, so no exception translation is needed.
     *
     * @param rs      result set positioned on the row to read
     * @param fieldNo 1-based column index
     * @param column  metadata of the column, passed to the zero-value check
     * @param table   table the column belongs to, passed to the zero-value check
     * @return the timestamp, or {@code null} when the column holds no value or a zero date part
     * @throws SQLException if the column cannot be read
     */
    private Object readTimestampField(ResultSet rs, int fieldNo, Column column, Table table) throws SQLException {
        final Blob b = rs.getBlob(fieldNo);
        if (b == null) {
            return null;
        }
        // Blob positions are 1-based.
        final String rawValue = new String(b.getBytes(1L, (int) b.length()), StandardCharsets.UTF_8);
        if (MySqlValueConverters.containsZeroValuesInDatePart(rawValue, column, table)) {
            return null;
        }
        return rs.getTimestamp(fieldNo, Calendar.getInstance());
    }

    /**
     * Tells whether a global read lock is currently recorded as held.
     *
     * @return {@code true} when an acquisition timestamp is stored, i.e. the field is not {@code -1}
     */
    private boolean isGloballyLocked() {
        final boolean noLockRecorded = globalLockAcquiredAt == -1L;
        return !noLockRecorded;
    }

    /**
     * Issues {@code UNLOCK TABLES}, notifies the metrics, logs how long the lock was held and
     * resets the acquisition timestamp to {@code -1}.
     *
     * @throws SQLException if the unlock statement fails; the timestamp is then left untouched
     */
    private void globalUnlock() throws SQLException {
        LOGGER.info("Releasing global read lock to enable MySQL writes");
        connection.executeWithoutCommitting(new String[]{"UNLOCK TABLES"});
        final long releasedAt = clock.currentTimeInMillis();
        metrics.globalLockReleased();
        final long heldForMillis = releasedAt - globalLockAcquiredAt;
        LOGGER.info("Writes to MySQL tables prevented for a total of {}", (Object) Strings.duration(heldForMillis));
        globalLockAcquiredAt = -1L;
    }

    /**
     * Issues {@code FLUSH TABLES WITH READ LOCK} and, only once that succeeded, records the
     * current clock time as the moment the lock was acquired.
     *
     * @throws SQLException if the statement fails; no timestamp is recorded in that case
     */
    private void globalLock() throws SQLException {
        LOGGER.info("Flush and obtain global read lock to prevent writes to database");
        final String[] lockStatement = {"FLUSH TABLES WITH READ LOCK"};
        connection.executeWithoutCommitting(lockStatement);
        globalLockAcquiredAt = clock.currentTimeInMillis();
    }

    /**
     * Wraps a database or table name in backticks for use as a MySQL identifier.
     *
     * <p>A backtick inside the name is doubled, which is how MySQL expects the quote character
     * to be written within a quoted identifier; without this a name containing a backtick
     * would terminate the identifier early and yield an invalid statement.
     *
     * @param dbOrTableName the bare identifier; {@code null} is rendered as the text {@code null},
     *                      matching plain string concatenation
     * @return the identifier enclosed in backticks
     */
    private String quote(String dbOrTableName) {
        return "`" + String.valueOf(dbOrTableName).replace("`", "``") + "`";
    }

    /**
     * Produces the qualified, quoted reference to a table: quoted catalog, a dot, quoted table name.
     *
     * @param id the table id; its catalog and table name are used
     * @return the qualified identifier
     */
    private String quote(TableId id) {
        final String quotedCatalog = quote(id.catalog());
        final String quotedTable = quote(id.table());
        return String.join(".", quotedCatalog, quotedTable);
    }

    /**
     * Returns the row count estimate for a table as reported by the connection's
     * {@code getEstimatedTableSize}.
     *
     * @param tableId the table to size
     * @return the estimate as delivered by the connection
     */
    protected OptionalLong rowCountForTable(TableId tableId) {
        final OptionalLong estimatedRows = connection.getEstimatedTableSize(tableId);
        return estimatedRows;
    }

    /**
     * Chooses the statement used to read a table's rows.
     *
     * <p>The large-result-set statement is used only when a row count is known, the configured
     * large-table threshold is non-zero, and the count exceeds that threshold. In every other
     * case the base class statement is used.
     *
     * @param rowCount row count of the table, if known
     * @return the statement to read the table with
     * @throws SQLException if the statement cannot be created
     */
    protected Statement readTableStatement(OptionalLong rowCount) throws SQLException {
        final long largeTableThreshold = this.connectorConfig.rowCountForLargeTable();
        final boolean isLargeTable = rowCount.isPresent()
                && largeTableThreshold != 0L
                && rowCount.getAsLong() > largeTableThreshold;
        if (isLargeTable) {
            return this.createStatementWithLargeResultSet();
        }
        return super.readTableStatement(rowCount);
    }

    /**
     * Creates a forward-only, read-only statement whose fetch size is the configured snapshot
     * fetch size.
     *
     * @return the configured statement; the caller is responsible for closing it
     * @throws SQLException if the statement cannot be created or configured
     */
    private Statement createStatementWithLargeResultSet() throws SQLException {
        final int fetchSize = this.connectorConfig.getSnapshotFetchSize();
        final Statement stmt = this.connection.connection().createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        stmt.setFetchSize(fetchSize);
        return stmt;
    }

    /**
     * Snapshot context for MySQL; adds no state of its own beyond the relational base context.
     */
    private static class MySqlSnapshotContext
    extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext {
        /**
         * Creates the context, passing the empty string to the base constructor.
         * NOTE(review): the argument is presumably the catalog name — confirm against the base class.
         *
         * @throws SQLException declared because the base constructor may throw it
         */
        public MySqlSnapshotContext() throws SQLException {
            super("");
        }
    }
}

