/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner;

import io.debezium.config.Configuration;
import io.debezium.connector.spanner.AbstractSpannerConnectorIT;
import io.debezium.connector.spanner.SpannerConnector;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class LowWatermarkRecordIT
extends AbstractSpannerConnectorIT {
    private static final String tableName = "low_watermark_record_tests_table";
    private static final String changeStreamName = "lowWatermarkRecordTestChangeStream";

    @BeforeAll
    static void setup() throws InterruptedException, ExecutionException {
        databaseConnection.createTable("low_watermark_record_tests_table(id int64, name string(100)) primary key(id)");
        databaseConnection.createChangeStream(changeStreamName, tableName);
        Testing.print((Object)"LowWatermarkRecordIT is ready...");
    }

    @AfterAll
    static void clear() throws InterruptedException {
        databaseConnection.dropChangeStream(changeStreamName);
        databaseConnection.dropTable(tableName);
    }

    @Test
    public void shouldStreamUpdatesToKafka() throws InterruptedException {
        Instant now = Instant.now();
        Configuration config = ((Configuration.Builder)Configuration.copy((Configuration)baseConfig).with("gcp.spanner.change.stream", changeStreamName).with("name", "low_watermark_record_tests_table_test").with("gcp.spanner.start.time", DateTimeFormatter.ISO_INSTANT.format(now)).with("gcp.spanner.low-watermark.enabled", true)).build();
        this.initializeConnectorTestFramework();
        this.start(SpannerConnector.class, config);
        this.assertConnectorIsRunning();
        databaseConnection.executeUpdate("insert into low_watermark_record_tests_table(id, name) values (1, 'some name')");
        databaseConnection.executeUpdate("update low_watermark_record_tests_table set name = 'test' where id = 1");
        this.waitForAvailableRecords(LowWatermarkRecordIT.waitTimeForRecords(), TimeUnit.SECONDS);
        AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(10, false);
        List records = sourceRecords.recordsForTopic(this.getTopicName(config, tableName));
        List<Long> lowWatermarks = records.stream().map(rec -> rec.value() != null ? (Long)((Struct)((Struct)rec.value()).get("source")).get("low_watermark") : null).filter(Objects::nonNull).collect(Collectors.toList());
        Assertions.assertThat((!lowWatermarks.isEmpty() ? 1 : 0) != 0);
        Assertions.assertThat(((Long)Collections.max(lowWatermarks) > now.plus(2L, ChronoUnit.SECONDS).toEpochMilli() ? 1 : 0) != 0);
        this.validateLowWatermarks(records, lowWatermarks);
        this.stopConnector();
        this.assertConnectorNotRunning();
    }

    private void validateLowWatermarks(List<SourceRecord> records, List<Long> lowWatermarks) {
        for (SourceRecord record : records) {
            if (record.timestamp() == null) continue;
            Assert.assertTrue((record.timestamp() > lowWatermarks.get(0) ? 1 : 0) != 0);
        }
    }
}

