/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner;

import io.debezium.config.Configuration;
import io.debezium.connector.spanner.AbstractSpannerConnectorIT;
import io.debezium.connector.spanner.SpannerConnector;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class KafkaTopicPartitionIT
extends AbstractSpannerConnectorIT {
    private static final String tableName = "kafka_topic_partition_tests_table";
    private static final String changeStreamName = "kafkaTopicPartitionChangeStream";

    @BeforeAll
    static void setup() throws InterruptedException, ExecutionException {
        databaseConnection.createTable("kafka_topic_partition_tests_table(id int64, name string(100),time TIMESTAMP,\n  date DATE,\n  byt BYTES(2000),\n  bool BOOL, long_time int64) primary key(id)");
        databaseConnection.createChangeStream(changeStreamName, tableName);
        Testing.print((Object)"KafkaTopicPartitionIT is ready...");
    }

    @AfterAll
    static void clear() throws InterruptedException {
        databaseConnection.dropChangeStream(changeStreamName);
        databaseConnection.dropTable(tableName);
    }

    @Test
    public void checkRecordsWithSameKeyAreInSamePartition() throws InterruptedException {
        Configuration config = Configuration.copy((Configuration)baseConfig).with("gcp.spanner.change.stream", changeStreamName).with("name", "kafka_topic_partition_tests_table_test").with("gcp.spanner.start.time", DateTimeFormatter.ISO_INSTANT.format(Instant.now())).build();
        this.initializeConnectorTestFramework();
        this.start(SpannerConnector.class, config);
        this.assertConnectorIsRunning();
        databaseConnection.executeUpdate("insert into kafka_topic_partition_tests_table(id, name) values (1, 'some name')");
        databaseConnection.executeUpdate("update kafka_topic_partition_tests_table set name = 'test' where id = 1");
        databaseConnection.executeUpdate("insert into kafka_topic_partition_tests_table(id, name) values (2, 'test name')");
        databaseConnection.executeUpdate("update kafka_topic_partition_tests_table set bool = true where id = 2");
        this.waitForAvailableRecords(KafkaTopicPartitionIT.waitTimeForRecords(), TimeUnit.SECONDS);
        AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(10, false);
        List records = sourceRecords.recordsForTopic(this.getTopicName(config, tableName));
        Assertions.assertThat((List)records).hasSize(4);
        Map<Object, List<SourceRecord>> keyToRecords = records.stream().collect(Collectors.groupingBy(ConnectRecord::key));
        Assertions.assertThat(keyToRecords).hasSize(2);
        keyToRecords.values().forEach(keyRecordsGroup -> {
            Assert.assertEquals((long)2L, (long)keyRecordsGroup.size());
            SourceRecord record1 = (SourceRecord)keyRecordsGroup.get(0);
            SourceRecord record2 = (SourceRecord)keyRecordsGroup.get(1);
            long commitTimestamp1 = (Long)((Struct)record1.value()).get("ts_ms");
            long commitTimestamp2 = (Long)((Struct)record2.value()).get("ts_ms");
            org.junit.jupiter.api.Assertions.assertTrue((commitTimestamp1 <= commitTimestamp2 ? 1 : 0) != 0);
            Assert.assertEquals((long)1L, (long)keyRecordsGroup.stream().map(SourceRecord::sourcePartition).collect(Collectors.toSet()).size());
        });
        this.stopConnector();
        this.assertConnectorNotRunning();
    }
}

