/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.converters;

import io.debezium.config.Configuration;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.transforms.outbox.EventRouter;
import java.util.LinkedHashMap;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.HeaderFrom;
import org.apache.kafka.connect.transforms.InsertHeader;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Base class for connector-specific tests that capture change records from a running
 * connector and hand them to the shared assertions in {@link CloudEventsConverterTest}.
 *
 * <p>Subclasses supply the connector class, its configuration, the database connection and
 * the SQL needed to produce one change event; this class drives the connector lifecycle
 * around every test.
 *
 * @param <T> the source connector type started for each test
 */
public abstract class AbstractCloudEventsConverterTest<T extends SourceConnector> extends AbstractConnectorTest {

    /**
     * @return the connector class that is started before every test
     */
    protected abstract Class<T> getConnectorClass();

    /**
     * @return the connector name forwarded to the {@link CloudEventsConverterTest} assertions
     */
    protected abstract String getConnectorName();

    /**
     * @return the server name forwarded to the {@link CloudEventsConverterTest} assertions
     */
    protected abstract String getServerName();

    /**
     * Provides the connection used to execute the test SQL; it is closed after every test.
     *
     * <p>NOTE(review): this class calls the method several times per test and closes only the
     * instance returned in {@link #afterEach()}, so implementations presumably return a shared
     * connection rather than a new one per call - confirm in subclasses.
     *
     * @return the JDBC connection to the database under test
     */
    protected abstract JdbcConnection databaseConnection();

    /**
     * @return the builder whose built configuration is used to start the connector
     */
    protected abstract Configuration.Builder getConfigurationBuilder();

    /**
     * @return the topic from which the record produced by {@link #createInsert()} is read
     */
    protected abstract String topicName();

    /**
     * @return the topic from which the record produced by
     *         {@link #createInsertToOutbox(String, String, String, String, String, String)} is read
     */
    protected abstract String topicNameOutbox();

    /**
     * Prepares the database before the statement from {@link #createInsert()} is executed.
     *
     * @throws Exception if the table cannot be created
     */
    protected abstract void createTable() throws Exception;

    /**
     * Prepares the database before the outbox insert statement is executed.
     *
     * @throws Exception if the outbox table cannot be created
     */
    protected abstract void createOutboxTable() throws Exception;

    /**
     * @return the SQL statement executed to produce exactly one change record on {@link #topicName()}
     */
    protected abstract String createInsert();

    /**
     * Builds the SQL statement that inserts one row into the outbox table.
     *
     * <p>Parameter meanings are inferred from the values passed by the test in this class
     * (a UUID, {@code "UserCreated"}, {@code "User"}, {@code "10711fa5"}, a JSON document and
     * an empty string) and from the assertions on the routed event's topic and key.
     *
     * @param eventId identifier of the outbox event
     * @param eventType type of the outbox event
     * @param aggregateType aggregate type; the routed topic asserted by the test ends with this value
     * @param aggregateId aggregate identifier; asserted by the test to be the routed record key
     * @param payloadJson JSON payload of the event
     * @param additional sixth argument; the test in this class passes an empty string, its role
     *            is defined by the subclass
     * @return the SQL insert statement
     */
    protected abstract String createInsertToOutbox(String eventId, String eventType, String aggregateType, String aggregateId,
                                                   String payloadJson, String additional);

    /**
     * Blocks until the connector is ready; called right after the connector is asserted to be running.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    protected abstract void waitForStreamingStarted() throws InterruptedException;

    /**
     * Starts the connector before every test.
     *
     * @throws Exception if the connector cannot be started
     */
    @Before
    public void beforeEach() throws Exception {
        startConnector();
    }

    /**
     * Stops the connector, verifies that no records are left to consume and closes the database
     * connection.
     *
     * @throws Exception if stopping, verifying or closing fails
     */
    @After
    public void afterEach() throws Exception {
        try {
            stopConnector();
            assertNoRecordsToConsume();
        }
        finally {
            // Close the connection even when stopping the connector or the leftover-record check fails.
            databaseConnection().close();
        }
    }

    /**
     * Passes an unmodified captured record to the "without extension attributes" assertions.
     */
    @Test
    @FixFor(value = { "DBZ-6982" })
    public void shouldConvertToCloudEventsInJsonWithoutExtensionAttributes() throws Exception {
        SourceRecord record = insertAndConsumeSingleRecord();
        assertRecordWithStructValue(record);

        CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithoutExtensionAttributes(record);
    }

    /**
     * Copies {@code source}, {@code op} and {@code transaction} into headers, routes the record
     * through the outbox {@link EventRouter} and passes the routed event to the
     * "metadata, id and type in headers" assertions.
     */
    @Test
    @FixFor(value = { "DBZ-3642", "DBZ-7016" })
    public void shouldConvertToCloudEventsInJsonWithMetadataAndIdAndTypeInHeadersAfterOutboxEventRouter() throws Exception {
        // try-with-resources guarantees both transformations are closed even when an assertion fails.
        try (HeaderFrom<SourceRecord> headerFrom = new HeaderFrom.Value<>();
                EventRouter<SourceRecord> outboxEventRouter = new EventRouter<>()) {

            LinkedHashMap<String, String> headerFromConfig = new LinkedHashMap<>();
            headerFromConfig.put("fields", "source,op,transaction");
            headerFromConfig.put("headers", "source,op,transaction");
            headerFromConfig.put("operation", "copy");
            headerFromConfig.put("header.converter.schemas.enable", "true");
            headerFrom.configure(headerFromConfig);

            LinkedHashMap<String, String> outboxEventRouterConfig = new LinkedHashMap<>();
            outboxEventRouterConfig.put("table.expand.json.payload", "true");
            outboxEventRouterConfig.put("table.fields.additional.placement", "event_type:header:type");
            outboxEventRouter.configure(outboxEventRouterConfig);

            createOutboxTable();
            databaseConnection().execute(createInsertToOutbox(
                    "59a42efd-b015-44a9-9dde-cb36d9002425",
                    "UserCreated",
                    "User",
                    "10711fa5",
                    "{\"someField1\": \"some value 1\",\"someField2\": 7005}",
                    ""));

            SourceRecord record = consumeSingleRecord(topicNameOutbox());
            SourceRecord recordWithMetadataHeaders = headerFrom.apply(record);
            SourceRecord routedEvent = outboxEventRouter.apply(recordWithMetadataHeaders);

            Assertions.assertThat(routedEvent).isNotNull();
            Assertions.assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
            Assertions.assertThat(routedEvent.keySchema()).isEqualTo(Schema.STRING_SCHEMA);
            Assertions.assertThat(routedEvent.key()).isEqualTo("10711fa5");
            Assertions.assertThat(routedEvent.value()).isInstanceOf(Struct.class);

            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithMetadataAndIdAndTypeInHeaders(
                    routedEvent, getConnectorName(), getServerName());
        }
    }

    /**
     * Adds a literal {@code id} header to a captured record and passes it to the
     * "id from header and generated type" assertions.
     */
    @Test
    @FixFor(value = { "DBZ-7016" })
    public void shouldConvertToCloudEventsInJsonWithIdFromHeaderAndGeneratedType() throws Exception {
        // try-with-resources guarantees the transformation is closed even when an assertion fails.
        try (InsertHeader<SourceRecord> insertHeader = new InsertHeader<>()) {
            LinkedHashMap<String, String> insertHeaderConfig = new LinkedHashMap<>();
            insertHeaderConfig.put("header", "id");
            insertHeaderConfig.put("value.literal", "77742efd-b015-44a9-9dde-cb36d9002425");
            insertHeader.configure(insertHeaderConfig);

            SourceRecord record = insertAndConsumeSingleRecord();
            SourceRecord recordWithTypeInHeader = insertHeader.apply(record);

            Assertions.assertThat(recordWithTypeInHeader).isNotNull();
            Assertions.assertThat(recordWithTypeInHeader.topic()).isEqualTo(topicName());
            Assertions.assertThat(recordWithTypeInHeader.value()).isInstanceOf(Struct.class);

            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithIdFromHeaderAndGeneratedType(
                    recordWithTypeInHeader, getConnectorName(), getServerName());
        }
    }

    /**
     * Passes an unmodified captured record to the JSON "not a CloudEvent" deserialization assertions.
     */
    @Test
    @FixFor(value = { "DBZ-7159" })
    public void shouldThrowExceptionWhenDeserializingNotCloudEventJson() throws Exception {
        SourceRecord record = insertAndConsumeSingleRecord();
        assertRecordWithStructValue(record);

        CloudEventsConverterTest.shouldThrowExceptionWhenDeserializingNotCloudEventJson(record);
    }

    /**
     * Passes an unmodified captured record to the Avro "not a CloudEvent" deserialization assertions.
     */
    @Test
    @FixFor(value = { "DBZ-7159" })
    public void shouldThrowExceptionWhenDeserializingNotCloudEventAvro() throws Exception {
        SourceRecord record = insertAndConsumeSingleRecord();
        assertRecordWithStructValue(record);

        CloudEventsConverterTest.shouldThrowExceptionWhenDeserializingNotCloudEventAvro(record);
    }

    /**
     * Creates the table, executes the insert from {@link #createInsert()} and returns the single
     * record consumed from {@link #topicName()}.
     */
    private SourceRecord insertAndConsumeSingleRecord() throws Exception {
        createTable();
        databaseConnection().execute(createInsert());
        return consumeSingleRecord(topicName());
    }

    /**
     * Consumes one record and returns it, asserting that exactly one record was produced overall
     * and that it was published to the given topic.
     */
    private SourceRecord consumeSingleRecord(String topic) throws InterruptedException {
        SourceRecords streamingRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(streamingRecords.allRecordsInOrder()).hasSize(1);
        // Fail with a descriptive assertion instead of a NullPointerException when the record
        // was published to a different topic.
        Assertions.assertThat(streamingRecords.recordsForTopic(topic))
                .as("records for topic %s", topic)
                .hasSize(1);
        return streamingRecords.recordsForTopic(topic).get(0);
    }

    /**
     * Asserts that the record exists and carries a {@link Struct} value.
     */
    private static void assertRecordWithStructValue(SourceRecord record) {
        Assertions.assertThat(record).isNotNull();
        Assertions.assertThat(record.value()).isInstanceOf(Struct.class);
    }

    /**
     * Starts the connector with the subclass configuration, waits until it is ready and verifies
     * that nothing is pending before the test body runs.
     */
    private void startConnector() throws Exception {
        Configuration.Builder configBuilder = getConfigurationBuilder();
        start(getConnectorClass(), configBuilder.build());
        assertConnectorIsRunning();
        waitForStreamingStarted();
        assertNoRecordsToConsume();
    }
}

