/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlTestConnection;
import io.debezium.connector.mysql.UniqueDatabase;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.fest.assertions.IntAssert;
import org.fest.assertions.MapAssert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class IncrementalSnapshotIT
extends AbstractConnectorTest {
    private static final int ROW_COUNT = 1000;
    private static final int MAXIMUM_NO_RECORDS_CONSUMES = 2;
    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath((String)"file-db-history-is.txt").toAbsolutePath();
    private static final String SERVER_NAME = "is_test";
    private final UniqueDatabase DATABASE = new UniqueDatabase("is_test", "incremental_snapshot_test").withDbHistoryPath(DB_HISTORY_PATH);

    @Before
    public void before() throws SQLException {
        this.stopConnector();
        this.DATABASE.createAndInitialize();
        this.initializeConnectorTestFramework();
        Testing.Files.delete((Path)DB_HISTORY_PATH);
    }

    @After
    public void after() {
        try {
            this.stopConnector();
        }
        finally {
            Testing.Files.delete((Path)DB_HISTORY_PATH);
        }
    }

    private void populateTable() throws SQLException {
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            connection.setAutoCommit(false);
            for (int i = 0; i < 1000; ++i) {
                connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO a (aa) VALUES (%s)", i)});
            }
            connection.commit();
        }
    }

    protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount) throws InterruptedException {
        return this.consumeMixedWithIncrementalSnapshot(recordCount, x -> true, null);
    }

    protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount, Predicate<Map.Entry<Integer, Integer>> dataCompleted, Consumer<List<SourceRecord>> recordConsumer) throws InterruptedException {
        String topicName = this.DATABASE.topicForTable("a");
        HashMap<Integer, Integer> dbChanges = new HashMap<Integer, Integer>();
        int noRecords = 0;
        while (true) {
            AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
            List dataRecords = records.recordsForTopic(topicName);
            if (records.allRecordsInOrder().isEmpty()) {
                ((IntAssert)Assertions.assertThat((int)(++noRecords)).describedAs("Too many no data record results")).isLessThanOrEqualTo(2);
                continue;
            }
            noRecords = 0;
            if (dataRecords == null || dataRecords.isEmpty()) continue;
            dataRecords.forEach(record -> {
                int id = ((Struct)record.key()).getInt32("pk");
                int value = ((Struct)record.value()).getStruct("after").getInt32("aa");
                dbChanges.put(id, value);
            });
            if (recordConsumer != null) {
                recordConsumer.accept(dataRecords);
            }
            if (dbChanges.size() >= recordCount && !dbChanges.entrySet().stream().anyMatch(dataCompleted.negate())) break;
        }
        Assertions.assertThat(dbChanges).hasSize(recordCount);
        return dbChanges;
    }

    @Test
    public void snapshotOnly() throws Exception {
        Testing.Print.enable();
        this.populateTable();
        Configuration config = this.config();
        this.start(MySqlConnector.class, config);
        this.assertConnectorIsRunning();
        this.waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        this.assertNoRecordsToConsume();
        this.sendAdHocSnapshotSignal();
        int expectedRecordCount = 1000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).includes(new MapAssert.Entry[]{MapAssert.entry((Object)(i + 1), (Object)i)});
        }
    }

    protected void sendAdHocSnapshotSignal() throws SQLException {
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            connection.execute(new String[]{"INSERT INTO debezium_signal VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [\"" + this.DATABASE.qualifiedTableName("a") + "\"]}')"});
        }
    }

    protected Configuration config() {
        Configuration config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.USER, "mysqluser")).with(MySqlConnectorConfig.PASSWORD, "mysqlpw")).with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY.getValue())).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)).with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, this.DATABASE.qualifiedTableName("debezium_signal"))).with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)).with("internal.implementation", "new").build();
        return config;
    }

    @Test
    public void inserts() throws Exception {
        int i;
        Testing.Print.enable();
        this.populateTable();
        Configuration config = this.config();
        this.start(MySqlConnector.class, config);
        this.assertConnectorIsRunning();
        this.waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        this.assertNoRecordsToConsume();
        this.sendAdHocSnapshotSignal();
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            connection.setAutoCommit(false);
            for (i = 0; i < 1000; ++i) {
                connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO a (aa) VALUES (%s)", i + 1000)});
            }
            connection.commit();
        }
        int expectedRecordCount = 2000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(2000);
        for (i = 0; i < 2000; ++i) {
            Assertions.assertThat(dbChanges).includes(new MapAssert.Entry[]{MapAssert.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    public void updates() throws Exception {
        int i;
        Testing.Print.enable();
        this.populateTable();
        Configuration config = this.config();
        this.start(MySqlConnector.class, config);
        this.assertConnectorIsRunning();
        this.waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        this.assertNoRecordsToConsume();
        this.sendAdHocSnapshotSignal();
        int batchSize = 10;
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            connection.setAutoCommit(false);
            for (i = 0; i < 1000; ++i) {
                connection.executeWithoutCommitting(new String[]{String.format("UPDATE a SET aa = aa + 2000 WHERE pk > %s AND pk <= %s", i * 10, (i + 1) * 10)});
                connection.commit();
            }
        }
        int expectedRecordCount = 1000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000, x -> (Integer)x.getValue() >= 2000, null);
        for (i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).includes(new MapAssert.Entry[]{MapAssert.entry((Object)(i + 1), (Object)(i + 2000))});
        }
    }

    @Test
    public void updatesWithRestart() throws Exception {
        Testing.Print.enable();
        this.populateTable();
        Configuration config = this.config();
        this.startAndConsumeTillEnd(MySqlConnector.class, config);
        this.assertConnectorIsRunning();
        this.waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        this.assertNoRecordsToConsume();
        this.sendAdHocSnapshotSignal();
        int batchSize = 10;
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            connection.setAutoCommit(false);
            for (int i = 0; i < 1000; ++i) {
                connection.executeWithoutCommitting(new String[]{String.format("UPDATE a SET aa = aa + 2000 WHERE pk > %s AND pk <= %s", i * 10, (i + 1) * 10)});
                connection.commit();
            }
        }
        int expectedRecordCount = 1000;
        AtomicInteger recordCounter = new AtomicInteger();
        AtomicBoolean restarted = new AtomicBoolean();
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000, x -> (Integer)x.getValue() >= 2000, x -> {
            if (recordCounter.addAndGet(x.size()) > 50 && !restarted.get()) {
                this.stopConnector();
                this.assertConnectorNotRunning();
                this.start(MySqlConnector.class, config);
                this.assertConnectorIsRunning();
                restarted.set(true);
            }
        });
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).includes(new MapAssert.Entry[]{MapAssert.entry((Object)(i + 1), (Object)(i + 2000))});
        }
    }

    @Test
    public void updatesLargeChunk() throws Exception {
        Testing.Print.enable();
        this.populateTable();
        Configuration config = this.config();
        this.start(MySqlConnector.class, config);
        this.assertConnectorIsRunning();
        this.waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        this.assertNoRecordsToConsume();
        this.sendAdHocSnapshotSignal();
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            connection.execute(new String[]{"UPDATE a SET aa = aa + 2000"});
        }
        int expectedRecordCount = 1000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000, x -> (Integer)x.getValue() >= 2000, null);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).includes(new MapAssert.Entry[]{MapAssert.entry((Object)(i + 1), (Object)(i + 2000))});
        }
    }

    @Test
    public void snapshotOnlyWithRestart() throws Exception {
        Testing.Print.enable();
        this.populateTable();
        Configuration config = this.config();
        this.startAndConsumeTillEnd(MySqlConnector.class, config);
        this.assertConnectorIsRunning();
        this.waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        this.assertNoRecordsToConsume();
        this.sendAdHocSnapshotSignal();
        int expectedRecordCount = 1000;
        AtomicInteger recordCounter = new AtomicInteger();
        AtomicBoolean restarted = new AtomicBoolean();
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000, x -> true, x -> {
            if (recordCounter.addAndGet(x.size()) > 50 && !restarted.get()) {
                this.stopConnector();
                this.assertConnectorNotRunning();
                this.start(MySqlConnector.class, config);
                this.assertConnectorIsRunning();
                restarted.set(true);
            }
        });
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).includes(new MapAssert.Entry[]{MapAssert.entry((Object)(i + 1), (Object)i)});
        }
    }

    protected int getMaximumEnqueuedRecordCount() {
        return 3000;
    }
}

