/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb;

import com.mongodb.client.MongoClient;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.mongodb.AbstractMongoConnectorIT;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.TestHelper;
import io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
import java.util.LinkedHashMap;
import java.util.List;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.HeaderFrom;
import org.apache.kafka.connect.transforms.InsertHeader;
import org.assertj.core.api.Assertions;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.Before;
import org.junit.Test;

public class CloudEventsConverterIT
extends AbstractMongoConnectorIT {
    protected static final String SERVER_NAME = "mongo1";
    protected static final String DB_NAME = "dbA";
    protected static final String COLLECTION_NAME = "c1";

    @Override
    @Before
    public void beforeEach() {
        Testing.Print.enable();
        this.config = this.getConfiguration();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(mongo, DB_NAME);
        this.start(MongoDbConnector.class, this.config);
        this.assertConnectorIsRunning();
    }

    @Test
    public void testCorrectFormat() throws Exception {
        CloudEventsConverterIT.waitForSnapshotToBeCompleted("mongodb", SERVER_NAME);
        List<Document> documentsToInsert = this.loadTestDocuments("restaurants1.json");
        this.insertDocuments(DB_NAME, COLLECTION_NAME, documentsToInsert.toArray(new Document[0]));
        Document updateObj = new Document().append("$set", (Object)new Document().append("name", (Object)"Closed"));
        this.updateDocument(DB_NAME, COLLECTION_NAME, Document.parse((String)"{\"restaurant_id\": \"30075445\"}"), updateObj);
        Thread.sleep(1000L);
        this.deleteDocuments(DB_NAME, COLLECTION_NAME, Document.parse((String)"{\"restaurant_id\": \"30075445\"}"));
        int recCount = 8;
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(8);
        List c1s = records.recordsForTopic("mongo1.dbA.c1");
        Assertions.assertThat((List)c1s).hasSize(8);
        List insertRecords = c1s.subList(0, 6);
        SourceRecord updateRecord = (SourceRecord)c1s.get(6);
        SourceRecord deleteRecord = (SourceRecord)c1s.get(7);
        for (SourceRecord record : insertRecords) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson((SourceRecord)record, (boolean)false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro((SourceRecord)record, (boolean)false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro((SourceRecord)record, (String)"mongodb", (String)SERVER_NAME, (boolean)false);
        }
        CloudEventsConverterTest.shouldConvertToCloudEventsInJson((SourceRecord)deleteRecord, (boolean)false);
        CloudEventsConverterTest.shouldConvertToCloudEventsInAvro((SourceRecord)deleteRecord, (String)"mongodb", (String)SERVER_NAME, (boolean)false);
        CloudEventsConverterTest.shouldConvertToCloudEventsInJson((SourceRecord)updateRecord, (boolean)false);
        CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro((SourceRecord)updateRecord, (String)"updateDescription", (boolean)false);
        CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro((SourceRecord)updateRecord, (String)"after", (boolean)false);
        CloudEventsConverterTest.shouldConvertToCloudEventsInAvro((SourceRecord)updateRecord, (String)"mongodb", (String)SERVER_NAME, (boolean)false);
    }

    @Test
    @FixFor(value={"DBZ-6982"})
    public void shouldConvertToCloudEventsInJsonWithoutExtensionAttributes() throws Exception {
        try (MongoClient client = this.connect();){
            client.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).insertOne((Object)new Document().append("pk", (Object)1).append("aa", (Object)1));
        }
        AbstractConnectorTest.SourceRecords streamingRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)streamingRecords.allRecordsInOrder()).hasSize(1);
        SourceRecord record = (SourceRecord)streamingRecords.recordsForTopic("mongo1.dbA.c1").get(0);
        Assertions.assertThat((Object)record).isNotNull();
        Assertions.assertThat((Object)record.value()).isInstanceOf(Struct.class);
        CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithoutExtensionAttributes((SourceRecord)record);
    }

    @Test
    @FixFor(value={"DBZ-3642"})
    public void shouldConvertToCloudEventsInJsonWithMetadataAndIdAndTypeInHeadersAfterOutboxEventRouter() throws Exception {
        HeaderFrom.Value headerFrom = new HeaderFrom.Value();
        LinkedHashMap<String, String> headerFromConfig = new LinkedHashMap<String, String>();
        headerFromConfig.put("fields", "source,op,transaction");
        headerFromConfig.put("headers", "source,op,transaction");
        headerFromConfig.put("operation", "copy");
        headerFromConfig.put("header.converter.schemas.enable", "true");
        headerFrom.configure(headerFromConfig);
        MongoEventRouter outboxEventRouter = new MongoEventRouter();
        LinkedHashMap<String, String> outboxEventRouterConfig = new LinkedHashMap<String, String>();
        outboxEventRouterConfig.put("collection.expand.json.payload", "true");
        outboxEventRouterConfig.put("collection.fields.additional.placement", "event_type:header:type,id:header:id");
        outboxEventRouter.configure(outboxEventRouterConfig);
        try (MongoClient client = this.connect();){
            client.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).insertOne((Object)new Document().append("id", (Object)"59a42efd-b015-44a9-9dde-cb36d9002425").append("aggregateid", (Object)"10711fa5").append("aggregatetype", (Object)"User").append("event_type", (Object)"UserCreated").append("payload", (Object)new Document().append("_id", (Object)new ObjectId("000000000000000000000000")).append("someField1", (Object)"some value 1").append("someField2", (Object)7005L)));
        }
        AbstractConnectorTest.SourceRecords streamingRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)streamingRecords.allRecordsInOrder()).hasSize(1);
        SourceRecord record = (SourceRecord)streamingRecords.recordsForTopic("mongo1.dbA.c1").get(0);
        SourceRecord recordWithMetadataHeaders = (SourceRecord)headerFrom.apply((ConnectRecord)record);
        SourceRecord routedEvent = (SourceRecord)outboxEventRouter.apply((ConnectRecord)recordWithMetadataHeaders);
        Assertions.assertThat((Object)routedEvent).isNotNull();
        Assertions.assertThat((String)routedEvent.topic()).isEqualTo((Object)"outbox.event.User");
        Assertions.assertThat((Comparable)routedEvent.keySchema().type()).isEqualTo((Object)Schema.Type.STRING);
        Assertions.assertThat((Object)routedEvent.key()).isEqualTo((Object)"10711fa5");
        Assertions.assertThat((Object)routedEvent.value()).isInstanceOf(Struct.class);
        CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithMetadataAndIdAndTypeInHeaders((SourceRecord)routedEvent, (String)"mongodb", (String)SERVER_NAME);
        headerFrom.close();
        outboxEventRouter.close();
    }

    @Test
    @FixFor(value={"DBZ-7016"})
    public void shouldConvertToCloudEventsInJsonWithGeneratedIdAndTypeFromHeader() throws Exception {
        InsertHeader insertHeader = new InsertHeader();
        LinkedHashMap<String, String> insertHeaderConfig = new LinkedHashMap<String, String>();
        insertHeaderConfig.put("header", "id");
        insertHeaderConfig.put("value.literal", "77742efd-b015-44a9-9dde-cb36d9002425");
        insertHeader.configure(insertHeaderConfig);
        try (MongoClient client = this.connect();){
            client.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).insertOne((Object)new Document().append("pk", (Object)1).append("aa", (Object)1));
        }
        AbstractConnectorTest.SourceRecords streamingRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)streamingRecords.allRecordsInOrder()).hasSize(1);
        SourceRecord record = (SourceRecord)streamingRecords.recordsForTopic("mongo1.dbA.c1").get(0);
        SourceRecord recordWithTypeInHeader = (SourceRecord)insertHeader.apply((ConnectRecord)record);
        Assertions.assertThat((Object)recordWithTypeInHeader).isNotNull();
        Assertions.assertThat((Object)recordWithTypeInHeader.value()).isInstanceOf(Struct.class);
        CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithIdFromHeaderAndGeneratedType((SourceRecord)recordWithTypeInHeader, (String)"mongodb", (String)SERVER_NAME);
        insertHeader.close();
    }

    @Test
    @FixFor(value={"DBZ-7159"})
    public void shouldThrowExceptionWhenDeserializingNotCloudEventJson() throws Exception {
        try (MongoClient client = this.connect();){
            client.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).insertOne((Object)new Document().append("pk", (Object)1).append("aa", (Object)1));
        }
        AbstractConnectorTest.SourceRecords streamingRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)streamingRecords.allRecordsInOrder()).hasSize(1);
        SourceRecord record = (SourceRecord)streamingRecords.recordsForTopic("mongo1.dbA.c1").get(0);
        Assertions.assertThat((Object)record).isNotNull();
        Assertions.assertThat((Object)record.value()).isInstanceOf(Struct.class);
        CloudEventsConverterTest.shouldThrowExceptionWhenDeserializingNotCloudEventJson((SourceRecord)record);
    }

    @Test
    @FixFor(value={"DBZ-7159"})
    public void shouldThrowExceptionWhenDeserializingNotCloudEventAvro() throws Exception {
        try (MongoClient client = this.connect();){
            client.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).insertOne((Object)new Document().append("pk", (Object)1).append("aa", (Object)1));
        }
        AbstractConnectorTest.SourceRecords streamingRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)streamingRecords.allRecordsInOrder()).hasSize(1);
        SourceRecord record = (SourceRecord)streamingRecords.recordsForTopic("mongo1.dbA.c1").get(0);
        Assertions.assertThat((Object)record).isNotNull();
        Assertions.assertThat((Object)record.value()).isInstanceOf(Struct.class);
        CloudEventsConverterTest.shouldThrowExceptionWhenDeserializingNotCloudEventAvro((SourceRecord)record);
    }

    @Test
    @FixFor(value={"DBZ-7235"})
    public void shouldConvertToCloudEventsInAvroWithCustomCloudEventsSchemaName() throws Exception {
        try (MongoClient client = this.connect();){
            client.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).insertOne((Object)new Document().append("pk", (Object)1).append("aa", (Object)1));
        }
        AbstractConnectorTest.SourceRecords streamingRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)streamingRecords.allRecordsInOrder()).hasSize(1);
        SourceRecord record = (SourceRecord)streamingRecords.recordsForTopic("mongo1.dbA.c1").get(0);
        Assertions.assertThat((Object)record).isNotNull();
        Assertions.assertThat((Object)record.value()).isInstanceOf(Struct.class);
        CloudEventsConverterTest.shouldConvertToCloudEventsInAvroWithCustomCloudEventsSchemaName((SourceRecord)record);
    }

    private Configuration getConfiguration() {
        return ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration(mongo).edit().with(MongoDbConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MongoDbConnectorConfig.SnapshotMode.INITIAL)).with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbA.c1")).with(CommonConnectorConfig.TOPIC_PREFIX, SERVER_NAME)).build();
    }
}

