/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner;

import io.debezium.config.Configuration;
import io.debezium.connector.spanner.AbstractSpannerConnectorIT;
import io.debezium.connector.spanner.SpannerConnector;
import io.debezium.embedded.AbstractConnectorTest;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/**
 * Sanity checks for {@link SpannerConnector} run through the embedded connector test harness.
 *
 * <p>The negative tests start the connector with an invalid configuration and verify that
 * start-up is rejected with the expected message. The positive test performs an insert, an
 * update and a delete against a dedicated table and verifies the records emitted for it.
 */
public class BasicSanityCheckIT
extends AbstractSpannerConnectorIT {
    private static final String tableName = "embedded_sanity_tests_table";
    private static final String changeStreamName = "embeddedSanityTestChangeStream";

    /**
     * Creates the table and the change stream shared by every test in this class.
     */
    @BeforeAll
    static void setup() throws InterruptedException, ExecutionException {
        databaseConnection.createTable(tableName + "(id int64, name string(100)) primary key(id)");
        databaseConnection.createChangeStream(changeStreamName, tableName);
        System.out.println("BasicSanityCheckIT is ready...");
    }

    /**
     * Drops the change stream first and then the table it was created for.
     */
    @AfterAll
    static void clear() throws InterruptedException {
        databaseConnection.dropChangeStream(changeStreamName);
        databaseConnection.dropTable(tableName);
    }

    /**
     * A configuration that only carries the instance id must be rejected as invalid.
     */
    @Test
    public void shouldNotStartConnectorWithoutRequireConfigs() throws InterruptedException {
        Configuration config = Configuration.create()
                .with("gcp.spanner.instance.id", database.getInstanceId())
                .build();
        this.start(SpannerConnector.class, config, (success, msg, error) -> {
            Assertions.assertThat(success).isFalse();
            // Assert on the message itself; assertThat(boolean) without a terminal call verifies nothing.
            Assertions.assertThat(msg).contains("Connector configuration is not valid");
        });
        this.assertConnectorNotRunning();
    }

    /**
     * Pointing the connector at a change stream named "fooBar", which this class never creates,
     * must prevent it from starting.
     */
    @Test
    public void shouldNotStartConnectorWithoutNonExistentChangeStreams() throws InterruptedException {
        Configuration config = Configuration.copy(baseConfig)
                .with("gcp.spanner.change.stream", "fooBar")
                .with("name", tableName + "_test")
                .with("gcp.spanner.start.time", DateTimeFormatter.ISO_INSTANT.format(Instant.now()))
                .build();
        this.start(SpannerConnector.class, config, (success, msg, error) -> {
            Assertions.assertThat(success).isFalse();
            Assertions.assertThat(msg).contains("ChangeStream 'fooBar' doesn't exist or you don't have sufficient permissions");
        });
        this.assertConnectorNotRunning();
    }

    /**
     * A heartbeat interval of 1 ms is outside the range named in the expected error message
     * (100 to 300000), so the connector must refuse to start.
     */
    @Test
    public void shouldNotStartConnectorWithOutOfRangeHeartbeatMillis() throws InterruptedException {
        String startTime = DateTimeFormatter.ISO_INSTANT.format(Instant.now().plus(2L, ChronoUnit.DAYS));
        Configuration config = Configuration.copy(baseConfig)
                .with("gcp.spanner.change.stream", changeStreamName)
                .with("heartbeat.interval.ms", "1")
                .with("gcp.spanner.start.time", startTime)
                .build();
        this.start(SpannerConnector.class, config, (success, msg, error) -> {
            Assertions.assertThat(success).isFalse();
            Assertions.assertThat(msg).contains("Heartbeat interval must be between 100 and 300000");
        });
        this.assertConnectorNotRunning();
    }

    /**
     * Inserts, updates and deletes one row and expects four records on the table topic:
     * operations "c", "u" and "d" followed by a record with a {@code null} value
     * (presumably the tombstone that follows the delete).
     */
    @Test
    public void shouldStreamUpdatesToKafka() throws InterruptedException {
        Configuration config = Configuration.copy(baseConfig)
                .with("gcp.spanner.change.stream", changeStreamName)
                .with("name", tableName + "_test")
                .with("gcp.spanner.start.time", DateTimeFormatter.ISO_INSTANT.format(Instant.now()))
                .build();
        this.initializeConnectorTestFramework();
        this.start(SpannerConnector.class, config);
        try {
            this.assertConnectorIsRunning();
            databaseConnection.executeUpdate("insert into " + tableName + "(id, name) values (1, 'some name')");
            databaseConnection.executeUpdate("update " + tableName + " set name = 'test' where id = 1");
            databaseConnection.executeUpdate("delete from " + tableName + " where id = 1");
            this.waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);
            AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(10, false);
            List<SourceRecord> records = sourceRecords.recordsForTopic(this.getTopicName(config, tableName));
            Assertions.assertThat(records).hasSize(4);
            Assertions.assertThat(operationOf(records.get(0))).isEqualTo("c");
            Assertions.assertThat(operationOf(records.get(1))).isEqualTo("u");
            Assertions.assertThat(operationOf(records.get(2))).isEqualTo("d");
            Assertions.assertThat(records.get(3).value()).isNull();
        } finally {
            // Stop the connector even when an assertion above fails, so it cannot outlive this test.
            this.stopConnector();
        }
        this.assertConnectorNotRunning();
    }

    /**
     * Reads the {@code op} field from the value of a change record.
     *
     * @param record a record whose value is a {@link Struct}
     * @return the content of the {@code op} field
     */
    private static String operationOf(SourceRecord record) {
        return (String) ((Struct) record.value()).get("op");
    }
}

