/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline.notification;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.notification.Notification;
import io.debezium.pipeline.notification.channels.SinkNotificationChannel;
import io.debezium.pipeline.notification.channels.jmx.JmxNotificationChannelMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.IntrospectionException;
import javax.management.JMX;
import javax.management.MBeanException;
import javax.management.MBeanFeatureInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanNotificationInfo;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.awaitility.Awaitility;
import org.junit.Test;

/**
 * Connector-agnostic integration tests for notifications emitted around the
 * initial snapshot, covering the {@code sink} and {@code jmx} channels.
 *
 * <p>Concrete subclasses supply the connector class, its base configuration and
 * the identifiers used to locate the connector's JMX beans.
 *
 * @param <T> the connector type under test
 */
public abstract class AbstractNotificationsIT<T extends SourceConnector>
        extends AbstractConnectorTest {

    /** Topic name configured for the sink notification channel in these tests. */
    private static final String NOTIFICATION_TOPIC_NAME = "io.debezium.notification";

    /** Aggregate type expected on every notification checked by these tests. */
    private static final String INITIAL_SNAPSHOT = "Initial Snapshot";

    /** Message expected on the JMX notifications received by the listener. */
    private static final String JMX_NOTIFICATION_MESSAGE = "Initial Snapshot generated a notification";

    /** Number of notifications the tests expect for one initial snapshot (start + end). */
    private static final int EXPECTED_SNAPSHOT_NOTIFICATIONS = 2;

    /** @return the connector class to start */
    protected abstract Class<T> connectorClass();

    /** @return the base configuration builder, customized further by each test */
    protected abstract Configuration.Builder config();

    /** @return the connector name used in the JMX object name and when waiting for the snapshot */
    protected abstract String connector();

    /** @return the server name used in the JMX object name and in the expected notification payload */
    protected abstract String server();

    /**
     * @return the task identifier passed to {@code waitForSnapshotToBeCompleted}; {@code null} unless overridden
     */
    protected String task() {
        return null;
    }

    /**
     * @return the database name passed to {@code waitForSnapshotToBeCompleted}; {@code null} unless overridden
     */
    protected String database() {
        return null;
    }

    /**
     * Builds the configuration by applying {@code custConfig} to {@link #config()} and starts the connector.
     *
     * @param custConfig customization applied on top of the base configuration builder
     */
    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig) {
        final Configuration configuration = custConfig.apply(config()).build();
        start(connectorClass(), configuration);
    }

    /** @return the {@code type} value expected on the second (final) snapshot notification */
    protected abstract String snapshotStatusResult();

    @Test
    public void notificationCorrectlySentOnItsTopic() throws InterruptedException {
        startConnector(config -> config
                .with(SinkNotificationChannel.NOTIFICATION_TOPIC, NOTIFICATION_TOPIC_NAME)
                .with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink"));
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(connector(), server(), task(), database());

        final List<SourceRecord> notifications = new ArrayList<>();
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
            consumeAvailableRecords(r -> {
                if (NOTIFICATION_TOPIC_NAME.equals(r.topic())) {
                    notifications.add(r);
                }
            });
            // ">=" rather than "==": an unexpected extra notification then fails the
            // size assertion below right away instead of timing out here.
            return notifications.size() >= EXPECTED_SNAPSHOT_NOTIFICATIONS;
        });

        Assertions.assertThat(notifications).hasSize(EXPECTED_SNAPSHOT_NOTIFICATIONS);
        assertSinkNotification(notifications.get(0), "STARTED");
        assertSinkNotification(notifications.get(1), snapshotStatusResult());
    }

    @Test
    public void notificationNotSentIfNoChannelIsConfigured() {
        startConnector(config -> config.with(SinkNotificationChannel.NOTIFICATION_TOPIC, NOTIFICATION_TOPIC_NAME));
        assertConnectorIsRunning();
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        assertNoRecordsToConsume();
    }

    @Test
    public void reportErrorWhenSinkChannelIsEnabledAndNoTopicConfigurationProvided() {
        final LogInterceptor logInterceptor = new LogInterceptor("io.debezium.connector");
        startConnector(config -> config.with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink"));
        Assertions.assertThat(logInterceptor.containsErrorMessage("Connector configuration is not valid. The 'notification.sink.topic.name' value is invalid: Notification topic name must be provided when kafka notification channel is enabled")).isTrue();
    }

    @Test
    public void notificationCorrectlySentOnJmx() throws ReflectionException, MalformedObjectNameException, InstanceNotFoundException, IntrospectionException, AttributeNotFoundException, MBeanException, InterruptedException {
        startConnector(config -> config.with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "jmx"));
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(connector(), server(), task(), database());

        // Wait for both notifications; waiting only for a non-empty list could let the
        // size assertion below run while the second notification is still pending.
        Awaitility.await()
                .atMost(30L, TimeUnit.SECONDS)
                .pollDelay(1L, TimeUnit.SECONDS)
                .pollInterval(1L, TimeUnit.SECONDS)
                .until(() -> readNotificationFromJmx().size() >= EXPECTED_SNAPSHOT_NOTIFICATIONS);

        List<Notification> notifications = readNotificationFromJmx();
        Assertions.assertThat(notifications).hasSize(EXPECTED_SNAPSHOT_NOTIFICATIONS);
        assertJmxStoredNotification(notifications.get(0), "STARTED");
        assertJmxStoredNotification(notifications.get(1), snapshotStatusResult());

        resetNotifications();
        notifications = readNotificationFromJmx();
        Assertions.assertThat(notifications).hasSize(0);
    }

    @Test
    public void emittingDebeziumNotificationWillGenerateAJmxNotification() throws ReflectionException, MalformedObjectNameException, InstanceNotFoundException, IntrospectionException, AttributeNotFoundException, MBeanException, InterruptedException {
        startConnector(config -> config
                .with(CommonConnectorConfig.SNAPSHOT_DELAY_MS, 2000)
                .with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "jmx"));
        final List<javax.management.Notification> jmxNotifications = registerJmxNotificationListener();
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(connector(), server(), task(), database());

        final MBeanNotificationInfo[] notificationInfos = readJmxNotifications();
        Assertions.assertThat(notificationInfos)
                .allSatisfy(info -> Assertions.assertThat(info.getName()).isEqualTo(Notification.class.getName()));

        // The listener is fed by the bean, not by this thread; give the second
        // notification time to arrive before asserting on the collected list.
        Awaitility.await()
                .atMost(30L, TimeUnit.SECONDS)
                .until(() -> jmxNotifications.size() >= EXPECTED_SNAPSHOT_NOTIFICATIONS);

        Assertions.assertThat(jmxNotifications).hasSize(EXPECTED_SNAPSHOT_NOTIFICATIONS);
        assertJmxEmittedNotification(jmxNotifications.get(0), "STARTED");
        // Uses snapshotStatusResult() like the other tests instead of a hard-coded status.
        assertJmxEmittedNotification(jmxNotifications.get(1), snapshotStatusResult());
    }

    /** Checks topic, aggregate type and type of a notification record read from the sink topic. */
    private static void assertSinkNotification(SourceRecord record, String expectedType) {
        Assertions.assertThat(record.topic()).isEqualTo(NOTIFICATION_TOPIC_NAME);
        final Struct value = (Struct) record.value();
        Assertions.assertThat(value.getString("aggregate_type")).isEqualTo(INITIAL_SNAPSHOT);
        Assertions.assertThat(value.getString("type")).isEqualTo(expectedType);
    }

    /** Checks aggregate type and type of a notification read from the JMX bean's {@code Notifications} attribute. */
    private static void assertJmxStoredNotification(Notification notification, String expectedType) {
        Assertions.assertThat(notification)
                .hasFieldOrPropertyWithValue("aggregateType", INITIAL_SNAPSHOT)
                .hasFieldOrPropertyWithValue("type", expectedType);
    }

    /** Checks message and JSON user data of a JMX notification delivered to the registered listener. */
    private void assertJmxEmittedNotification(javax.management.Notification notification, String expectedType) {
        Assertions.assertThat(notification).hasFieldOrPropertyWithValue("message", JMX_NOTIFICATION_MESSAGE);
        Assertions.assertThat(notification.getUserData())
                .isEqualTo("{\"aggregateType\":\"Initial Snapshot\",\"type\":\"" + expectedType + "\",\"additionalData\":{\"connector_name\":\"" + server() + "\"}}");
    }

    /**
     * Reads the notifications exposed by the notification MBean, after asserting that
     * the bean declares a {@code Notifications} attribute.
     */
    private List<Notification> readNotificationFromJmx() throws MalformedObjectNameException, ReflectionException, InstanceNotFoundException, IntrospectionException, AttributeNotFoundException, MBeanException {
        final ObjectName notificationBean = getObjectName();
        final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
        final MBeanInfo mBeanInfo = mBeanServer.getMBeanInfo(notificationBean);
        final List<String> attributeNames = Arrays.stream(mBeanInfo.getAttributes())
                .map(MBeanFeatureInfo::getName)
                .collect(Collectors.toList());
        Assertions.assertThat(attributeNames).contains("Notifications");
        final JmxNotificationChannelMXBean proxy = JMX.newMXBeanProxy(mBeanServer, notificationBean, JmxNotificationChannelMXBean.class);
        return proxy.getNotifications();
    }

    /** Returns the notification metadata declared by the notification MBean. */
    private MBeanNotificationInfo[] readJmxNotifications() throws MalformedObjectNameException, ReflectionException, InstanceNotFoundException, IntrospectionException, AttributeNotFoundException, MBeanException {
        final ObjectName notificationBean = getObjectName();
        final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
        return mBeanServer.getMBeanInfo(notificationBean).getNotifications();
    }

    /** Builds the object name of the notification MBean from {@link #connector()} and {@link #server()}. */
    private ObjectName getObjectName() throws MalformedObjectNameException {
        return new ObjectName(String.format("debezium.%s:type=management, context=notifications, server=%s", connector(), server()));
    }

    /**
     * Registers a {@link ClientListener} on the notification MBean.
     *
     * @return the list every received JMX notification is appended to
     */
    private List<javax.management.Notification> registerJmxNotificationListener() throws MalformedObjectNameException, InstanceNotFoundException {
        final ObjectName notificationBean = getObjectName();
        final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
        // The listener appends while the test thread reads, so use a thread-safe list.
        final List<javax.management.Notification> receivedNotifications = new CopyOnWriteArrayList<>();
        mBeanServer.addNotificationListener(notificationBean, new ClientListener(), null, receivedNotifications);
        return receivedNotifications;
    }

    /** Invokes the no-argument {@code reset} operation on the notification MBean. */
    private void resetNotifications() throws MalformedObjectNameException, ReflectionException, InstanceNotFoundException, MBeanException {
        final ObjectName notificationBean = getObjectName();
        final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
        mBeanServer.invoke(notificationBean, "reset", new Object[0], new String[0]);
    }

    /**
     * JMX listener that appends each received notification to the {@link List}
     * supplied as the handback object at registration time.
     */
    public static class ClientListener
            implements NotificationListener {
        @Override
        @SuppressWarnings("unchecked") // the handback is the notification list passed at registration
        public void handleNotification(javax.management.Notification notification, Object handback) {
            ((List<javax.management.Notification>) handback).add(notification);
        }
    }
}

