/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline.notification;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.notification.channels.SinkNotificationChannel;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.Test;

public abstract class AbstractNotificationsIT<T extends SourceConnector>
extends AbstractConnectorTest {
    protected abstract Class<T> connectorClass();

    protected abstract Configuration.Builder config();

    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig) {
        Configuration config = custConfig.apply(this.config()).build();
        this.start(this.connectorClass(), config);
    }

    protected abstract String snapshotStatusResult();

    @Test
    public void notificationCorrectlySentOnItsTopic() throws InterruptedException {
        this.startConnector(config -> (Configuration.Builder)((Configuration.Builder)config.with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification")).with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink"));
        this.assertConnectorIsRunning();
        this.waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.allRecordsInOrder()).hasSize(1);
        SourceRecord sourceRecord = records.allRecordsInOrder().get(0);
        Assertions.assertThat((String)sourceRecord.topic()).isEqualTo((Object)"io.debezium.notification");
        Assertions.assertThat((String)((Struct)sourceRecord.value()).getString("aggregate_type")).isEqualTo((Object)"Initial Snapshot");
        Assertions.assertThat((String)((Struct)sourceRecord.value()).getString("type")).isEqualTo((Object)("Status " + this.snapshotStatusResult()));
    }

    @Test
    public void notificationNotSentIfNoChannelIsConfigured() {
        this.startConnector(config -> (Configuration.Builder)config.with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification"));
        this.assertConnectorIsRunning();
        this.waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        this.assertNoRecordsToConsume();
    }

    @Test
    public void reportErrorWhenSinkChannelIsEnabledAndNoTopicConfigurationProvided() {
        LogInterceptor logInterceptor = new LogInterceptor("io.debezium.connector");
        this.startConnector(config -> (Configuration.Builder)config.with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink"));
        Assertions.assertThat((boolean)logInterceptor.containsErrorMessage("Connector configuration is not valid. The 'notification.sink.topic.name' value is invalid: Notification topic name must be provided when kafka notification channel is enabled")).isTrue();
    }
}

