/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline.source.snapshot.incremental;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenConnectorUnderTest;
import io.debezium.junit.SkipWhenConnectorsUnderTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.kafka.KafkaCluster;
import io.debezium.pipeline.notification.channels.SinkNotificationChannel;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.MapAssert;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;

public abstract class AbstractIncrementalSnapshotTest<T extends SourceConnector>
extends AbstractSnapshotTest<T> {
    protected static KafkaCluster kafka;

    protected String getSignalTypeFieldName() {
        return "type";
    }

    protected void sendAdHocSnapshotStopSignal(String ... dataCollectionIds) throws SQLException {
        Object collections = "";
        if (dataCollectionIds.length > 0) {
            String dataCollectionIdsList = Arrays.stream(dataCollectionIds).map(x -> "\"" + x + "\"").collect(Collectors.joining(", "));
            collections = ",\"data-collections\": [" + dataCollectionIdsList + "]";
        }
        try (JdbcConnection connection = this.databaseConnection();){
            String query = String.format("INSERT INTO %s VALUES('ad-hoc', 'stop-snapshot', '{\"type\": \"INCREMENTAL\"" + (String)collections + "}')", this.signalTableName());
            this.logger.info("Sending signal with query {}", (Object)query);
            connection.execute(new String[]{query});
        }
        catch (Exception e) {
            this.logger.warn("Failed to send signal", (Throwable)e);
        }
    }

    protected void sendAdHocSnapshotSignal() throws SQLException {
        this.sendAdHocSnapshotSignal(this.tableDataCollectionId());
    }

    protected void sendAdHocKafkaSnapshotSignal() throws ExecutionException, InterruptedException {
        this.sendExecuteSnapshotKafkaSignal(this.tableDataCollectionId());
    }

    protected void sendExecuteSnapshotKafkaSignal(String fullTableNames) throws ExecutionException, InterruptedException {
        String signalValue = String.format("{\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}", fullTableNames);
        this.sendKafkaSignal(signalValue);
    }

    protected String getSignalsTopic() {
        return "signals_topic";
    }

    protected void sendKafkaSignal(String signalValue) throws ExecutionException, InterruptedException {
        ProducerRecord executeSnapshotSignal = new ProducerRecord(this.getSignalsTopic(), Integer.valueOf(0), (Object)"test_server", (Object)signalValue);
        Configuration signalProducerConfig = ((Configuration.Builder)((Configuration.Builder)Configuration.create().withDefault("bootstrap.servers", kafka.brokerList()).withDefault("client.id", "signals").withDefault("key.serializer", StringSerializer.class)).withDefault("value.serializer", StringSerializer.class)).build();
        try (KafkaProducer producer = new KafkaProducer(signalProducerConfig.asProperties());){
            producer.send(executeSnapshotSignal).get();
        }
    }

    protected void sendPauseSignal() {
        try (JdbcConnection connection = this.databaseConnection();){
            String query = String.format("INSERT INTO %s VALUES('test-pause', 'pause-snapshot', '')", this.signalTableName());
            this.logger.info("Sending pause signal with query {}", (Object)query);
            connection.execute(new String[]{query});
        }
        catch (Exception e) {
            this.logger.warn("Failed to send pause signal", (Throwable)e);
        }
    }

    protected void sendResumeSignal() {
        try (JdbcConnection connection = this.databaseConnection();){
            String query = String.format("INSERT INTO %s VALUES('test-resume', 'resume-snapshot', '')", this.signalTableName());
            this.logger.info("Sending resume signal with query {}", (Object)query);
            connection.execute(new String[]{query});
        }
        catch (Exception e) {
            this.logger.warn("Failed to send resume signal", (Throwable)e);
        }
    }

    @Test
    public void snapshotOnly() throws Exception {
        this.populateTable();
        this.startConnector();
        this.sendAdHocSnapshotSignal();
        int expectedRecordCount = 1000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    public void invalidTablesInTheList() throws Exception {
        this.populateTable();
        this.startConnector();
        this.sendAdHocSnapshotSignal("invalid1", this.tableDataCollectionId(), "invalid2");
        int expectedRecordCount = 1000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    public void inserts() throws Exception {
        this.populateTable();
        this.startConnector();
        this.sendAdHocSnapshotSignal();
        try (JdbcConnection connection = this.databaseConnection();){
            connection.setAutoCommit(false);
            for (int i = 0; i < 1000; ++i) {
                connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", this.tableName(), connection.quotedColumnIdString(this.pkFieldName()), i + 1000 + 1, i + 1000)});
            }
            connection.commit();
        }
        int expectedRecordCount = 2000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(2000);
        for (int i = 0; i < 2000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    public void insertsWithKafkaSnapshotSignal() throws Exception {
        this.populateTable();
        this.startConnector();
        this.sendAdHocSnapshotSignal();
        try (JdbcConnection connection = this.databaseConnection();){
            connection.setAutoCommit(false);
            for (int i = 0; i < 1000; ++i) {
                connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", this.tableName(), connection.quotedColumnIdString(this.pkFieldName()), i + 1000 + 1, i + 1000)});
            }
            connection.commit();
        }
        int expectedRecordCount = 2000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(2000);
        for (int i = 0; i < 2000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    public void updates() throws Exception {
        this.populateTable();
        this.startConnector();
        this.sendAdHocSnapshotSignal();
        int batchSize = 10;
        try (JdbcConnection connection = this.databaseConnection();){
            connection.setAutoCommit(false);
            for (int i = 0; i < 1000; ++i) {
                connection.executeWithoutCommitting(new String[]{String.format("UPDATE %s SET aa = aa + 2000 WHERE %s > %s AND %s <= %s", this.tableName(), connection.quotedColumnIdString(this.pkFieldName()), i * 10, connection.quotedColumnIdString(this.pkFieldName()), (i + 1) * 10)});
                connection.commit();
            }
        }
        int expectedRecordCount = 1000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000, x -> (Integer)x.getValue() >= 2000, null);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)(i + 2000))});
        }
    }

    @Test
    public void updatesWithRestart() throws Exception {
        this.populateTable();
        Configuration config = this.config().build();
        this.startAndConsumeTillEnd(this.connectorClass(), config);
        this.waitForConnectorToStart();
        this.waitForAvailableRecords(1L, TimeUnit.SECONDS);
        this.assertNoRecordsToConsume();
        this.sendAdHocSnapshotSignal();
        int batchSize = 10;
        try (JdbcConnection connection = this.databaseConnection();){
            connection.setAutoCommit(false);
            for (int i = 0; i < 1000; ++i) {
                connection.executeWithoutCommitting(new String[]{String.format("UPDATE %s SET aa = aa + 2000 WHERE %s > %s AND %s <= %s", this.tableName(), connection.quotedColumnIdString(this.pkFieldName()), i * 10, connection.quotedColumnIdString(this.pkFieldName()), (i + 1) * 10)});
                connection.commit();
            }
        }
        int expectedRecordCount = 1000;
        AtomicInteger recordCounter = new AtomicInteger();
        AtomicBoolean restarted = new AtomicBoolean();
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000, x -> (Integer)x.getValue() >= 2000, x -> {
            if (recordCounter.addAndGet(x.size()) > 50 && !restarted.get()) {
                this.stopConnector();
                this.assertConnectorNotRunning();
                this.start(this.connectorClass(), config);
                this.waitForConnectorToStart();
                restarted.set(true);
            }
        });
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)(i + 2000))});
        }
    }

    @Test
    public void updatesLargeChunk() throws Exception {
        this.populateTable();
        this.startConnector((Configuration.Builder x) -> (Configuration.Builder)x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1000));
        this.sendAdHocSnapshotSignal();
        try (JdbcConnection connection = this.databaseConnection();){
            connection.execute(new String[]{String.format("UPDATE %s SET aa = aa + 2000", this.tableName())});
        }
        int expectedRecordCount = 1000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000, x -> (Integer)x.getValue() >= 2000, null);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)(i + 2000))});
        }
    }

    @Test
    public void snapshotOnlyWithRestart() throws Exception {
        this.populateTable();
        Configuration config = this.config().build();
        this.startAndConsumeTillEnd(this.connectorClass(), config);
        this.waitForConnectorToStart();
        this.waitForAvailableRecords(1L, TimeUnit.SECONDS);
        this.assertNoRecordsToConsume();
        this.sendAdHocSnapshotSignal();
        int expectedRecordCount = 1000;
        AtomicInteger recordCounter = new AtomicInteger();
        AtomicBoolean restarted = new AtomicBoolean();
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000, x -> true, x -> {
            if (recordCounter.addAndGet(x.size()) > 50 && !restarted.get()) {
                this.stopConnector();
                this.assertConnectorNotRunning();
                this.start(this.connectorClass(), config);
                this.waitForConnectorToStart();
                restarted.set(true);
            }
        });
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    @SkipWhenConnectorsUnderTest(value={@SkipWhenConnectorUnderTest(check=EqualityCheck.EQUAL, value=SkipWhenConnectorUnderTest.Connector.SQL_SERVER), @SkipWhenConnectorUnderTest(check=EqualityCheck.EQUAL, value=SkipWhenConnectorUnderTest.Connector.DB2)})
    @FixFor(value={"DBZ-4272"})
    public void snapshotPreceededBySchemaChange() throws Exception {
        this.populateTable();
        this.startConnector();
        this.waitForConnectorToStart();
        try (JdbcConnection connection = this.databaseConnection();){
            connection.execute(new String[]{this.alterTableAddColumnStatement(this.tableName())});
        }
        this.sendAdHocSnapshotSignal();
        int expectedRecordCount = 1000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
        try (JdbcConnection connection = this.databaseConnection();){
            connection.execute(new String[]{this.alterTableDropColumnStatement(this.tableName())});
        }
        this.sendAdHocSnapshotSignal();
        dbChanges = this.consumeMixedWithIncrementalSnapshot(1000);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    public void snapshotWithRegexDataCollections() throws Exception {
        this.populateTable();
        this.startConnector();
        this.sendAdHocSnapshotSignal(".*");
        int expectedRecordCount = 1000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    @FixFor(value={"DBZ-6945"})
    public void snapshotWithDuplicateDataCollections() throws Exception {
        this.populateTable();
        this.startConnector();
        this.sendAdHocSnapshotSignal(this.tableDataCollectionId(), this.tableDataCollectionId());
        int expectedRecordCount = 1000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
        AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(1, 1);
        Assert.assertTrue((boolean)Objects.isNull(sourceRecords.recordsForTopic(this.topicName())));
    }

    @Test
    @FixFor(value={"DBZ-4271"})
    public void stopCurrentIncrementalSnapshotWithoutCollectionsAndTakeNewNewIncrementalSnapshotAfterRestart() throws Exception {
        LogInterceptor interceptor = new LogInterceptor(AbstractIncrementalSnapshotChangeEventSource.class);
        this.populateTable();
        this.startConnector((Configuration.Builder x) -> (Configuration.Builder)x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1));
        this.sendAdHocSnapshotSignalAndWait(new String[0]);
        this.sendAdHocSnapshotStopSignalAndWait(new String[0]);
        Assertions.assertThat((boolean)this.consumeAnyRemainingIncrementalSnapshotEventsAndCheckForStopMessage(interceptor, "Stopping incremental snapshot")).isTrue();
        this.stopConnector(r -> interceptor.clear());
        this.startConnector();
        Assertions.assertThat((boolean)interceptor.containsMessage("No incremental snapshot in progress")).isTrue();
        this.sendAdHocSnapshotSignal();
        try (JdbcConnection connection = this.databaseConnection();){
            connection.setAutoCommit(false);
            for (int i = 0; i < 1000; ++i) {
                connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", this.tableName(), connection.quotedColumnIdString(this.pkFieldName()), i + 1000 + 1, i + 1000)});
            }
            connection.commit();
        }
        int expectedRecordCount = 2000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(2000);
        for (int i = 0; i < 2000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    @FixFor(value={"DBZ-4271"})
    public void stopCurrentIncrementalSnapshotWithAllCollectionsAndTakeNewNewIncrementalSnapshotAfterRestart() throws Exception {
        LogInterceptor interceptor = new LogInterceptor(AbstractIncrementalSnapshotChangeEventSource.class);
        this.populateTable();
        this.startConnector((Configuration.Builder x) -> (Configuration.Builder)x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1));
        this.sendAdHocSnapshotSignalAndWait(new String[0]);
        this.sendAdHocSnapshotStopSignalAndWait(this.tableDataCollectionId());
        Assertions.assertThat((boolean)this.consumeAnyRemainingIncrementalSnapshotEventsAndCheckForStopMessage(interceptor, "Removing '[" + this.tableDataCollectionId() + "]' collections from incremental snapshot")).isTrue();
        this.stopConnector(r -> interceptor.clear());
        this.startConnector();
        Assertions.assertThat((boolean)interceptor.containsMessage("No incremental snapshot in progress")).isTrue();
        this.sendAdHocSnapshotSignal();
        try (JdbcConnection connection = this.databaseConnection();){
            connection.setAutoCommit(false);
            for (int i = 0; i < 1000; ++i) {
                connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", this.tableName(), connection.quotedColumnIdString(this.pkFieldName()), i + 1000 + 1, i + 1000)});
            }
            connection.commit();
        }
        int expectedRecordCount = 2000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(2000);
        for (int i = 0; i < 2000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    @FixFor(value={"DBZ-4271"})
    public void removeNotYetCapturedCollectionFromInProgressIncrementalSnapshot() throws Exception {
        LogInterceptor interceptor = new LogInterceptor(AbstractIncrementalSnapshotChangeEventSource.class);
        this.populateTables();
        this.startConnector((Configuration.Builder x) -> (Configuration.Builder)x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 250));
        List<String> collectionIds = this.tableDataCollectionIds();
        Assertions.assertThat(collectionIds).hasSize(2);
        List<String> tableNames = this.tableNames();
        Assertions.assertThat(tableNames).hasSize(2);
        List<String> topicNames = this.topicNames();
        Assertions.assertThat(topicNames).hasSize(2);
        String collectionIdToRemove = collectionIds.get(1);
        String tableToSnapshot = tableNames.get(0);
        String topicToConsume = topicNames.get(0);
        this.sendAdHocSnapshotSignal(collectionIds.toArray(new String[0]));
        this.sendAdHocSnapshotStopSignal(collectionIdToRemove);
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> interceptor.containsMessage("Removing '[" + collectionIdToRemove + "]' collections from incremental snapshot"));
        try (JdbcConnection connection = this.databaseConnection();){
            connection.setAutoCommit(false);
            for (int i = 0; i < 1000; ++i) {
                connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", tableToSnapshot, connection.quotedColumnIdString(this.pkFieldName()), i + 1000 + 1, i + 1000)});
            }
            connection.commit();
        }
        int expectedRecordCount = 2000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(2000, topicToConsume);
        for (int i = 0; i < 2000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    @FixFor(value={"DBZ-4271"})
    public void removeStartedCapturedCollectionFromInProgressIncrementalSnapshot() throws Exception {
        LogInterceptor interceptor = new LogInterceptor(AbstractIncrementalSnapshotChangeEventSource.class);
        this.populateTables();
        this.startConnector((Configuration.Builder x) -> (Configuration.Builder)x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 250));
        List<String> collectionIds = this.tableDataCollectionIds();
        Assertions.assertThat(collectionIds).hasSize(2);
        List<String> tableNames = this.tableNames();
        Assertions.assertThat(tableNames).hasSize(2);
        List<String> topicNames = this.topicNames();
        Assertions.assertThat(topicNames).hasSize(2);
        String collectionIdToRemove = collectionIds.get(0);
        String tableToSnapshot = tableNames.get(1);
        String topicToConsume = topicNames.get(1);
        this.sendAdHocSnapshotSignal(collectionIds.toArray(new String[0]));
        this.sendAdHocSnapshotStopSignal(collectionIdToRemove);
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> interceptor.containsMessage("Removing '[" + collectionIdToRemove + "]' collections from incremental snapshot"));
        try (JdbcConnection connection = this.databaseConnection();){
            connection.setAutoCommit(false);
            for (int i = 0; i < 1000; ++i) {
                connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", tableToSnapshot, connection.quotedColumnIdString(this.pkFieldName()), i + 1000 + 1, i + 1000)});
            }
            connection.commit();
        }
        int expectedRecordCount = 2000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(2000, topicToConsume);
        for (int i = 0; i < 2000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    @FixFor(value={"DBZ-4834"})
    public void shouldSnapshotNewlyAddedTableToIncludeListAfterRestart() throws Exception {
        this.populateTables();
        this.startConnectorWithSnapshot(x -> this.mutableConfig(true, false));
        this.waitForConnectorToStart();
        AbstractConnectorTest.SourceRecords snapshotRecords = this.consumeRecordsByTopic(1000);
        this.stopConnector();
        this.startConnector((Configuration.Builder x) -> this.mutableConfig(false, false));
        this.waitForConnectorToStart();
        this.sendAdHocSnapshotSignal();
        int expectedRecordCount = 1000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
        this.stopConnector();
    }

    @Test
    public void testPauseDuringSnapshot() throws Exception {
        this.populateTable();
        this.startConnector((Configuration.Builder x) -> (Configuration.Builder)x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 50));
        this.waitForConnectorToStart();
        this.waitForAvailableRecords(1L, TimeUnit.SECONDS);
        this.assertNoRecordsToConsume();
        this.sendAdHocSnapshotSignal();
        ArrayList records = new ArrayList();
        String topicName = this.topicName();
        this.consumeRecords(100, record -> {
            if (topicName.equalsIgnoreCase(record.topic())) {
                records.add(record);
            }
        });
        this.sendPauseSignal();
        this.consumeAvailableRecords(record -> {
            if (topicName.equalsIgnoreCase(record.topic())) {
                records.add(record);
            }
        });
        int beforeResume = records.size();
        this.sendResumeSignal();
        int expectedRecordCount = 1000;
        if (1000 - beforeResume > 0) {
            Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000 - beforeResume);
            for (int i = beforeResume + 1; i < 1000; ++i) {
                Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
            }
        }
    }

    @Test
    public void snapshotWithAdditionalCondition() throws Exception {
        int expectedCount = 10;
        int expectedValue = 12345678;
        this.populateTable();
        this.populateTableWithSpecificValue(2000, expectedCount, expectedValue);
        this.waitForCdcTransactionPropagation(3);
        Configuration config = this.config().build();
        this.startAndConsumeTillEnd(this.connectorClass(), config);
        this.waitForConnectorToStart();
        this.waitForAvailableRecords(1L, TimeUnit.SECONDS);
        this.assertNoRecordsToConsume();
        this.sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(String.format("\"aa = %s\"", expectedValue), "", this.tableDataCollectionId());
        Map<Integer, SourceRecord> dbChanges = this.consumeRecordsMixedWithIncrementalSnapshot(expectedCount, x -> true, null);
        Assert.assertEquals((long)expectedCount, (long)dbChanges.size());
        Assert.assertTrue((boolean)dbChanges.values().stream().allMatch(v -> ((Struct)v.value()).getStruct("after").getInt32(this.valueFieldName()).equals(expectedValue)));
    }

    @Test
    public void snapshotWithNewAdditionalConditionsField() throws Exception {
        int expectedCount = 10;
        int expectedValue = 12345678;
        this.populateTable();
        this.populateTableWithSpecificValue(2000, expectedCount, expectedValue);
        this.waitForCdcTransactionPropagation(3);
        Configuration config = this.config().build();
        this.startAndConsumeTillEnd(this.connectorClass(), config);
        this.waitForConnectorToStart();
        this.waitForAvailableRecords(1L, TimeUnit.SECONDS);
        this.assertNoRecordsToConsume();
        this.sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map.of(this.tableDataCollectionId(), String.format("aa = %s", expectedValue)), "", this.tableDataCollectionId());
        Map<Integer, SourceRecord> dbChanges = this.consumeRecordsMixedWithIncrementalSnapshot(expectedCount, x -> true, null);
        Assert.assertEquals((long)expectedCount, (long)dbChanges.size());
        Assert.assertTrue((boolean)dbChanges.values().stream().allMatch(v -> ((Struct)v.value()).getStruct("after").getInt32(this.valueFieldName()).equals(expectedValue)));
    }

    @Test
    public void shouldExecuteRegularSnapshotWhenAdditionalConditionEmpty() throws Exception {
        this.populateTable();
        this.startConnector();
        int recordsCount = 1000;
        this.sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("\"\"", "", this.tableDataCollectionId());
        Map<Integer, SourceRecord> dbChanges = this.consumeRecordsMixedWithIncrementalSnapshot(1000, x -> true, null);
        Assert.assertEquals((long)1000L, (long)dbChanges.size());
    }

    @Test
    public void snapshotWithAdditionalConditionWithRestart() throws Exception {
        int expectedCount = 1000;
        int expectedValue = 12345678;
        this.populateTable();
        this.populateTableWithSpecificValue(2000, expectedCount, expectedValue);
        this.waitForCdcTransactionPropagation(3);
        Configuration config = this.config().build();
        this.startAndConsumeTillEnd(this.connectorClass(), config);
        this.waitForConnectorToStart();
        this.waitForAvailableRecords(1L, TimeUnit.SECONDS);
        this.assertNoRecordsToConsume();
        this.sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(String.format("\"aa = %s\"", expectedValue), "", this.tableDataCollectionId());
        AtomicInteger recordCounter = new AtomicInteger();
        AtomicBoolean restarted = new AtomicBoolean();
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(expectedCount, x -> true, x -> {
            if (recordCounter.addAndGet(x.size()) > 50 && !restarted.get()) {
                this.stopConnector();
                this.assertConnectorNotRunning();
                this.start(this.connectorClass(), config);
                this.waitForConnectorToStart();
                restarted.set(true);
            }
        });
        Assert.assertEquals((long)expectedCount, (long)dbChanges.size());
        Assert.assertTrue((boolean)dbChanges.values().stream().allMatch(v -> v.equals(expectedValue)));
    }

    @Test
    public void snapshotWithSurrogateKey() throws Exception {
        this.populateTable();
        this.startConnector();
        this.sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "\"aa\"", this.tableDataCollectionId());
        int expectedRecordCount = 1000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    public void snapshotWithAdditionalConditionWithSurrogateKey() throws Exception {
        int expectedCount = 10;
        int expectedValue = 12345678;
        this.populateTable();
        this.populateTableWithSpecificValue(2000, expectedCount, expectedValue);
        this.waitForCdcTransactionPropagation(3);
        Configuration config = this.config().build();
        this.startAndConsumeTillEnd(this.connectorClass(), config);
        this.waitForConnectorToStart();
        this.waitForAvailableRecords(1L, TimeUnit.SECONDS);
        this.assertNoRecordsToConsume();
        this.sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(String.format("\"aa = %s\"", expectedValue), "\"aa\"", this.tableDataCollectionId());
        Map<Integer, SourceRecord> dbChanges = this.consumeRecordsMixedWithIncrementalSnapshot(expectedCount, x -> true, null);
        Assert.assertEquals((long)expectedCount, (long)dbChanges.size());
        Assert.assertTrue((boolean)dbChanges.values().stream().allMatch(v -> ((Struct)v.value()).getStruct("after").getInt32(this.valueFieldName()).equals(expectedValue)));
    }

    @Test
    public void testNotification() throws Exception {
        this.populateTable();
        this.startConnector(x -> (Configuration.Builder)((Configuration.Builder)((Configuration.Builder)x.with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink")).with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, this.defaultIncrementalSnapshotChunkSize())).with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification"), this.loggingCompletion(), false);
        this.waitForConnectorToStart();
        this.waitForAvailableRecords(1L, TimeUnit.SECONDS);
        AbstractIncrementalSnapshotTest.waitForStreamingRunning(this.connector(), this.server(), AbstractIncrementalSnapshotTest.getStreamingNamespace(), this.task());
        this.sendAdHocSnapshotSignal();
        ArrayList<SourceRecord> records = new ArrayList<SourceRecord>();
        String topicName = this.topicName();
        ArrayList<SourceRecord> notifications = new ArrayList<SourceRecord>();
        this.consumeRecords(100, record -> {
            if (topicName.equalsIgnoreCase(record.topic())) {
                records.add((SourceRecord)record);
            }
            if ("io.debezium.notification".equals(record.topic())) {
                notifications.add((SourceRecord)record);
            }
        });
        this.sendPauseSignal();
        this.consumeAvailableRecords(record -> {
            if (topicName.equalsIgnoreCase(record.topic())) {
                records.add((SourceRecord)record);
            }
            if ("io.debezium.notification".equals(record.topic())) {
                notifications.add((SourceRecord)record);
            }
        });
        this.sendResumeSignal();
        AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopicUntil(AbstractIncrementalSnapshotTest.incrementalSnapshotCompleted());
        records.addAll(sourceRecords.recordsForTopic(this.topicName()));
        notifications.addAll(sourceRecords.recordsForTopic("io.debezium.notification"));
        List values = records.stream().map(r -> (Struct)r.value()).map(this.getRecordValue()).collect(Collectors.toList());
        for (int i = 0; i < 999; ++i) {
            Assertions.assertThat((Integer)((Integer)values.get(i))).isEqualTo(i);
        }
        this.assertCorrectIncrementalSnapshotNotification(notifications);
    }

    protected int defaultIncrementalSnapshotChunkSize() {
        return 1;
    }

    private static BiPredicate<Integer, SourceRecord> incrementalSnapshotCompleted() {
        return (recordsConsumed, record) -> record.topic().equals("io.debezium.notification") && ((Struct)record.value()).getString("aggregate_type").equals("Incremental Snapshot") && ((Struct)record.value()).getString("type").equals("COMPLETED");
    }

    private void assertCorrectIncrementalSnapshotNotification(List<SourceRecord> notifications) {
        List incrementalSnapshotNotification = notifications.stream().map(s -> (Struct)s.value()).filter(s -> s.getString("aggregate_type").equals("Incremental Snapshot")).collect(Collectors.toList());
        Assertions.assertThat((boolean)incrementalSnapshotNotification.stream().anyMatch(s -> s.getString("type").equals("STARTED"))).isTrue();
        Assertions.assertThat((boolean)incrementalSnapshotNotification.stream().anyMatch(s -> s.getString("type").equals("PAUSED"))).isTrue();
        Assertions.assertThat((boolean)incrementalSnapshotNotification.stream().anyMatch(s -> s.getString("type").equals("RESUMED"))).isTrue();
        Assertions.assertThat((boolean)incrementalSnapshotNotification.stream().anyMatch(s -> s.getString("type").equals("IN_PROGRESS"))).isTrue();
        Assertions.assertThat((boolean)incrementalSnapshotNotification.stream().anyMatch(s -> s.getString("type").equals("TABLE_SCAN_COMPLETED"))).isTrue();
        Assertions.assertThat((boolean)incrementalSnapshotNotification.stream().anyMatch(s -> s.getString("type").equals("COMPLETED"))).isTrue();
        Assertions.assertThat(incrementalSnapshotNotification.stream().map(s -> s.getString("id")).distinct().collect(Collectors.toList())).contains((Object[])new String[]{"ad-hoc"});
        Struct inProgress = incrementalSnapshotNotification.stream().filter(s -> s.getString("type").equals("IN_PROGRESS")).findFirst().get();
        ((MapAssert)((MapAssert)Assertions.assertThat((Map)inProgress.getMap("additional_data")).containsEntry((Object)"current_collection_in_progress", (Object)this.tableDataCollectionId())).containsEntry((Object)"maximum_key", (Object)"1000")).containsEntry((Object)"last_processed_key", (Object)String.valueOf(this.defaultIncrementalSnapshotChunkSize()));
        Struct completed = incrementalSnapshotNotification.stream().filter(s -> s.getString("type").equals("TABLE_SCAN_COMPLETED")).findFirst().get();
        Assertions.assertThat((Map)completed.getMap("additional_data")).containsEntry((Object)"total_rows_scanned", (Object)"1000");
    }

    protected void sendAdHocSnapshotSignalAndWait(String ... collectionIds) throws Exception {
        if (collectionIds.length == 0) {
            this.sendAdHocSnapshotSignal();
        } else {
            this.sendAdHocSnapshotSignal(collectionIds);
        }
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
            AtomicBoolean result = new AtomicBoolean(false);
            this.consumeAvailableRecords(r -> {
                if (r.topic().endsWith(this.signalTableNameSanitized())) {
                    result.set(true);
                }
            });
            return result.get();
        });
    }

    protected void sendAdHocSnapshotStopSignalAndWait(String ... collectionIds) throws Exception {
        this.sendAdHocSnapshotStopSignal(collectionIds);
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
            AtomicBoolean stopSignal = new AtomicBoolean(false);
            this.consumeAvailableRecords(r -> {
                String type;
                Struct after;
                if (r.topic().endsWith(this.signalTableNameSanitized()) && (after = ((Struct)r.value()).getStruct("after")) != null && "stop-snapshot".equals(type = after.getString(this.getSignalTypeFieldName()))) {
                    stopSignal.set(true);
                }
            });
            return stopSignal.get();
        });
    }

    protected boolean consumeAnyRemainingIncrementalSnapshotEventsAndCheckForStopMessage(LogInterceptor interceptor, String stopMessage) throws Exception {
        AtomicBoolean stopMessageFound = new AtomicBoolean(false);
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).pollDelay(5L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> {
            if (interceptor.containsMessage(stopMessage)) {
                stopMessageFound.set(true);
            }
            return this.consumeAvailableRecords(r -> {}) == 0;
        });
        return stopMessageFound.get();
    }
}

