/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline.source.snapshot.incremental;

import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Assert;
import org.junit.Test;

/**
 * Base class for incremental-snapshot tests that modify the captured table (column type,
 * nullability, default value, table rename) after an ad-hoc incremental snapshot has been
 * signalled, and then verify the change records that were consumed.
 *
 * <p>Subclasses supply the SQL dialect through the abstract statement builders declared here; all
 * other fixtures ({@code populateTable()}, {@code startConnector()}, {@code tableName()}, ...) are
 * inherited from {@link AbstractIncrementalSnapshotTest}.
 *
 * @param <T> the connector type under test
 */
public abstract class AbstractIncrementalSnapshotWithSchemaChangesSupportTest<T extends SourceConnector>
        extends AbstractIncrementalSnapshotTest<T> {

    /**
     * Returns the identifier used for {@code table} inside the SQL statements built by these tests.
     *
     * @param table the plain table name
     * @return the identifier to embed in SQL
     */
    protected abstract String tableName(String table);

    /**
     * Builds the statement that changes {@code column} of {@code table} to {@code type}.
     *
     * @param table the table identifier
     * @param column the column to alter
     * @param type the new column type, e.g. {@code VARCHAR(5)}
     * @return the SQL statement
     */
    protected abstract String alterColumnStatement(String table, String column, String type);

    /**
     * Builds the statement that makes {@code column} of {@code table} NOT NULL.
     *
     * @param table the table identifier
     * @param column the column to alter
     * @param type the column type
     * @return the SQL statement
     */
    protected abstract String alterColumnSetNotNullStatement(String table, String column, String type);

    /**
     * Builds the statement that removes the NOT NULL constraint from {@code column} of {@code table}.
     *
     * @param table the table identifier
     * @param column the column to alter
     * @param type the column type
     * @return the SQL statement
     */
    protected abstract String alterColumnDropNotNullStatement(String table, String column, String type);

    /**
     * Builds the statement that sets a default value on {@code column} of {@code table}.
     *
     * @param table the table identifier
     * @param column the column to alter
     * @param type the column type
     * @param defaultValue the default value, as it should appear in SQL
     * @return the SQL statement
     */
    protected abstract String alterColumnSetDefaultStatement(String table, String column, String type, String defaultValue);

    /**
     * Builds the statement that removes the default value from {@code column} of {@code table}.
     *
     * @param table the table identifier
     * @param column the column to alter
     * @param type the column type
     * @return the SQL statement
     */
    protected abstract String alterColumnDropDefaultStatement(String table, String column, String type);

    /**
     * Performs the table rename used by {@link #renameTable()}.
     *
     * @param connection the connection on which to run the rename
     * @param newTable the plain name of the table taking part in the rename
     * @throws SQLException if the rename fails
     */
    protected abstract void executeRenameTable(JdbcConnection connection, String newTable) throws SQLException;

    /**
     * Builds the statement that creates {@code newTable} based on {@code copyTable}.
     *
     * @param newTable the identifier of the table to create
     * @param copyTable the identifier of the existing table used as the template
     * @return the SQL statement
     */
    protected abstract String createTableStatement(String newTable, String copyTable);

    /**
     * Changes the type of column {@code aa} and repeatedly adds, retypes and drops a column
     * {@code c} while inserting rows with primary keys 1001..1011, then verifies that every primary
     * key 1..1011 was captured with the value {@code pk - 1}, either as INT32 or as its string form.
     */
    @Test
    public void schemaChanges() throws Exception {
        populateTable();
        startConnector();
        sendAdHocSnapshotSignal();

        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(true);
            connection.execute(String.format("INSERT INTO %s (pk, aa) VALUES (%s, %s)", tableName(), 1011, 1010));
            connection.execute(alterColumnStatement(tableName(), "aa", "VARCHAR(5)"));
            connection.execute(String.format("INSERT INTO %s (pk, aa) VALUES (%s, '%s')", tableName(), 1010, 1009));
            // Three rounds, each inserting three rows (pk 1001..1009) around add/alter/drop of column c.
            for (int i = 0; i < 9 && !Thread.interrupted(); i += 3) {
                connection.execute(String.format("ALTER TABLE %s ADD c INT", tableName()));
                connection.execute(String.format("INSERT INTO %s (pk, aa, c) VALUES (%s, '%s', %s)", tableName(), i + 1000 + 1, i + 1000, 1));
                connection.execute(alterColumnStatement(tableName(), "c", "VARCHAR(5)"));
                connection.execute(String.format("INSERT INTO %s (pk, aa, c) VALUES (%s, '%s', '%s')", tableName(), i + 1000 + 2, i + 1000 + 1, "1"));
                connection.execute(String.format("ALTER TABLE %s DROP COLUMN c", tableName()));
                connection.execute(String.format("INSERT INTO %s (pk, aa) VALUES (%s, '%s')", tableName(), i + 1000 + 3, i + 1000 + 2));
            }
        }

        final int expectedRecordCount = 1011;
        Map<Integer, SourceRecord> dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedRecordCount);
        for (int i = 0; i < expectedRecordCount; i++) {
            SourceRecord record = requireChangeFor(dbChanges, i + 1);
            Struct after = afterStructOf(record);
            Schema.Type valueType = record.valueSchema().field("after").schema().field(valueFieldName()).schema().type();
            // The value column is INT32 before the type change and a string afterwards.
            if (valueType == Schema.Type.INT32) {
                int value = after.getInt32(valueFieldName());
                Assert.assertEquals(i, value);
            } else {
                Assert.assertEquals(Integer.toString(i), after.getString(valueFieldName()));
            }
        }
    }

    /**
     * Prepares a copy of the captured table with an extra column {@code c}, performs the rename
     * once more than 10 records have been consumed, updates {@code c} for {@code pk >= 990}, and
     * verifies the values of all 1000 captured rows.
     */
    @Test
    public void renameTable() throws Exception {
        populateTable();

        final String newTable = "new_table";
        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(true);
            connection.execute(createTableStatement(tableName(newTable), tableName()));
            connection.execute(String.format("INSERT INTO %s SELECT * FROM %s", tableName(newTable), tableName()));
            connection.execute(String.format("ALTER TABLE %s ADD c varchar(5)", tableName(newTable)));
        }
        startConnector();
        sendAdHocSnapshotSignal();

        final int expectedRecordCount = 1000;
        final int updatedRowCount = 10;
        final int unchangedRowCount = expectedRecordCount - updatedRowCount;
        AtomicInteger recordCounter = new AtomicInteger();
        AtomicBoolean tableRenamed = new AtomicBoolean();
        Map<Integer, SourceRecord> dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedRecordCount, x -> true, records -> {
            // Rename exactly once, as soon as more than 10 records have been seen.
            if (recordCounter.addAndGet(records.size()) > 10 && !tableRenamed.get()) {
                try (JdbcConnection connection = databaseConnection()) {
                    executeRenameTable(connection, newTable);
                    connection.executeWithoutCommitting(String.format("UPDATE %s SET c = 'c' WHERE pk >= %s", tableName(), unchangedRowCount));
                    connection.commit();
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
                tableRenamed.set(true);
            }
        });

        for (int i = 0; i < unchangedRowCount; i++) {
            SourceRecord record = requireChangeFor(dbChanges, i + 1);
            int value = afterStructOf(record).getInt32(valueFieldName());
            Assert.assertEquals(i, value);
            // NOTE(review): this looks up "c" on the schema of the record value itself, not on the
            // schema of its "after" struct - confirm that this is the intended guard.
            if (((Struct) record.value()).schema().field("c") != null) {
                Assert.assertNull(afterStructOf(record).getString("c"));
            }
        }
        for (int i = unchangedRowCount; i < expectedRecordCount; i++) {
            SourceRecord record = requireChangeFor(dbChanges, i + 1);
            int value = afterStructOf(record).getInt32(valueFieldName());
            Assert.assertEquals(i, value);
            Assert.assertEquals("c", afterStructOf(record).getString("c"));
        }
    }

    /**
     * Sets and then drops NOT NULL on column {@code aa}, inserts rows with primary keys 1001..2000
     * and a {@code null} value, and verifies that rows 1..1000 carry {@code pk - 1} while the new
     * rows carry {@code null}.
     */
    @Test
    public void columnNullabilityChanges() throws Exception {
        populateTable();
        Configuration config = config().build();
        startAndConsumeTillEnd(connectorClass(), config);
        waitForConnectorToStart();
        waitForAvailableRecords(1L, TimeUnit.SECONDS);
        assertNoRecordsToConsume();
        sendAdHocSnapshotSignal();

        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(false);
            connection.execute(alterColumnSetNotNullStatement(tableName(), "aa", "INTEGER"));
            connection.commit();
            connection.execute(alterColumnDropNotNullStatement(tableName(), "aa", "INTEGER"));
            connection.commit();
            for (int i = 0; i < 1000; i++) {
                connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk, aa) VALUES (%s, null)", tableName(), i + 1000 + 1));
            }
            connection.commit();
        }

        final int expectedRecordCount = 2000;
        Map<Integer, SourceRecord> dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedRecordCount);
        for (int i = 0; i < 1000; i++) {
            int value = afterStructOf(requireChangeFor(dbChanges, i + 1)).getInt32(valueFieldName());
            Assert.assertEquals(i, value);
        }
        for (int i = 1000; i < expectedRecordCount; i++) {
            Integer value = afterStructOf(requireChangeFor(dbChanges, i + 1)).getInt32(valueFieldName());
            Assert.assertNull(value);
        }
    }

    /**
     * Cycles the default of column {@code aa} through {@code -6}, {@code -9} and no default,
     * inserting 1000 rows without an explicit value after each change (primary keys 1001..2000,
     * 2001..3000 and 3001..4000), and verifies that each batch carries the default in force when it
     * was inserted while rows 1..1000 still carry {@code pk - 1}.
     */
    @Test
    public void columnDefaultChanges() throws Exception {
        populateTable();
        Configuration config = config().build();
        startAndConsumeTillEnd(connectorClass(), config);
        waitForConnectorToStart();
        waitForAvailableRecords(1L, TimeUnit.SECONDS);
        assertNoRecordsToConsume();
        sendAdHocSnapshotSignal();

        final int expectedRecordCount = 4000;
        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(false);
            connection.execute(alterColumnSetDefaultStatement(tableName(), "aa", "INTEGER", "-6"));
            connection.commit();
            for (int i = 0; i < 1000; i++) {
                connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk) VALUES (%s)", tableName(), i + 1000 + 1));
            }
            connection.commit();

            connection.executeWithoutCommitting(alterColumnDropDefaultStatement(tableName(), "aa", "INTEGER"));
            connection.executeWithoutCommitting(alterColumnSetDefaultStatement(tableName(), "aa", "INTEGER", "-9"));
            connection.commit();
            for (int i = 0; i < 1000; i++) {
                connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk) VALUES (%s)", tableName(), i + 2000 + 1));
            }
            connection.commit();

            connection.executeWithoutCommitting(alterColumnDropDefaultStatement(tableName(), "aa", "INTEGER"));
            connection.commit();
            for (int i = 0; i < 1000; i++) {
                connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk) VALUES (%s)", tableName(), i + 3000 + 1));
            }
            connection.commit();
        }

        Map<Integer, SourceRecord> dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedRecordCount);
        for (int i = 0; i < 1000; i++) {
            int value = afterStructOf(requireChangeFor(dbChanges, i + 1)).getInt32(valueFieldName());
            Assert.assertEquals(i, value);
        }
        assertDefaultValueApplied(dbChanges, 1000, 2000, -6);
        assertDefaultValueApplied(dbChanges, 2000, 3000, -9);
        for (int i = 3000; i < expectedRecordCount; i++) {
            Integer value = afterStructOf(requireChangeFor(dbChanges, i + 1)).getInt32(valueFieldName());
            Assert.assertNull("value is not null at pk=" + (i + 1), value);
        }
    }

    /** Returns the {@code after} struct of the given change record's value. */
    private static Struct afterStructOf(SourceRecord record) {
        return ((Struct) record.value()).getStruct("after");
    }

    /** Returns the change record captured for {@code pk}, failing with the key in the message if absent. */
    private static SourceRecord requireChangeFor(Map<Integer, SourceRecord> dbChanges, int pk) {
        SourceRecord record = dbChanges.get(pk);
        Assert.assertNotNull(String.format("missing PK %d", pk), record);
        return record;
    }

    /** Asserts that rows with zero-based index in {@code [fromIndex, toIndex)} (pk = index + 1) carry {@code expected}. */
    private void assertDefaultValueApplied(Map<Integer, SourceRecord> dbChanges, int fromIndex, int toIndex, int expected) {
        for (int i = fromIndex; i < toIndex; i++) {
            int pk = i + 1;
            Integer value = afterStructOf(requireChangeFor(dbChanges, pk)).getInt32(valueFieldName());
            Assert.assertNotNull("value is null at pk=" + pk, value);
            // Compare as integers; the values are INT32, so no floating-point delta is needed.
            Assert.assertEquals(String.format("value is %d at pk = %d, expected %d", value, pk, expected), (long) expected, value.longValue());
        }
    }
}

