/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.sqlserver;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.sqlserver.Lsn;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnector;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.pipeline.notification.AbstractNotificationsIT;
import io.debezium.pipeline.notification.channels.SinkNotificationChannel;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.Percentage;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class NotificationsIT
extends AbstractNotificationsIT<SqlServerConnector> {
    @Before
    public void before() throws SQLException {
        TestHelper.createTestDatabase();
        SqlServerConnection sqlServerConnection = TestHelper.testConnection();
        sqlServerConnection.execute(new String[]{"CREATE TABLE tablea (id int primary key, cola varchar(30))", "CREATE TABLE tableb (id int primary key, colb varchar(30))", "INSERT INTO tablea VALUES(1, 'a')"});
        TestHelper.enableTableCdc(sqlServerConnection, "tablea");
        this.initializeConnectorTestFramework();
        Testing.Files.delete((Path)TestHelper.SCHEMA_HISTORY_PATH);
    }

    @After
    public void after() {
        this.stopConnector();
        TestHelper.dropTestDatabase();
    }

    protected List<String> collections() {
        return List.of("dbo.tablea", "dbo.tableb").stream().map(sch_tbl -> String.format("%s.%s", this.database(), sch_tbl)).collect(Collectors.toList());
    }

    protected Class<SqlServerConnector> connectorClass() {
        return SqlServerConnector.class;
    }

    protected Configuration.Builder config() {
        return (Configuration.Builder)TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)SqlServerConnectorConfig.SnapshotMode.INITIAL);
    }

    protected String connector() {
        return "sql_server";
    }

    protected String server() {
        return "server1";
    }

    protected String task() {
        return "0";
    }

    protected String database() {
        return "testDB1";
    }

    protected String snapshotStatusResult() {
        return "COMPLETED";
    }

    @Test
    public void completeReadingFromACaptureInstanceNotificationEmitted() throws SQLException {
        this.startConnector(config -> (Configuration.Builder)((Configuration.Builder)((Configuration.Builder)config.with(SqlServerConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)SqlServerConnectorConfig.SnapshotMode.NO_DATA)).with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification")).with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink"));
        this.assertConnectorIsRunning();
        TestHelper.waitForStreamingStarted();
        SqlServerConnection connection = TestHelper.testConnection();
        connection.execute(new String[]{"INSERT INTO tablea VALUES(2, 'b')"});
        connection.execute(new String[]{"ALTER TABLE tablea ADD colb int NULL"});
        TestHelper.enableTableCdc(connection, "tablea", "tablea_c2");
        connection.execute(new String[]{"INSERT INTO tablea VALUES(3, 'c', 3)"});
        connection.close();
        ArrayList notifications = new ArrayList();
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
            this.consumeAvailableRecords(r -> {
                if (r.topic().equals("io.debezium.notification")) {
                    notifications.add(r);
                }
            });
            return notifications.size() == 3;
        });
        Assertions.assertThat(notifications).hasSize(3);
        SourceRecord sourceRecord = (SourceRecord)notifications.get(2);
        Assertions.assertThat((String)sourceRecord.topic()).isEqualTo((Object)"io.debezium.notification");
        Struct value = (Struct)sourceRecord.value();
        Assertions.assertThat((String)value.getString("aggregate_type")).isEqualTo((Object)"Capture Instance");
        Assertions.assertThat((String)value.getString("type")).isEqualTo((Object)"COMPLETED");
        Assertions.assertThat((Long)value.getInt64("timestamp")).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage((double)1.0));
        Map additionalData = value.getMap("additional_data");
        Assertions.assertThat((String)((String)additionalData.get("server"))).isEqualTo((Object)"server1");
        Assertions.assertThat((String)((String)additionalData.get("database"))).isEqualTo((Object)"testDB1");
        Assertions.assertThat((String)((String)additionalData.get("capture_instance"))).isEqualTo((Object)"dbo_tablea");
        Lsn startLsn = Lsn.valueOf((String)((String)additionalData.get("start_lsn")));
        Lsn stopLsn = Lsn.valueOf((String)((String)additionalData.get("stop_lsn")));
        Lsn commitLsn = Lsn.valueOf((String)((String)additionalData.get("commit_lsn")));
        Assertions.assertThat((Comparable)startLsn).isLessThan((Comparable)stopLsn);
        Assertions.assertThat((Comparable)stopLsn).isLessThan((Comparable)commitLsn);
        connection = TestHelper.testConnection();
        connection.execute(new String[]{"EXEC sys.sp_cdc_disable_table @source_schema = N'dbo', @source_name = N'tablea', @capture_instance = 'dbo_tablea'"});
        connection.execute(new String[]{"ALTER TABLE tablea ADD colc int NULL"});
        TestHelper.enableTableCdc(connection, "tablea", "tablea_c3");
        connection.execute(new String[]{"INSERT INTO tablea VALUES(4, 'c', 4, 4)"});
        connection.close();
        ArrayList notifications2 = new ArrayList();
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
            this.consumeAvailableRecords(r -> {
                if (r.topic().equals("io.debezium.notification")) {
                    notifications2.add(r);
                }
            });
            return notifications2.size() == 1;
        });
        Assertions.assertThat(notifications2).hasSize(1);
        sourceRecord = (SourceRecord)notifications2.get(0);
        Assertions.assertThat((String)sourceRecord.topic()).isEqualTo((Object)"io.debezium.notification");
        value = (Struct)sourceRecord.value();
        Assertions.assertThat((String)value.getString("aggregate_type")).isEqualTo((Object)"Capture Instance");
        Assertions.assertThat((String)value.getString("type")).isEqualTo((Object)"COMPLETED");
        Assertions.assertThat((Long)value.getInt64("timestamp")).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage((double)1.0));
        additionalData = value.getMap("additional_data");
        Assertions.assertThat((String)((String)additionalData.get("server"))).isEqualTo((Object)"server1");
        Assertions.assertThat((String)((String)additionalData.get("database"))).isEqualTo((Object)"testDB1");
        Assertions.assertThat((String)((String)additionalData.get("capture_instance"))).isEqualTo((Object)"tablea_c2");
        startLsn = Lsn.valueOf((String)((String)additionalData.get("start_lsn")));
        stopLsn = Lsn.valueOf((String)((String)additionalData.get("stop_lsn")));
        commitLsn = Lsn.valueOf((String)((String)additionalData.get("commit_lsn")));
        Assertions.assertThat((Comparable)startLsn).isLessThan((Comparable)stopLsn);
        Assertions.assertThat((Comparable)stopLsn).isLessThan((Comparable)commitLsn);
    }
}

