/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.relational;

import io.debezium.config.ConfigurationDefaults;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.Column;
import io.debezium.relational.HistorizedRelationalDatabaseSchema;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class HistorizedRelationalSnapshotChangeEventSource
implements SnapshotChangeEventSource {
    private static final Logger LOGGER = LoggerFactory.getLogger(HistorizedRelationalSnapshotChangeEventSource.class);
    private static final Duration LOG_INTERVAL = Duration.ofMillis(10000L);
    private final RelationalDatabaseConnectorConfig connectorConfig;
    private final OffsetContext previousOffset;
    private final JdbcConnection jdbcConnection;
    private final HistorizedRelationalDatabaseSchema schema;
    private final EventDispatcher<TableId> dispatcher;
    private final Clock clock;
    private final SnapshotProgressListener snapshotProgressListener;

    public HistorizedRelationalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig connectorConfig, OffsetContext previousOffset, JdbcConnection jdbcConnection, HistorizedRelationalDatabaseSchema schema, EventDispatcher<TableId> dispatcher, Clock clock, SnapshotProgressListener snapshotProgressListener) {
        this.connectorConfig = connectorConfig;
        this.previousOffset = previousOffset;
        this.jdbcConnection = jdbcConnection;
        this.schema = schema;
        this.dispatcher = dispatcher;
        this.clock = clock;
        this.snapshotProgressListener = snapshotProgressListener;
    }

    /*
     * Exception decompiling
     */
    @Override
    public SnapshotResult execute(ChangeEventSource.ChangeEventSourceContext context) throws InterruptedException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [0[TRYBLOCK]], but top level block is 1[TRYBLOCK]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    protected abstract SnapshottingTask getSnapshottingTask(OffsetContext var1);

    private void delaySnapshotIfNeeded(ChangeEventSource.ChangeEventSourceContext context) throws InterruptedException {
        Duration snapshotDelay = this.connectorConfig.getSnapshotDelay();
        if (snapshotDelay.isZero() || snapshotDelay.isNegative()) {
            return;
        }
        Threads.Timer timer = Threads.timer(Clock.SYSTEM, snapshotDelay);
        Metronome metronome = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM);
        while (!timer.expired()) {
            if (!context.isRunning()) {
                throw new InterruptedException("Interrupted while awaiting initial snapshot delay");
            }
            LOGGER.info("The connector will wait for {}s before proceeding", (Object)timer.remaining().getSeconds());
            metronome.pause();
        }
    }

    protected abstract SnapshotContext prepare(ChangeEventSource.ChangeEventSourceContext var1) throws Exception;

    protected void connectionCreated(SnapshotContext snapshotContext) throws Exception {
    }

    private void determineCapturedTables(SnapshotContext ctx) throws Exception {
        Set<TableId> allTableIds = this.getAllTableIds(ctx);
        HashSet<TableId> capturedTables = new HashSet<TableId>();
        for (TableId tableId : allTableIds) {
            if (this.connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
                LOGGER.trace("Adding table {} to the list of captured tables", (Object)tableId);
                capturedTables.add(tableId);
                continue;
            }
            LOGGER.trace("Ignoring table {} as it's not included in the filter configuration", (Object)tableId);
        }
        ctx.capturedTables = capturedTables;
    }

    protected abstract Set<TableId> getAllTableIds(SnapshotContext var1) throws Exception;

    protected abstract void lockTablesForSchemaSnapshot(ChangeEventSource.ChangeEventSourceContext var1, SnapshotContext var2) throws Exception;

    protected abstract void determineSnapshotOffset(SnapshotContext var1) throws Exception;

    protected abstract void readTableStructure(ChangeEventSource.ChangeEventSourceContext var1, SnapshotContext var2) throws Exception;

    protected abstract void releaseSchemaSnapshotLocks(SnapshotContext var1) throws Exception;

    private void createSchemaChangeEventsForTables(ChangeEventSource.ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws Exception {
        for (TableId tableId : snapshotContext.capturedTables) {
            if (!sourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while capturing schema of table " + tableId);
            }
            LOGGER.debug("Capturing structure of table {}", (Object)tableId);
            Table table = snapshotContext.tables.forTable(tableId);
            this.schema.applySchemaChange(this.getCreateTableEvent(snapshotContext, table));
        }
    }

    protected abstract SchemaChangeEvent getCreateTableEvent(SnapshotContext var1, Table var2) throws Exception;

    private void createDataEvents(ChangeEventSource.ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws InterruptedException {
        EventDispatcher.SnapshotReceiver snapshotReceiver = this.dispatcher.getSnapshotChangeEventReceiver();
        snapshotContext.offset.preSnapshotStart();
        for (TableId tableId : snapshotContext.capturedTables) {
            if (!sourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while snapshotting table " + tableId);
            }
            LOGGER.debug("Snapshotting table {}", (Object)tableId);
            this.createDataEventsForTable(sourceContext, snapshotContext, snapshotReceiver, snapshotContext.tables.forTable(tableId));
        }
        snapshotContext.offset.preSnapshotCompletion();
        snapshotReceiver.completeSnapshot();
        snapshotContext.offset.postSnapshotCompletion();
    }

    private void createDataEventsForTable(ChangeEventSource.ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext, EventDispatcher.SnapshotReceiver snapshotReceiver, Table table) throws InterruptedException {
        long exportStart = this.clock.currentTimeInMillis();
        LOGGER.info("\t Exporting data from table '{}'", (Object)table.id());
        String selectStatement = this.getSnapshotSelect(snapshotContext, table.id());
        LOGGER.info("\t For table '{}' using select statement: '{}'", (Object)table.id(), (Object)selectStatement);
        try (Statement statement = this.readTableStatement();
             ResultSet rs = statement.executeQuery(selectStatement);){
            Column[] columns = this.getColumnsForResultSet(table, rs);
            int numColumns = table.columns().size();
            long rows = 0L;
            Threads.Timer logTimer = this.getTableScanLogTimer();
            while (rs.next()) {
                if (!sourceContext.isRunning()) {
                    throw new InterruptedException("Interrupted while snapshotting table " + table.id());
                }
                ++rows;
                Object[] row = new Object[numColumns];
                for (int i = 0; i < numColumns; ++i) {
                    row[i] = this.getColumnValue(rs, i + 1, columns[i]);
                }
                if (logTimer.expired()) {
                    long stop = this.clock.currentTimeInMillis();
                    LOGGER.info("\t Exported {} records for table '{}' after {}", new Object[]{rows, table.id(), Strings.duration(stop - exportStart)});
                    this.snapshotProgressListener.rowsScanned(table.id(), rows);
                    logTimer = this.getTableScanLogTimer();
                }
                this.dispatcher.dispatchSnapshotEvent(table.id(), this.getChangeRecordEmitter(snapshotContext, row), snapshotReceiver);
            }
            LOGGER.info("\t Finished exporting {} records for table '{}'; total duration '{}'", new Object[]{rows, table.id(), Strings.duration(this.clock.currentTimeInMillis() - exportStart)});
            this.snapshotProgressListener.tableSnapshotCompleted(table.id(), rows);
        }
        catch (SQLException e) {
            throw new ConnectException("Snapshotting of table " + table.id() + " failed", (Throwable)e);
        }
    }

    private Threads.Timer getTableScanLogTimer() {
        return Threads.timer(this.clock, LOG_INTERVAL);
    }

    protected abstract ChangeRecordEmitter getChangeRecordEmitter(SnapshotContext var1, Object[] var2);

    protected abstract String getSnapshotSelect(SnapshotContext var1, TableId var2);

    private Column[] getColumnsForResultSet(Table table, ResultSet rs) throws SQLException {
        ResultSetMetaData metaData = rs.getMetaData();
        Column[] columns = new Column[metaData.getColumnCount()];
        for (int i = 0; i < columns.length; ++i) {
            columns[i] = table.columnWithName(metaData.getColumnName(i + 1));
        }
        return columns;
    }

    private Object getColumnValue(ResultSet rs, int columnIndex, Column column) throws SQLException {
        return rs.getObject(columnIndex);
    }

    private Statement readTableStatement() throws SQLException {
        int rowsFetchSize = 2000;
        Statement statement = this.jdbcConnection.connection().createStatement();
        statement.setFetchSize(rowsFetchSize);
        return statement;
    }

    protected abstract void complete();

    private void rollbackTransaction(Connection connection) {
        if (connection != null) {
            try {
                connection.rollback();
            }
            catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }
    }

    protected Clock getClock() {
        return this.clock;
    }

    public static class SnapshottingTask {
        private final boolean snapshotSchema;
        private final boolean snapshotData;

        public SnapshottingTask(boolean snapshotSchema, boolean snapshotData) {
            this.snapshotSchema = snapshotSchema;
            this.snapshotData = snapshotData;
        }

        public boolean snapshotData() {
            return this.snapshotData;
        }

        public boolean snapshotSchema() {
            return this.snapshotSchema;
        }

        public String toString() {
            return "SnapshottingTask [snapshotSchema=" + this.snapshotSchema + ", snapshotData=" + this.snapshotData + "]";
        }
    }

    public static class SnapshotContext
    implements AutoCloseable {
        public final String catalogName;
        public final Tables tables;
        public Set<TableId> capturedTables;
        public OffsetContext offset;

        public SnapshotContext(String catalogName) throws SQLException {
            this.catalogName = catalogName;
            this.tables = new Tables();
        }

        @Override
        public void close() throws Exception {
        }
    }
}

