/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline;

import io.debezium.config.Configuration;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenConnectorUnderTest;
import io.debezium.junit.SkipWhenConnectorsUnderTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.signal.actions.AbstractSnapshotSignal;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest;
import java.lang.management.ManagementFactory;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ReflectionException;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.TabularDataSupport;
import org.apache.kafka.connect.data.Struct;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Test;

/**
 * Shared test scenarios for ad-hoc snapshots of type
 * {@link AbstractSnapshotSignal.SnapshotType#BLOCKING}.
 *
 * <p>Each scenario starts the connector, sends a blocking-snapshot signal, waits until the
 * {@code "Snapshot completed"} message is logged by {@link AbstractSnapshotChangeEventSource}
 * and then checks the records consumed from the data topic. Concrete subclasses supply the
 * connector-specific pieces (configuration, JDBC connection, table/topic/server names).
 */
public abstract class AbstractBlockingSnapshotTest
extends AbstractSnapshotTest {
    /**
     * Extra records each scenario adds to the number of records it consumes after a signal
     * has been sent; every test sets it to {@code 1}.
     * NOTE(review): presumably this accounts for the change event of the signal row itself - confirm.
     */
    private int signalingRecords;

    /** Number of rows written per insert batch by the scenarios in this class. */
    protected static final int ROW_COUNT = 1000;

    /**
     * Supplies the connector configuration used by the scenarios.
     *
     * @param signalTableOnly      first flag forwarded by {@link #historizedMutableConfig(boolean, boolean)}
     * @param storeOnlyCapturedDdl second flag forwarded by {@link #historizedMutableConfig(boolean, boolean)}
     * @return the configuration builder to start the connector with
     */
    @Override
    protected abstract Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl);

    /** @return the JDBC connection used to insert test rows; it is closed by the caller after use */
    @Override
    protected abstract JdbcConnection databaseConnection();

    /** @return the topic whose records are verified by the scenarios */
    @Override
    protected abstract String topicName();

    /** @return the table name interpolated into the generated {@code INSERT} statements */
    @Override
    protected abstract String tableName();

    /** @return the connector name used when looking up connector metrics */
    @Override
    protected abstract String connector();

    /** @return the server name used for metrics lookup and as the topic of schema change records */
    @Override
    protected abstract String server();

    /**
     * Configuration used by {@link #readsSchemaOnlyForSignaledTables()}. Delegates to
     * {@link #mutableConfig(boolean, boolean)} unless a subclass overrides it.
     *
     * @param signalTableOnly      forwarded unchanged to {@code mutableConfig}
     * @param storeOnlyCapturedDdl forwarded unchanged to {@code mutableConfig}
     * @return the configuration builder to start the connector with
     */
    protected Configuration.Builder historizedMutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) {
        return mutableConfig(signalTableOnly, storeOnlyCapturedDdl);
    }

    /**
     * Runs a blocking snapshot while no inserts are in flight and verifies that the snapshot
     * re-emits the existing rows and that streaming continues afterwards.
     */
    @Test
    public void executeBlockingSnapshot() throws Exception {
        populateTable();
        startConnectorWithSnapshot(x -> mutableConfig(false, false));
        waitForSnapshotToBeCompleted(connector(), server(), task(), database());

        // One batch from the initial snapshot plus one streamed batch.
        insertRecords(ROW_COUNT, ROW_COUNT);
        AbstractConnectorTest.SourceRecords consumedRecordsByTopic = consumeRecordsByTopic(ROW_COUNT * 2, 10);
        assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT * 2, consumedRecordsByTopic);

        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", AbstractSnapshotSignal.SnapshotType.BLOCKING, tableDataCollectionId());
        waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);

        signalingRecords = 1;
        assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT * 2, consumeRecordsByTopic(ROW_COUNT * 2 + signalingRecords, 10));

        // Streaming must still work once the blocking snapshot has finished.
        insertRecords(ROW_COUNT, ROW_COUNT * 2);
        assertStreamingRecordsArePresent(ROW_COUNT, consumeRecordsByTopic(ROW_COUNT, 10));
    }

    /**
     * Triggers a blocking snapshot while a background task is still inserting rows and verifies
     * that every inserted value shows up in the data topic.
     */
    @Test
    public void executeBlockingSnapshotWhileStreaming() throws Exception {
        populateTable();
        startConnectorWithSnapshot(x -> mutableConfig(false, false));
        waitForSnapshotToBeCompleted(connector(), server(), task(), database());

        Future<?> batchInserts = executeAsync(insertTask());
        // Let the background inserts make some progress before signalling the snapshot.
        Thread.sleep(2000L);

        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", AbstractSnapshotSignal.SnapshotType.BLOCKING, tableDataCollectionId());
        waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
        waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());

        Long totalSnapshotRecords = getTotalSnapshotRecords(tableDataCollectionId(), connector(), server(), task(), database());
        batchInserts.get(120L, TimeUnit.SECONDS);
        insertRecords(ROW_COUNT, ROW_COUNT * 2);

        signalingRecords = 1;
        // Three insert batches plus whatever the blocking snapshot re-read.
        long expectedTopicRecords = 3L * ROW_COUNT + totalSnapshotRecords;
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic((int) (expectedTopicRecords + signalingRecords), 10);
        assertRecordsWithValuesPresent((int) expectedTopicRecords, getExpectedValues(totalSnapshotRecords), topicName(), consumeRecordsByTopic);
    }

    /**
     * Sends a blocking snapshot signal restricted by {@code aa < 500} for the second table and
     * verifies that exactly the 500 matching rows are emitted on the second topic.
     */
    @Test
    public void executeBlockingSnapshotWithAdditionalCondition() throws Exception {
        populateTable(tableNames().get(1).toString());
        startConnectorWithSnapshot(x -> mutableConfig(false, false));
        waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());

        signalFilteredBlockingSnapshotOfSecondTable();
        waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);

        signalingRecords = 1;
        AbstractConnectorTest.SourceRecords consumedRecordsByTopic = consumeRecordsByTopic(500 + signalingRecords, 10);
        assertRecordsWithValuesPresent(500, IntStream.rangeClosed(0, 499).boxed().collect(Collectors.toList()), topicNames().get(1).toString(), consumedRecordsByTopic);
    }

    /**
     * Same filtered snapshot as {@link #executeBlockingSnapshotWithAdditionalCondition()}, started
     * with {@link #historizedMutableConfig(boolean, boolean)}; additionally collects the
     * {@code ddl} field of the records published on the {@link #server()} topic and hands them
     * to {@link #assertDdl(List)}.
     */
    @Test
    @SkipWhenConnectorsUnderTest(value={@SkipWhenConnectorUnderTest(check=EqualityCheck.EQUAL, value=SkipWhenConnectorUnderTest.Connector.POSTGRES), @SkipWhenConnectorUnderTest(check=EqualityCheck.EQUAL, value=SkipWhenConnectorUnderTest.Connector.SQL_SERVER), @SkipWhenConnectorUnderTest(check=EqualityCheck.EQUAL, value=SkipWhenConnectorUnderTest.Connector.DB2)})
    public void readsSchemaOnlyForSignaledTables() throws Exception {
        populateTable(tableNames().get(1).toString());
        startConnectorWithSnapshot(x -> historizedMutableConfig(false, false));
        waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());

        signalFilteredBlockingSnapshotOfSecondTable();
        waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);

        signalingRecords = 1;
        AbstractConnectorTest.SourceRecords recordsByTopic = consumeRecordsByTopic(500 + signalingRecords + expectedDdlsCount(), 1);
        assertRecordsWithValuesPresent(500, IntStream.rangeClosed(0, 499).boxed().collect(Collectors.toList()), topicNames().get(1).toString(), recordsByTopic);

        List<String> ddls = recordsByTopic.recordsForTopic(server()).stream()
                .map(sourceRecord -> ((Struct) sourceRecord.value()).getString("ddl"))
                .collect(Collectors.toList());
        assertDdl(ddls);
    }

    /**
     * Hook: number of schema change records {@link #readsSchemaOnlyForSignaledTables()} should
     * consume in addition to the data records. Defaults to {@code 0}.
     */
    protected int expectedDdlsCount() {
        return 0;
    }

    /**
     * Hook: verifies the DDL statements collected by {@link #readsSchemaOnlyForSignaledTables()}.
     * The default implementation checks nothing.
     *
     * @param schemaChangesDdls the {@code ddl} values read from the {@link #server()} topic
     */
    protected void assertDdl(List<String> schemaChangesDdls) {
    }

    /**
     * Exclusive upper bound, in milliseconds, of the random pause between two background inserts.
     * Values of {@code 1} or less result in a fixed 1 ms pause.
     */
    protected int insertMaxSleep() {
        return 2;
    }

    /** Builds the background job that inserts one batch of rows with a short pause after each row. */
    private Runnable insertTask() {
        return () -> insertRecordsWithRandomSleep(ROW_COUNT, ROW_COUNT, insertMaxSleep());
    }

    /**
     * Sends a blocking snapshot signal for the second data collection, limited to rows
     * matching {@code aa < 500}.
     */
    private void signalFilteredBlockingSnapshotOfSecondTable() throws Exception {
        sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map.of(tableDataCollectionIds().get(1), String.format("SELECT * FROM %s WHERE aa < 500", tableNames().get(1))), "", AbstractSnapshotSignal.SnapshotType.BLOCKING, tableDataCollectionIds().get(1).toString());
    }

    /**
     * Reads the {@code RowsScanned} snapshot metric from the platform MBean server and returns
     * the entry for the given table.
     *
     * @param table table identifier; back-ticks are stripped before the lookup
     * @return the value stored for the table in the {@code RowsScanned} attribute
     * @throws AssertionError if the metric has no entry for the table
     */
    private Long getTotalSnapshotRecords(String table, String connector, String server, String task, String database) throws MalformedObjectNameException, ReflectionException, AttributeNotFoundException, InstanceNotFoundException, MBeanException {
        MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
        TabularDataSupport rowsScanned = (TabularDataSupport) mbeanServer.getAttribute(getSnapshotMetricsObjectName(connector, server, task, database), "RowsScanned");
        Map<String, Object> scannedRowsByTable = rowsScanned.values().stream()
                .map(CompositeDataSupport.class::cast)
                .collect(Collectors.toMap(row -> row.get("key").toString(), row -> row.get("value")));

        String unquotedTableName = table.replace("`", "");
        Object scannedRows = scannedRowsByTable.get(unquotedTableName);
        // Fail with a readable message here instead of a bare NullPointerException when the caller unboxes.
        Assertions.assertThat(scannedRows)
                .as("RowsScanned entry for table %s (available: %s)", unquotedTableName, scannedRowsByTable.keySet())
                .isNotNull();
        return (Long) scannedRows;
    }

    /**
     * Builds the values expected on the topic by {@link #executeBlockingSnapshotWhileStreaming()}:
     * the initially populated batch, the batch streamed in the background, the values covered
     * by the blocking snapshot and the batch inserted afterwards. The result contains
     * duplicates; it is only used for a contains-all check.
     *
     * @param totalSnapshotRecords upper bound (inclusive) of the values attributed to the blocking snapshot
     */
    private static List<Integer> getExpectedValues(Long totalSnapshotRecords) {
        List<Integer> initialSnapshotValues = IntStream.rangeClosed(0, ROW_COUNT - 1).boxed().collect(Collectors.toList());
        List<Integer> firstStreamingBatchValues = IntStream.rangeClosed(ROW_COUNT, 2 * ROW_COUNT - 1).boxed().collect(Collectors.toList());
        List<Integer> blockingSnapshotValues = Stream.concat(
                initialSnapshotValues.stream(),
                IntStream.rangeClosed(ROW_COUNT, Math.toIntExact(totalSnapshotRecords)).boxed())
                .collect(Collectors.toList());
        List<Integer> secondStreamingBatchValues = IntStream.rangeClosed(2 * ROW_COUNT, 3 * ROW_COUNT - 1).boxed().collect(Collectors.toList());

        return Stream.of(initialSnapshotValues, firstStreamingBatchValues, blockingSnapshotValues, secondStreamingBatchValues)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
    }

    /**
     * Polls every 100 ms until a log interceptor attached to {@code logEmitterClass} reports the
     * given message, giving up after {@code waitTimeForRecords() * 30} seconds.
     * NOTE(review): the interceptor is created inside this method, so a message logged before
     * the call may not be seen - confirm against {@link LogInterceptor}.
     *
     * @param message         text the interceptor must contain
     * @param logEmitterClass class whose logger is intercepted
     */
    protected static void waitForLogMessage(String message, Class<?> logEmitterClass) {
        LogInterceptor interceptor = new LogInterceptor(logEmitterClass);
        Awaitility.await()
                .alias("Snapshot not completed on time")
                .pollInterval(100L, TimeUnit.MILLISECONDS)
                .atMost((long) waitTimeForRecords() * 30L, TimeUnit.SECONDS)
                .until(() -> interceptor.containsMessage(message));
    }

    /**
     * Runs the operation on a dedicated single-thread executor.
     *
     * @return a future that completes when the operation has finished
     */
    private Future<?> executeAsync(Runnable operation) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return executor.submit(operation);
        }
        finally {
            // shutdown() lets the submitted task finish but releases the worker thread
            // afterwards; without it every call leaks a non-daemon thread.
            executor.shutdown();
        }
    }

    /**
     * Asserts that the topic holds {@code expectedRecords} records and that they cover every
     * value written by {@code insertRecords(ROW_COUNT, 2 * ROW_COUNT)}, i.e. 2000..2999.
     */
    protected void assertStreamingRecordsArePresent(int expectedRecords, AbstractConnectorTest.SourceRecords recordsByTopic) {
        // rangeClosed: the last inserted value (2999) must be checked as well.
        assertRecordsWithValuesPresent(expectedRecords, IntStream.rangeClosed(2 * ROW_COUNT, 3 * ROW_COUNT - 1).boxed().collect(Collectors.toList()), topicName(), recordsByTopic);
    }

    /**
     * Asserts that the topic holds {@code expectedRecords} records and that they cover the
     * values {@code 0 .. expectedRecords - 1}.
     */
    protected void assertRecordsFromSnapshotAndStreamingArePresent(int expectedRecords, AbstractConnectorTest.SourceRecords recordsByTopic) throws InterruptedException {
        // range(0, n) already excludes n; subtracting one more would skip the last value.
        assertRecordsWithValuesPresent(expectedRecords, IntStream.range(0, expectedRecords).boxed().collect(Collectors.toList()), topicName(), recordsByTopic);
    }

    /**
     * Extracts the {@code after.<valueFieldName()>} integer of every record on the topic, then
     * asserts the record count and that all expected values are among the extracted ones.
     */
    private void assertRecordsWithValuesPresent(int expectedRecords, List<Integer> expectedValues, String topicName, AbstractConnectorTest.SourceRecords recordsByTopic) {
        List<Integer> actual = recordsByTopic.recordsForTopic(topicName).stream()
                .map(s -> ((Struct) s.value()).getStruct("after").getInt32(valueFieldName()))
                .collect(Collectors.toList());
        // One value is extracted per record, so the list size equals the record count.
        Assertions.assertThat(actual).hasSize(expectedRecords);
        Assertions.assertThat(actual).containsAll(expectedValues);
    }

    /**
     * Inserts {@code rowCount} rows in a single transaction. Row {@code i} gets the primary key
     * {@code startingPkId + i + 1} and the {@code aa} value {@code startingPkId + i}.
     *
     * @throws SQLException if a statement, the commit or closing the connection fails
     */
    protected void insertRecords(int rowCount, int startingPkId) throws SQLException {
        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(false);
            for (int i = 0; i < rowCount; ++i) {
                connection.executeWithoutCommitting(new String[]{insertStatement(connection, i + startingPkId)});
            }
            connection.commit();
        }
    }

    /**
     * Inserts {@code rowCount} rows with auto-commit enabled, running {@code actionOnInsert} and
     * pausing for a random time below {@code maxSleep} milliseconds after each row.
     *
     * @throws RuntimeException wrapping any failure; the interrupt flag is restored first when
     *                          the pause is interrupted
     */
    private void insertRecordsWithRandomSleep(int rowCount, int startingPkId, int maxSleep, Runnable actionOnInsert) {
        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(true);
            for (int i = 0; i < rowCount; ++i) {
                connection.execute(new String[]{insertStatement(connection, i + startingPkId)});
                actionOnInsert.run();
                // nextInt(origin, bound) requires origin < bound; fall back to 1 ms for small bounds.
                int sleepTime = maxSleep > 1 ? ThreadLocalRandom.current().nextInt(1, maxSleep) : 1;
                Thread.sleep(sleepTime);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Same as the four-argument variant with a no-op action after each insert. */
    private void insertRecordsWithRandomSleep(int rowCount, int startingPkId, int maxSleep) {
        insertRecordsWithRandomSleep(rowCount, startingPkId, maxSleep, () -> {});
    }

    /**
     * Formats the {@code INSERT} statement for one test row: primary key {@code value + 1},
     * column {@code aa} set to {@code value}.
     */
    private String insertStatement(JdbcConnection connection, int value) throws SQLException {
        return String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", tableName(), connection.quotedColumnIdString(pkFieldName()), value + 1, value);
    }
}

