/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.connector.mysql.AbstractReader;
import io.debezium.connector.mysql.Filters;
import io.debezium.connector.mysql.MySqlSchema;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.connector.mysql.RecordMakers;
import io.debezium.connector.mysql.SourceInfo;
import io.debezium.function.BlockingConsumer;
import io.debezium.function.BufferedBlockingConsumer;
import io.debezium.function.Predicates;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * A reader that takes a snapshot of the MySQL databases visible to the connector.
 *
 * <p>The snapshot runs on a dedicated thread started by {@link #doStart()}. Within a single
 * repeatable-read transaction it obtains the global read lock, records the current binlog position
 * (and GTID set when the server reports one), rebuilds the in-memory schema from
 * {@code SHOW CREATE TABLE} output, and then emits one record per row of every table accepted by the
 * configured table filter.
 */
public class SnapshotReader extends AbstractReader {
    /** When true the global read lock is released before the table contents are scanned. */
    private boolean minimalBlocking = true;
    /** Strategy that turns a scanned row into a record (a "read" event by default). */
    private RecordRecorder recorder = this::recordRowAsRead;
    /** The thread running {@link #execute()}, or null when no snapshot thread is registered. */
    private volatile Thread thread;
    /** Optional callback run by {@link #doCleanup()}. */
    private volatile Runnable onSuccessfulCompletion;

    /**
     * Create a snapshot reader.
     *
     * @param context the task context that supplies the JDBC connection, schema, source info and clock
     */
    public SnapshotReader(MySqlTaskContext context) {
        super(context);
    }

    /**
     * Set the function that {@link #doCleanup()} runs.
     *
     * @param onSuccessfulCompletion the callback; may be null to run nothing
     * @return this object for method chaining
     */
    public SnapshotReader onSuccessfulCompletion(Runnable onSuccessfulCompletion) {
        this.onSuccessfulCompletion = onSuccessfulCompletion;
        return this;
    }

    /**
     * Choose whether the global read lock is released as soon as the schema has been read
     * ({@code true}) or only after all table contents have been scanned ({@code false}).
     *
     * @param minimalBlocking true to release the lock before scanning table contents
     * @return this object for method chaining
     */
    public SnapshotReader useMinimalBlocking(boolean minimalBlocking) {
        this.minimalBlocking = minimalBlocking;
        return this;
    }

    /**
     * Record every scanned row through {@link #recordRowAsRead}.
     *
     * @return this object for method chaining
     */
    public SnapshotReader generateReadEvents() {
        this.recorder = this::recordRowAsRead;
        return this;
    }

    /**
     * Record every scanned row through {@link #recordRowAsInsert}.
     *
     * @return this object for method chaining
     */
    public SnapshotReader generateInsertEvents() {
        this.recorder = this::recordRowAsInsert;
        return this;
    }

    /** Start the snapshot on a new thread named after the logical server name. */
    @Override
    protected void doStart() {
        this.thread = new Thread(this::execute, "mysql-snapshot-" + this.context.serverName());
        this.thread.start();
    }

    /** Interrupt the snapshot thread, if one is currently registered. */
    @Override
    protected void doStop() {
        // Read the volatile field once: it is null before doStart() and after doCleanup().
        Thread worker = this.thread;
        if (worker != null) {
            worker.interrupt();
        }
    }

    /**
     * Forget the snapshot thread and run the completion callback, if one was supplied.
     *
     * @throws ConnectException if the callback fails with a non-runtime {@link Throwable}
     */
    @Override
    protected void doCleanup() {
        this.thread = null;
        this.logger.trace("Completed writing all snapshot records");
        // Read the volatile field once so the null check and the call see the same value.
        Runnable completion = this.onSuccessfulCompletion;
        try {
            if (completion != null) {
                completion.run();
            }
        } catch (RuntimeException e) {
            throw e;
        } catch (Throwable e) {
            throw new ConnectException("Error calling completion function after completing snapshot", e);
        }
    }

    /**
     * Perform the snapshot. This is the body of the snapshot thread.
     *
     * <p>Any {@link Throwable} raised by the numbered steps is reported through
     * {@code failed(Throwable, String)}; afterwards the global read lock and the open transaction are
     * released on a best-effort basis so a failed snapshot does not keep blocking writes.
     */
    protected void execute() {
        this.context.configureLoggingContext("snapshot");
        final AtomicReference<String> sql = new AtomicReference<>();
        final JdbcConnection mysql = this.context.jdbc();
        final MySqlSchema schema = this.context.dbSchema();
        final Filters filters = schema.filters();
        final SourceInfo source = this.context.source();
        final Clock clock = this.context.clock();
        final long ts = clock.currentTimeInMillis();
        this.logger.info("Starting snapshot for {} with user '{}'", this.context.connectionString(), mysql.username());
        this.logRolesForCurrentUser(mysql);
        this.logServerInformation(mysql);

        // Tracked outside the try block so the failure path knows what still has to be released.
        boolean transactionOpen = false;
        boolean lockHeld = false;
        try {
            this.logger.info("Step 0: disabling autocommit and enabling repeatable read transactions");
            mysql.setAutoCommit(false);
            sql.set("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
            mysql.execute(sql.get());
            Map<String, String> systemVariables = this.context.readMySqlCharsetSystemVariables(sql);
            String setSystemVariablesStatement = this.context.setStatementFor(systemVariables);

            this.logger.info("Step 1: start transaction with consistent snapshot");
            sql.set("START TRANSACTION WITH CONSISTENT SNAPSHOT");
            mysql.execute(sql.get());
            transactionOpen = true;

            final long lockAcquired = clock.currentTimeInMillis();
            this.logger.info("Step 2: flush and obtain global read lock (preventing writes to database)");
            sql.set("FLUSH TABLES WITH READ LOCK");
            mysql.execute(sql.get());
            lockHeld = true;

            this.logger.info("Step 3: read binlog position of MySQL master");
            final String showMasterStmt = "SHOW MASTER STATUS";
            sql.set(showMasterStmt);
            mysql.query(sql.get(), rs -> {
                if (!rs.next()) {
                    throw new IllegalStateException("Cannot read the binlog filename and position via '" + showMasterStmt + "'. Make sure your server is correctly configured");
                }
                String binlogFilename = rs.getString(1);
                long binlogPosition = rs.getLong(2);
                source.setBinlogStartPoint(binlogFilename, binlogPosition);
                // A fifth column is only present when the server reports a GTID set.
                if (rs.getMetaData().getColumnCount() > 4) {
                    String gtidSet = rs.getString(5);
                    source.setGtidSet(gtidSet);
                    this.logger.info("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition, gtidSet);
                } else {
                    this.logger.info("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition);
                }
                source.startSnapshot();
            });

            this.logger.info("Step 4: read list of available databases");
            final List<String> databaseNames = new ArrayList<>();
            sql.set("SHOW DATABASES");
            mysql.query(sql.get(), rs -> {
                while (rs.next()) {
                    databaseNames.add(rs.getString(1));
                }
            });
            this.logger.info("\t list of available databases is: {}", databaseNames);

            this.logger.info("Step 5: read list of available tables in each database");
            final List<TableId> tableIds = new ArrayList<>();
            final Map<String, List<TableId>> tableIdsByDbName = new HashMap<>();
            for (String availableDb : databaseNames) {
                // Quote the name so databases with hyphens, spaces or reserved words can be listed.
                sql.set("SHOW TABLES IN " + quoteIdentifier(availableDb));
                mysql.query(sql.get(), rs -> {
                    while (rs.next()) {
                        TableId id = new TableId(availableDb, null, rs.getString(1));
                        if (filters.tableFilter().test(id)) {
                            tableIds.add(id);
                            tableIdsByDbName.computeIfAbsent(availableDb, k -> new ArrayList<>()).add(id);
                            this.logger.info("\t including '{}'", id);
                        } else {
                            this.logger.info("\t '{}' is filtered out, discarding", id);
                        }
                    }
                });
            }

            this.logger.info("Step 6: generating DROP and CREATE statements to reflect current database schemas:");
            schema.applyDdl(source, null, setSystemVariablesStatement, this::enqueueSchemaChanges);
            // Drop every table that is either already known to the schema or about to be (re)created.
            HashSet<TableId> allTableIds = new HashSet<>(schema.tables().tableIds());
            allTableIds.addAll(tableIds);
            // NOTE(review): ids built in step 5 pass null as the second constructor argument; if that
            // argument is the schema, knownId.schema() is null here - confirm this is intended.
            allTableIds.forEach(knownId -> schema.applyDdl(source, knownId.schema(), "DROP TABLE IF EXISTS " + knownId, this::enqueueSchemaChanges));
            // Drop databases the schema still knows about but the server no longer lists.
            schema.tables().tableIds().stream().map(TableId::catalog).filter(Predicates.not(databaseNames::contains)).forEach(missingDbName -> schema.applyDdl(source, missingDbName, "DROP DATABASE IF EXISTS " + missingDbName, this::enqueueSchemaChanges));
            for (Map.Entry<String, List<TableId>> entry : tableIdsByDbName.entrySet()) {
                final String dbName = entry.getKey();
                schema.applyDdl(source, dbName, "DROP DATABASE IF EXISTS " + dbName, this::enqueueSchemaChanges);
                schema.applyDdl(source, dbName, "CREATE DATABASE " + dbName, this::enqueueSchemaChanges);
                schema.applyDdl(source, dbName, "USE " + dbName, this::enqueueSchemaChanges);
                for (TableId createdId : entry.getValue()) {
                    sql.set("SHOW CREATE TABLE " + quoteTableId(createdId));
                    mysql.query(sql.get(), rs -> {
                        if (rs.next()) {
                            // Column 2 holds the CREATE TABLE statement.
                            schema.applyDdl(source, dbName, rs.getString(2), this::enqueueSchemaChanges);
                        }
                    });
                }
            }
            this.context.makeRecord().regenerate();

            if (this.minimalBlocking) {
                this.logger.info("Step 7: releasing global read lock to enable MySQL writes");
                sql.set("UNLOCK TABLES");
                mysql.execute(sql.get());
                lockHeld = false;
                long lockReleased = clock.currentTimeInMillis();
                this.logger.info("Step 7: blocked writes to MySQL for a total of {}", Strings.duration(lockReleased - lockAcquired));
            }

            // The last record is held back so that its offset can be replaced when the queue is flushed.
            BufferedBlockingConsumer<SourceRecord> bufferedRecordQueue = BufferedBlockingConsumer.bufferLast(super::enqueueRecord);
            this.logger.info("Step 8: scanning contents of {} tables", tableIds.size());
            final long startScan = clock.currentTimeInMillis();
            final AtomicBoolean interrupted = new AtomicBoolean(false);
            final AtomicLong totalRowCount = new AtomicLong();
            int counter = 0;
            int completedCounter = 0;
            final long largeTableCount = this.context.rowCountForLargeTable();
            for (TableId scanId : tableIds) {
                RecordMakers.RecordsForTable recordMaker = this.context.makeRecord().forTable(scanId, null, bufferedRecordQueue);
                if (recordMaker != null) {
                    sql.set("SELECT COUNT(*) FROM " + quoteTableId(scanId));
                    final AtomicLong numRows = new AtomicLong();
                    mysql.query(sql.get(), rs -> {
                        if (rs.next()) {
                            numRows.set(rs.getLong(1));
                        }
                    });
                    // Tables above the configured threshold use the large-result-set statement.
                    JdbcConnection.StatementFactory statementFactory = numRows.get() > largeTableCount
                            ? this::createStatementWithLargeResultSet
                            : this::createStatement;
                    final long start = clock.currentTimeInMillis();
                    this.logger.info("Step 8: - scanning table '{}' ({} of {} tables)", scanId, ++counter, tableIds.size());
                    sql.set("SELECT * FROM " + quoteTableId(scanId));
                    mysql.query(sql.get(), statementFactory, rs -> {
                        long rowNum = 0L;
                        final long rowCount = numRows.get();
                        try {
                            Table table = schema.tableFor(scanId);
                            int numColumns = table.columns().size();
                            Object[] row = new Object[numColumns];
                            while (rs.next()) {
                                // JDBC columns are 1-based, the row array is 0-based.
                                for (int i = 0, j = 1; i != numColumns; ++i, ++j) {
                                    row[i] = rs.getObject(j);
                                }
                                this.recorder.recordRow(recordMaker, row, ts);
                                ++rowNum;
                                if (rowNum % 10000L == 0L || rowNum == rowCount) {
                                    long now = clock.currentTimeInMillis();
                                    this.logger.info("Step 8: - {} of {} rows scanned from table '{}' after {}", rowNum, rowCount, scanId, Strings.duration(now - start));
                                }
                            }
                        } catch (InterruptedException e) {
                            // Clear the interrupt status so the UNLOCK/ROLLBACK statements below can still run.
                            Thread.interrupted();
                            this.logger.info("Step 8: Stopping the snapshot due to thread interruption");
                            interrupted.set(true);
                        } finally {
                            // Count the rows actually read, not the COUNT(*) figure, so that an
                            // interrupted or failed scan does not over-report.
                            totalRowCount.addAndGet(rowNum);
                        }
                    });
                    if (interrupted.get()) {
                        break;
                    }
                }
                ++completedCounter;
            }

            source.markLastSnapshot();
            long stop = clock.currentTimeInMillis();
            try {
                bufferedRecordQueue.flush(this::replaceOffset);
                this.logger.info("Step 8: scanned {} rows in {} tables in {}", totalRowCount, tableIds.size(), Strings.duration(stop - startScan));
            } catch (InterruptedException e) {
                Thread.interrupted();
                this.logger.info("Step 8: aborting the snapshot after {} rows in {} of {} tables {}", totalRowCount, completedCounter, tableIds.size(), Strings.duration(stop - startScan));
                interrupted.set(true);
            }

            int step = 9;
            if (lockHeld) {
                this.logger.info("Step {}: releasing global read lock to enable MySQL writes", step++);
                sql.set("UNLOCK TABLES");
                mysql.execute(sql.get());
                lockHeld = false;
                long lockReleased = clock.currentTimeInMillis();
                this.logger.info("Writes to MySQL prevented for a total of {}", Strings.duration(lockReleased - lockAcquired));
            }
            if (interrupted.get()) {
                this.logger.info("Step {}: rolling back transaction after abort", step++);
                sql.set("ROLLBACK");
                mysql.execute(sql.get());
                transactionOpen = false;
                return;
            }
            this.logger.info("Step {}: committing transaction", step++);
            sql.set("COMMIT");
            mysql.execute(sql.get());
            transactionOpen = false;
            try {
                source.completeSnapshot();
            } finally {
                super.completeSuccessfully();
                stop = clock.currentTimeInMillis();
                this.logger.info("Completed snapshot in {}", Strings.duration(stop - ts));
            }
        } catch (Throwable e) {
            this.failed(e, "Aborting snapshot after running '" + sql.get() + "': " + e.getMessage());
            this.releaseAfterFailure(mysql, transactionOpen, lockHeld);
        }
    }

    /**
     * Best-effort release of the global read lock and the open transaction after a failed snapshot.
     * Problems are logged and never propagated, because the original failure has already been reported.
     *
     * @param mysql the connection used by the snapshot
     * @param transactionOpen whether the snapshot transaction was started but not committed or rolled back
     * @param lockHeld whether the global read lock was obtained but not released
     */
    private void releaseAfterFailure(JdbcConnection mysql, boolean transactionOpen, boolean lockHeld) {
        if (lockHeld) {
            try {
                mysql.execute("UNLOCK TABLES");
            } catch (Exception e) {
                this.logger.warn("Unable to release the global read lock after the snapshot failed", e);
            }
        }
        if (transactionOpen) {
            try {
                mysql.execute("ROLLBACK");
            } catch (Exception e) {
                this.logger.warn("Unable to roll back the snapshot transaction after the snapshot failed", e);
            }
        }
    }

    /**
     * Wrap a database or table name in backticks, doubling any backtick it contains.
     *
     * @param identifier the raw name
     * @return the quoted name
     */
    private static String quoteIdentifier(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    /**
     * Build the quoted {@code `catalog`.`table`} form of a table identifier.
     *
     * @param id the table identifier
     * @return the quoted, qualified table name
     */
    private static String quoteTableId(TableId id) {
        return quoteIdentifier(id.catalog()) + "." + quoteIdentifier(id.table());
    }

    /**
     * Create a forward-only, read-only statement whose fetch size is {@link Integer#MIN_VALUE}.
     *
     * @param connection the JDBC connection
     * @return the new statement
     * @throws SQLException if the statement cannot be created or configured
     */
    private Statement createStatementWithLargeResultSet(Connection connection) throws SQLException {
        Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        // NOTE(review): presumably asks the MySQL driver to stream rows instead of buffering them - confirm.
        stmt.setFetchSize(Integer.MIN_VALUE);
        return stmt;
    }

    /**
     * Create a statement with the connection's default settings.
     *
     * @param connection the JDBC connection
     * @return the new statement
     * @throws SQLException if the statement cannot be created
     */
    private Statement createStatement(Connection connection) throws SQLException {
        return connection.createStatement();
    }

    /**
     * Log the server variables whose names match the version/binlog/tx/gtid/charset/collation pattern.
     * A failure to read them is logged and otherwise ignored.
     *
     * @param mysql the connection to query
     */
    private void logServerInformation(JdbcConnection mysql) {
        try {
            this.logger.info("MySQL server variables related to change data capture:");
            mysql.query("SHOW VARIABLES WHERE Variable_name REGEXP 'version|binlog|tx_|gtid|character_set|collation'", rs -> {
                while (rs.next()) {
                    this.logger.info("\t{} = {}", Strings.pad(rs.getString(1), 45, ' '), Strings.pad(rs.getString(2), 45, ' '));
                }
            });
        } catch (SQLException e) {
            this.logger.info("Cannot determine MySql server version", e);
        }
    }

    /**
     * Log the grants of the current MySQL user, or warn when none are reported.
     * A failure to read them is logged and otherwise ignored.
     *
     * @param mysql the connection to query
     */
    private void logRolesForCurrentUser(JdbcConnection mysql) {
        try {
            List<String> grants = new ArrayList<>();
            mysql.query("SHOW GRANTS FOR CURRENT_USER", rs -> {
                while (rs.next()) {
                    grants.add(rs.getString(1));
                }
            });
            if (grants.isEmpty()) {
                this.logger.warn("Snapshot is using user '{}' but it likely doesn't have proper privileges. If tables are missing or are empty, ensure connector is configured with the correct MySQL user and/or ensure that the MySQL user has the required privileges.", mysql.username());
            } else {
                this.logger.info("Snapshot is using user '{}' with these MySQL grants:", mysql.username());
                grants.forEach(grant -> this.logger.info("\t{}", grant));
            }
        } catch (SQLException e) {
            this.logger.info("Cannot determine the privileges for '{}' ", mysql.username(), e);
        }
    }

    /**
     * Copy a record, substituting the offset currently reported by the source info.
     *
     * @param record the record to copy; may be null
     * @return the copy carrying the current offset, or null if {@code record} was null
     */
    protected SourceRecord replaceOffset(SourceRecord record) {
        if (record == null) {
            return null;
        }
        Map<String, ?> newOffset = this.context.source().offset();
        return new SourceRecord(record.sourcePartition(), newOffset, record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value());
    }

    /**
     * Enqueue schema change records for a DDL statement, when schema change records are enabled and
     * the statement is not empty. The statement is logged if at least one record was produced.
     *
     * @param dbName the database the statement applies to
     * @param ddlStatement the DDL statement
     */
    protected void enqueueSchemaChanges(String dbName, String ddlStatement) {
        if (!this.context.includeSchemaChangeRecords() || ddlStatement.isEmpty()) {
            return;
        }
        if (this.context.makeRecord().schemaChanges(dbName, ddlStatement, super::enqueueRecord) > 0) {
            this.logger.info("\t{}", ddlStatement);
        }
    }

    /**
     * Record a row by calling {@code read} on the record maker.
     *
     * @param recordMaker the record maker for the row's table
     * @param row the column values of the row
     * @param ts the timestamp passed through to the record maker
     * @throws InterruptedException if the thread is interrupted while the record is being enqueued
     */
    protected void recordRowAsRead(RecordMakers.RecordsForTable recordMaker, Object[] row, long ts) throws InterruptedException {
        recordMaker.read(row, ts);
    }

    /**
     * Record a row by calling {@code create} on the record maker.
     *
     * @param recordMaker the record maker for the row's table
     * @param row the column values of the row
     * @param ts the timestamp passed through to the record maker
     * @throws InterruptedException if the thread is interrupted while the record is being enqueued
     */
    protected void recordRowAsInsert(RecordMakers.RecordsForTable recordMaker, Object[] row, long ts) throws InterruptedException {
        recordMaker.create(row, ts);
    }

    /** Strategy for turning one scanned row into a record. */
    @FunctionalInterface
    protected static interface RecordRecorder {
        /**
         * Record a single row.
         *
         * @param recordMaker the record maker for the row's table
         * @param row the column values of the row
         * @param ts the timestamp to associate with the record
         * @throws InterruptedException if the thread is interrupted while recording
         */
        public void recordRow(RecordMakers.RecordsForTable recordMaker, Object[] row, long ts) throws InterruptedException;
    }
}

