/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner;

import io.debezium.config.Configuration;
import io.debezium.connector.spanner.AbstractSpannerConnectorIT;
import io.debezium.connector.spanner.SpannerConnector;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class GracefulRestartIT
extends AbstractSpannerConnectorIT {
    private static final String tableName = "graceful_restart_tests_table";
    private static final String changeStreamName = "gracefulRestartChangeStream";

    @BeforeAll
    static void setup() throws InterruptedException, ExecutionException {
        databaseConnection.createTable("graceful_restart_tests_table(id int64, name string(100)) primary key(id)");
        databaseConnection.createChangeStream(changeStreamName, tableName);
        Testing.print((Object)"GracefulRestartIT is ready...");
    }

    @AfterAll
    static void clear() throws InterruptedException {
        databaseConnection.dropChangeStream(changeStreamName);
        databaseConnection.dropTable(tableName);
    }

    @Test
    public void checkUpdatesStreamedToKafka() throws InterruptedException {
        this.stopConnector();
        Configuration config = Configuration.copy((Configuration)baseConfig).with("gcp.spanner.change.stream", changeStreamName).with("name", "graceful_restart_tests_table_test").with("gcp.spanner.start.time", DateTimeFormatter.ISO_INSTANT.format(Instant.now())).build();
        this.initializeConnectorTestFramework();
        this.start(SpannerConnector.class, config);
        this.assertConnectorIsRunning();
        databaseConnection.executeUpdate("insert into graceful_restart_tests_table(id, name) values (1, 'some name')");
        this.waitForAvailableRecords(GracefulRestartIT.waitTimeForRecords(), TimeUnit.SECONDS);
        AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(5, false);
        List records = sourceRecords.recordsForTopic(this.getTopicName(config, tableName));
        Assertions.assertThat((List)records).hasSize(1);
        this.stopConnector();
        this.assertConnectorNotRunning();
        databaseConnection.executeUpdate("update graceful_restart_tests_table set name = 'test' where id = 1");
        this.start(SpannerConnector.class, config);
        this.waitForAvailableRecords(GracefulRestartIT.waitTimeForRecords(), TimeUnit.SECONDS);
        AbstractConnectorTest.SourceRecords sourceRecords2 = this.consumeRecordsByTopic(10, false);
        List records2 = sourceRecords2.recordsForTopic(this.getTopicName(config, tableName));
        Assertions.assertThat((List)records2).hasSizeGreaterThanOrEqualTo(1);
        Assertions.assertThat((String)((String)((Struct)((SourceRecord)records2.get(0)).value()).get("op"))).isEqualTo("c");
        this.stopConnector();
        this.assertConnectorNotRunning();
    }
}

