/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.IncrementalSnapshotIT;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.junit.SkipTestDependingOnGtidModeRule;
import io.debezium.connector.mysql.junit.SkipWhenGtidModeIs;
import io.debezium.connector.mysql.signal.KafkaSignalThread;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.kafka.KafkaCluster;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.io.File;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.fest.assertions.Assertions;
import org.fest.assertions.MapAssert;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/**
 * Runs the incremental snapshot integration tests with the connector configured for a
 * read-only connection ({@code MySqlConnectorConfig.READ_ONLY_CONNECTION = true}) and with
 * a Kafka signal topic configured through {@link KafkaSignalThread}.
 *
 * <p>A single-broker {@link KafkaCluster} is started once for the whole class and a signal
 * topic is created before each test. The class is skipped when GTID mode is OFF (see the
 * class-level {@link SkipWhenGtidModeIs} annotation).
 */
@SkipWhenGtidModeIs(value=SkipWhenGtidModeIs.GtidMode.OFF, reason="Read only connection requires GTID_MODE to be ON")
public class ReadOnlyIncrementalSnapshotIT
extends IncrementalSnapshotIT {
    /** Kafka cluster shared by all tests of this class; created in {@link #startKafka()}. */
    private static KafkaCluster kafka;
    /** Partition of the signal topic that snapshot signals are written to. */
    private static final int PARTITION_NO = 0;
    /** Number of records every snapshot test in this class expects to consume. */
    private static final int EXPECTED_RECORD_COUNT = 1000;
    /** Table that is added to the connector's table exclude list. */
    public static final String EXCLUDED_TABLE = "b";
    /** Evaluates {@link SkipWhenGtidModeIs} annotations for each test. */
    @Rule
    public TestRule skipTest = new SkipTestDependingOnGtidModeRule();

    /**
     * Runs the parent set-up and then creates the signal topic for the current database.
     *
     * @throws SQLException if the parent set-up fails
     */
    @Override
    @Before
    public void before() throws SQLException {
        super.before();
        // presumably (topic, partitions, replication factor) -- confirm against KafkaCluster
        kafka.createTopic(this.getSignalsTopic(), 1, 1);
    }

    /**
     * Starts a single-broker Kafka cluster in a fresh {@code signal_cluster} testing
     * directory. Automatic topic creation is disabled, so the signal topic has to be
     * created explicitly (see {@link #before()}).
     *
     * @throws Exception if the cluster cannot be started
     */
    @BeforeClass
    public static void startKafka() throws Exception {
        File dataDir = Testing.Files.createTestingDirectory("signal_cluster");
        Testing.Files.delete(dataDir);
        kafka = new KafkaCluster()
                .usingDirectory(dataDir)
                .deleteDataPriorToStartup(true)
                .deleteDataUponShutdown(true)
                .addBrokers(1)
                .withKafkaConfiguration(Collect.propertiesOf(
                        "auto.create.topics.enable", "false",
                        "zookeeper.session.timeout.ms", "20000"))
                .startup();
    }

    /** Shuts the Kafka cluster down, if it was started. */
    @AfterClass
    public static void stopKafka() {
        if (kafka != null) {
            kafka.shutdown();
        }
    }

    /**
     * Extends the parent configuration with the table exclude list, the read-only
     * connection flag, the Kafka signal topic and bootstrap servers, and explicit message
     * key columns for table {@code a42}.
     *
     * @return the configuration builder used to start the connector
     */
    @Override
    protected Configuration.Builder config() {
        // NOTE(review): the casts come from the decompiled output; they are presumably
        // redundant given the builder's fluent API -- confirm before removing them.
        Configuration.Builder builder = (Configuration.Builder) super.config()
                .with(MySqlConnectorConfig.TABLE_EXCLUDE_LIST, this.DATABASE.getDatabaseName() + "." + EXCLUDED_TABLE);
        builder = (Configuration.Builder) builder.with(MySqlConnectorConfig.READ_ONLY_CONNECTION, true);
        builder = (Configuration.Builder) builder.with(KafkaSignalThread.SIGNAL_TOPIC, this.getSignalsTopic());
        builder = (Configuration.Builder) builder.with(KafkaSignalThread.BOOTSTRAP_SERVERS, kafka.brokerList());
        return (Configuration.Builder) builder.with(
                RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS,
                String.format("%s:%s", this.DATABASE.qualifiedTableName("a42"), "pk1,pk2,pk3,pk4"));
    }

    /**
     * @return the signal topic name, derived from the current database name
     */
    private String getSignalsTopic() {
        return this.DATABASE.getDatabaseName() + "signals_topic";
    }

    /**
     * Sends an {@code execute-snapshot} signal for the default table data collection.
     *
     * @throws ExecutionException if sending the signal record fails
     * @throws InterruptedException if interrupted while waiting for the send to complete
     */
    protected void sendExecuteSnapshotKafkaSignal() throws ExecutionException, InterruptedException {
        this.sendExecuteSnapshotKafkaSignal(this.tableDataCollectionId());
    }

    /**
     * Publishes an incremental {@code execute-snapshot} signal to the signal topic and
     * blocks until the send has completed.
     *
     * @param fullTableNames value placed verbatim inside the {@code data-collections} JSON array
     * @throws ExecutionException if sending the signal record fails
     * @throws InterruptedException if interrupted while waiting for the send to complete
     */
    protected void sendExecuteSnapshotKafkaSignal(String fullTableNames) throws ExecutionException, InterruptedException {
        String signalValue = String.format("{\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}", fullTableNames);
        ProducerRecord<String, String> executeSnapshotSignal =
                new ProducerRecord<>(this.getSignalsTopic(), PARTITION_NO, "is_test", signalValue);
        // NOTE(review): casts retained from the decompiled output, see config().
        Configuration.Builder producerSettings = (Configuration.Builder) Configuration.create()
                .withDefault("bootstrap.servers", kafka.brokerList())
                .withDefault("client.id", "signals")
                .withDefault("key.serializer", StringSerializer.class);
        producerSettings = (Configuration.Builder) producerSettings.withDefault("value.serializer", StringSerializer.class);
        Configuration signalProducerConfig = producerSettings.build();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(signalProducerConfig.asProperties())) {
            // get() waits for the send to complete and surfaces any send failure
            producer.send(executeSnapshotSignal).get();
        }
    }

    /**
     * Populates the table, starts the connector, sends an ad hoc snapshot signal and
     * verifies that all expected rows are consumed.
     */
    @Test
    public void emptyHighWatermark() throws Exception {
        Testing.Print.enable();
        this.populateTable();
        this.startConnector();
        this.sendAdHocSnapshotSignal();
        Map<?, ?> dbChanges = this.consumeMixedWithIncrementalSnapshot(EXPECTED_RECORD_COUNT);
        assertSnapshotContainsSequentialRows(dbChanges);
    }

    /**
     * Verifies that the snapshot still delivers all expected rows while a background
     * thread keeps inserting rows into {@link #EXCLUDED_TABLE}.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible
     * behaviour change" for this method; compare with the original source if available.
     */
    @Test
    public void filteredEvents() throws Exception {
        Testing.Print.enable();
        this.populateTable();
        this.startConnector();
        this.sendExecuteSnapshotKafkaSignal();
        Thread writer = new Thread(() -> {
            try (JdbcConnection connection = this.databaseConnection()) {
                connection.setAutoCommit(false);
                int i = 0;
                // keep inserting until the test interrupts this thread
                while (!Thread.interrupted()) {
                    connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (pk, aa) VALUES (%s, %s)", EXCLUDED_TABLE, i + 1000 + 1, i + 1000)});
                    connection.commit();
                    ++i;
                }
            }
            catch (SQLException e) {
                throw new RuntimeException(e);
            }
        });
        writer.setDaemon(true);
        writer.setName("filtered-binlog-events-thread");
        try {
            writer.start();
            Map<?, ?> dbChanges = this.consumeMixedWithIncrementalSnapshot(EXPECTED_RECORD_COUNT);
            assertSnapshotContainsSequentialRows(dbChanges);
        }
        finally {
            writer.interrupt();
        }
    }

    /** Snapshots table {@code a4} and verifies that all expected rows are consumed. */
    @Test
    public void inserts4Pks() throws Exception {
        Testing.Print.enable();
        this.populate4PkTable();
        this.startConnector();
        assertSnapshotContainsSequentialRows(this.snapshotFourColumnKeyTable("a4"));
    }

    /**
     * Snapshots table {@code a42} and verifies that all expected rows are consumed. The key
     * columns for this table are supplied through the message key columns setting in
     * {@link #config()}.
     */
    @Test
    public void insertsWithoutPks() throws Exception {
        Testing.Print.enable();
        this.populate4WithoutPkTable();
        this.startConnector();
        assertSnapshotContainsSequentialRows(this.snapshotFourColumnKeyTable("a42"));
    }

    /**
     * Starts the connector, waits for it to shut down and rethrows the error reported to
     * the completion callback; the test expects that error to be a {@link ConnectException}.
     */
    @Test(expected=ConnectException.class)
    @SkipWhenGtidModeIs(value=SkipWhenGtidModeIs.GtidMode.ON, reason="Read only connection requires GTID_MODE to be ON")
    public void shouldFailIfGtidModeIsOff() throws Exception {
        Testing.Print.enable();
        this.populateTable();
        AtomicReference<Throwable> exception = new AtomicReference<>();
        this.startConnector((success, message, error) -> exception.set(error));
        waitForConnectorShutdown("mysql", this.DATABASE.getServerName());
        this.stopConnector();
        Throwable e = exception.get();
        // Rethrow the reported error unchanged. A blind cast to RuntimeException would
        // replace any other throwable type with a ClassCastException and hide the cause.
        if (e instanceof Exception) {
            throw (Exception) e;
        }
        if (e instanceof Error) {
            throw (Error) e;
        }
        if (e != null) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Populates table {@code a4} using a freshly opened connection.
     *
     * @throws SQLException if populating the table fails
     */
    protected void populate4PkTable() throws SQLException {
        try (JdbcConnection connection = this.databaseConnection()) {
            this.populate4PkTable(connection, "a4");
        }
    }

    /**
     * Populates table {@code a42} using a freshly opened connection.
     *
     * @throws SQLException if populating the table fails
     */
    protected void populate4WithoutPkTable() throws SQLException {
        try (JdbcConnection connection = this.databaseConnection()) {
            this.populate4PkTable(connection, "a42");
        }
    }

    /**
     * Signals an incremental snapshot of the given table and consumes the resulting
     * records from the table's topic. Record keys are collapsed into a single integer as
     * {@code pk1 * 1000 + pk2 * 100 + pk3 * 10 + pk4}.
     */
    private Map<?, ?> snapshotFourColumnKeyTable(String tableName) throws Exception {
        this.sendExecuteSnapshotKafkaSignal(this.DATABASE.qualifiedTableName(tableName));
        return this.consumeMixedWithIncrementalSnapshot(
                EXPECTED_RECORD_COUNT,
                x -> true,
                k -> k.getInt32("pk1") * 1000 + k.getInt32("pk2") * 100 + k.getInt32("pk3") * 10 + k.getInt32("pk4"),
                record -> ((Struct)record.value()).getStruct("after").getInt32(this.valueFieldName()),
                this.DATABASE.topicForTable(tableName),
                null);
    }

    /** Asserts that the map holds the entry {@code (i + 1) -> i} for every expected row. */
    private static void assertSnapshotContainsSequentialRows(Map<?, ?> dbChanges) {
        for (int i = 0; i < EXPECTED_RECORD_COUNT; ++i) {
            Assertions.assertThat(dbChanges).includes(new MapAssert.Entry[]{MapAssert.entry(i + 1, i)});
        }
    }
}

