/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.IncrementalSnapshotIT;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.junit.SkipTestDependingOnGtidModeRule;
import io.debezium.connector.mysql.junit.SkipWhenGtidModeIs;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.ConditionalFail;
import io.debezium.junit.Flaky;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.kafka.KafkaCluster;
import io.debezium.pipeline.signal.channels.FileSignalChannel;
import io.debezium.pipeline.signal.channels.KafkaSignalChannel;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

@SkipWhenGtidModeIs(value=SkipWhenGtidModeIs.GtidMode.OFF, reason="Read only connection requires GTID_MODE to be ON")
public class ReadOnlyIncrementalSnapshotIT
extends IncrementalSnapshotIT {
    private static KafkaCluster kafka;
    private static final int PARTITION_NO = 0;
    public static final String EXCLUDED_TABLE = "b";
    @Rule
    public TestRule skipTest = new SkipTestDependingOnGtidModeRule();
    @Rule
    public ConditionalFail conditionalFail = new ConditionalFail();
    private final Path signalsFile = Paths.get("src", "test", "resources").resolve("debezium_signaling_file.txt");

    @Override
    @Before
    public void before() throws SQLException {
        super.before();
        kafka.createTopic(this.getSignalsTopic(), 1, 1);
    }

    @BeforeClass
    public static void startKafka() throws Exception {
        File dataDir = Testing.Files.createTestingDirectory((String)"signal_cluster");
        Testing.Files.delete((File)dataDir);
        kafka = new KafkaCluster().usingDirectory(dataDir).deleteDataPriorToStartup(true).deleteDataUponShutdown(true).addBrokers(1).withKafkaConfiguration(Collect.propertiesOf((String)"auto.create.topics.enable", (String)"false", (String)"zookeeper.session.timeout.ms", (String)"20000")).startup();
    }

    @AfterClass
    public static void stopKafka() {
        if (kafka != null) {
            kafka.shutdown();
        }
    }

    @Override
    protected Configuration.Builder config() {
        return (Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)super.config().with(MySqlConnectorConfig.TABLE_EXCLUDE_LIST, this.DATABASE.getDatabaseName() + ".b")).with(MySqlConnectorConfig.READ_ONLY_CONNECTION, true)).with(KafkaSignalChannel.SIGNAL_TOPIC, this.getSignalsTopic())).with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList())).with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")).with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)).with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, String.format("%s:%s", this.DATABASE.qualifiedTableName("a42"), "pk1,pk2,pk3,pk4"));
    }

    protected String getSignalsTopic() {
        return this.DATABASE.getDatabaseName() + "signals_topic";
    }

    protected void sendExecuteSnapshotKafkaSignal() throws ExecutionException, InterruptedException {
        this.sendExecuteSnapshotKafkaSignal(this.tableDataCollectionId());
    }

    protected void sendExecuteSnapshotKafkaSignal(String fullTableNames) throws ExecutionException, InterruptedException {
        String signalValue = String.format("{\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}", fullTableNames);
        this.sendKafkaSignal(signalValue);
    }

    protected void sendStopSnapshotKafkaSignal() throws ExecutionException, InterruptedException {
        this.sendStopSnapshotKafkaSignal(this.tableDataCollectionId());
    }

    protected void sendStopSnapshotKafkaSignal(String fullTableNames) throws ExecutionException, InterruptedException {
        String signalValue = String.format("{\"type\":\"stop-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}", fullTableNames);
        this.sendKafkaSignal(signalValue);
    }

    protected void sendPauseSnapshotKafkaSignal() throws ExecutionException, InterruptedException {
        this.sendKafkaSignal("{\"type\":\"pause-snapshot\",\"data\": {\"type\": \"INCREMENTAL\"}}");
    }

    protected void sendResumeSnapshotKafkaSignal() throws ExecutionException, InterruptedException {
        this.sendKafkaSignal("{\"type\":\"resume-snapshot\",\"data\": {\"type\": \"INCREMENTAL\"}}");
    }

    protected void sendKafkaSignal(String signalValue) throws ExecutionException, InterruptedException {
        ProducerRecord executeSnapshotSignal = new ProducerRecord(this.getSignalsTopic(), Integer.valueOf(0), (Object)"is_test", (Object)signalValue);
        Configuration signalProducerConfig = ((Configuration.Builder)((Configuration.Builder)Configuration.create().withDefault("bootstrap.servers", kafka.brokerList()).withDefault("client.id", "signals").withDefault("key.serializer", StringSerializer.class)).withDefault("value.serializer", StringSerializer.class)).build();
        try (KafkaProducer producer = new KafkaProducer(signalProducerConfig.asProperties());){
            producer.send(executeSnapshotSignal).get();
        }
    }

    @Test
    public void emptyHighWatermark() throws Exception {
        this.populateTable();
        this.startConnector();
        this.sendAdHocSnapshotSignal();
        int expectedRecordCount = 1000;
        Map dbChanges = this.consumeMixedWithIncrementalSnapshot(1000);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat((Map)dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void filteredEvents() throws Exception {
        this.populateTable();
        this.startConnector();
        this.sendExecuteSnapshotKafkaSignal();
        Thread t = new Thread(() -> {
            try (JdbcConnection connection = this.databaseConnection();){
                connection.setAutoCommit(false);
                int i = 0;
                while (!Thread.interrupted()) {
                    connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (pk, aa) VALUES (%s, %s)", EXCLUDED_TABLE, i + 1000 + 1, i + 1000)});
                    connection.commit();
                    ++i;
                }
            }
            catch (SQLException e) {
                throw new RuntimeException(e);
            }
        });
        t.setDaemon(true);
        t.setName("filtered-binlog-events-thread");
        try {
            t.start();
            int expectedRecordCount = 1000;
            Map dbChanges = this.consumeMixedWithIncrementalSnapshot(1000);
            for (int i = 0; i < 1000; ++i) {
                Assertions.assertThat((Map)dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
            }
        }
        finally {
            t.interrupt();
        }
    }

    @Test
    public void inserts4Pks() throws Exception {
        this.populate4PkTable();
        this.startConnector();
        this.sendExecuteSnapshotKafkaSignal(this.DATABASE.qualifiedTableName("a4"));
        int expectedRecordCount = 1000;
        Map dbChanges = this.consumeMixedWithIncrementalSnapshot(1000, x -> true, k -> k.getInt32("pk1") * 1000 + k.getInt32("pk2") * 100 + k.getInt32("pk3") * 10 + k.getInt32("pk4"), record -> ((Struct)record.value()).getStruct("after").getInt32(this.valueFieldName()), this.DATABASE.topicForTable("a4"), null);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat((Map)dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    public void inserts4PksWithSignalFile() throws Exception {
        this.populate4PkTable();
        this.startConnector(c -> (Configuration.Builder)((Configuration.Builder)c.with(FileSignalChannel.SIGNAL_FILE, this.signalsFile.toString())).with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "file"));
        this.sendExecuteSnapshotFileSignal(this.DATABASE.qualifiedTableName("a4"));
        int expectedRecordCount = 1000;
        Map dbChanges = this.consumeMixedWithIncrementalSnapshot(1000, x -> true, k -> k.getInt32("pk1") * 1000 + k.getInt32("pk2") * 100 + k.getInt32("pk3") * 10 + k.getInt32("pk4"), record -> ((Struct)record.value()).getStruct("after").getInt32(this.valueFieldName()), this.DATABASE.topicForTable("a4"), null);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat((Map)dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    @FixFor(value={"DBZ-7441"})
    public void aSignalAddedToFileWhenConnectorIsStoppedShouldBeProcessedWhenItStarts() throws Exception {
        this.populate4PkTable();
        this.sendExecuteSnapshotFileSignal(this.DATABASE.qualifiedTableName("a4"));
        this.startConnector(c -> (Configuration.Builder)((Configuration.Builder)c.with(FileSignalChannel.SIGNAL_FILE, this.signalsFile.toString())).with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "file"), this.loggingCompletion(), false);
        int expectedRecordCount = 1000;
        Map dbChanges = this.consumeMixedWithIncrementalSnapshot(1000, x -> true, k -> k.getInt32("pk1") * 1000 + k.getInt32("pk2") * 100 + k.getInt32("pk3") * 10 + k.getInt32("pk4"), record -> ((Struct)record.value()).getStruct("after").getInt32(this.valueFieldName()), this.DATABASE.topicForTable("a4"), null);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat((Map)dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test(expected=ConnectException.class)
    @SkipWhenGtidModeIs(value=SkipWhenGtidModeIs.GtidMode.ON, reason="Read only connection requires GTID_MODE to be ON")
    public void shouldFailIfGtidModeIsOff() throws Exception {
        this.populateTable();
        AtomicReference exception = new AtomicReference();
        this.startConnector((success, message, error) -> exception.set(error));
        this.waitForEngineShutdown();
        this.stopConnector();
        Throwable e = (Throwable)exception.get();
        if (e != null) {
            throw (RuntimeException)e;
        }
    }

    @Test
    @Flaky(value="DBZ-7572")
    @FixFor(value={"DBZ-5453"})
    public void testStopSnapshotKafkaSignal() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor(AbstractIncrementalSnapshotChangeEventSource.class);
        this.populateTable();
        this.startConnector(x -> (Configuration.Builder)x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1));
        this.waitForConnectorToStart();
        this.waitForAvailableRecords(1L, TimeUnit.SECONDS);
        this.assertNoRecordsToConsume();
        this.sendExecuteSnapshotKafkaSignal();
        this.consumeMixedWithIncrementalSnapshot(1);
        this.sendStopSnapshotKafkaSignal();
        ArrayList records = new ArrayList();
        String topicName = this.topicName();
        String tableRemoveMessage = String.format("Removed '%s' from incremental snapshot collection list.", this.tableDataCollectionId());
        Awaitility.await().atMost(Duration.ofMinutes(2L)).until(() -> {
            this.consumeAvailableRecords(record -> {
                if (topicName.equalsIgnoreCase(record.topic())) {
                    records.add(record);
                }
            });
            return logInterceptor.containsMessage(tableRemoveMessage);
        });
    }

    @Test
    public void testPauseDuringSnapshotKafkaSignal() throws Exception {
        this.populateTable();
        this.startConnector(x -> (Configuration.Builder)x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1));
        this.waitForConnectorToStart();
        this.waitForAvailableRecords(1L, TimeUnit.SECONDS);
        this.assertNoRecordsToConsume();
        this.sendExecuteSnapshotKafkaSignal();
        ArrayList records = new ArrayList();
        String topicName = this.topicName();
        Map dbChanges = this.consumeMixedWithIncrementalSnapshot(100);
        this.sendPauseSnapshotKafkaSignal();
        this.consumeAvailableRecords(record -> {
            if (topicName.equalsIgnoreCase(record.topic())) {
                records.add(record);
            }
        });
        int beforeResume = records.size() + dbChanges.size();
        this.sendResumeSnapshotKafkaSignal();
        dbChanges = this.consumeMixedWithIncrementalSnapshot(1000 - beforeResume);
        for (int i = beforeResume + 1; i < 1000; ++i) {
            Assertions.assertThat((Map)dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    public void insertInsertWatermarkingStrategy() throws Exception {
    }

    @Test
    public void insertDeleteWatermarkingStrategy() throws Exception {
    }

    private void sendExecuteSnapshotFileSignal(String fullTableNames) throws IOException {
        String signalValue = String.format("{\"id\":\"12345\",\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}", fullTableNames);
        Files.write(this.signalsFile, signalValue.getBytes(), new OpenOption[0]);
    }

    protected void populate4PkTable() throws SQLException {
        try (JdbcConnection connection = this.databaseConnection();){
            this.populate4PkTable(connection, "a4");
        }
    }
}

