/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb.transforms.outbox;

import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.AbstractMongoConnectorIT;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.TestHelper;
import io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter;
import io.debezium.connector.mongodb.transforms.outbox.MongoEventRouterConfigDefinition;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class MongoEventRouterTestIT
extends AbstractMongoConnectorIT {
    protected static final String DB_NAME = "db";
    protected static final String SERVER_NAME = "mongo";
    private MongoEventRouter<SourceRecord> outboxEventRouter;

    @Before
    public void beforeEach() {
        Configuration config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "db." + this.getCollectionName())).with(MongoDbConnectorConfig.LOGICAL_NAME, SERVER_NAME)).build();
        this.beforeEach(config);
    }

    private String getCollectionName() {
        return "test";
    }

    protected String topicName() {
        return String.format("%s.%s.%s", SERVER_NAME, DB_NAME, this.getCollectionName());
    }

    public void beforeEach(Configuration config) {
        Testing.Debug.disable();
        Testing.Print.disable();
        this.stopConnector();
        this.initializeConnectorTestFramework();
        this.outboxEventRouter = new MongoEventRouter();
        this.outboxEventRouter.configure(Collections.emptyMap());
        this.context = new MongoDbTaskContext(config);
        TestHelper.cleanDatabase(this.primary(), DB_NAME);
        this.start(MongoDbConnector.class, config);
    }

    @Override
    @After
    public void afterEach() {
        try {
            this.stopConnector();
        }
        finally {
            if (this.context != null) {
                this.context.getConnectionContext().shutdown();
            }
        }
        this.outboxEventRouter.close();
    }

    @Test
    public void shouldConsumeRecordsFromInsert() throws Exception {
        this.primary().execute("insert", client -> client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne((Object)new Document().append("aggregateid", (Object)123L).append("aggregatetype", (Object)"Order").append("type", (Object)"OrderCreated").append("payload", (Object)new Document().append("_id", (Object)new ObjectId("000000000000000000000000")).append("fullName", (Object)"John Doe").append("enabled", (Object)true).append("asset", (Object)100000L).append("age", (Object)42).append("pets", Arrays.asList("dog", "cat")))));
        AbstractConnectorTest.SourceRecords actualRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)actualRecords.topics().size()).isEqualTo(1);
        SourceRecord newEventRecord = (SourceRecord)actualRecords.recordsForTopic(this.topicName()).get(0);
        SourceRecord routedEvent = (SourceRecord)this.outboxEventRouter.apply((ConnectRecord)newEventRecord);
        Assertions.assertThat((Object)routedEvent).isNotNull();
        Assertions.assertThat((String)routedEvent.topic()).isEqualTo((Object)"outbox.event.Order");
        Assertions.assertThat((Object)routedEvent.keySchema().type()).isEqualTo((Object)Schema.Type.INT64);
        Assertions.assertThat((Object)routedEvent.key()).isEqualTo((Object)123L);
        Object value = routedEvent.value();
        Assertions.assertThat((Object)value).isInstanceOf(String.class);
        Document payload = Document.parse((String)((String)value));
        Assertions.assertThat((Object)payload.get((Object)"_id")).isEqualTo((Object)new ObjectId("000000000000000000000000"));
        Assertions.assertThat((Object)payload.get((Object)"fullName")).isEqualTo((Object)"John Doe");
        Assertions.assertThat((Object)payload.get((Object)"asset")).isEqualTo((Object)100000L);
        Assertions.assertThat((Object)payload.get((Object)"enabled")).isEqualTo((Object)true);
        Assertions.assertThat((Object)payload.get((Object)"pets")).isEqualTo(Arrays.asList("dog", "cat"));
    }

    @Test
    public void shouldSendEventTypeAsHeader() throws Exception {
        this.primary().execute("insert", client -> client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne((Object)new Document().append("aggregateid", (Object)123L).append("aggregatetype", (Object)"Order").append("type", (Object)"OrderCreated").append("payload", (Object)new Document().append("_id", (Object)new ObjectId("000000000000000000000000")).append("fullName", (Object)"John Doe").append("asset", (Object)100000L))));
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type:header:eventType");
        this.outboxEventRouter.configure(config);
        AbstractConnectorTest.SourceRecords actualRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)actualRecords.topics().size()).isEqualTo(1);
        SourceRecord newEventRecord = (SourceRecord)actualRecords.recordsForTopic(this.topicName()).get(0);
        SourceRecord routedEvent = (SourceRecord)this.outboxEventRouter.apply((ConnectRecord)newEventRecord);
        Assertions.assertThat((Object)routedEvent).isNotNull();
        Assertions.assertThat((String)routedEvent.topic()).isEqualTo((Object)"outbox.event.Order");
        Object value = routedEvent.value();
        Assertions.assertThat((Object)routedEvent.headers().lastWithName("eventType").value()).isEqualTo((Object)"OrderCreated");
        Assertions.assertThat((Object)routedEvent.key()).isEqualTo((Object)123L);
        Assertions.assertThat((Object)value).isInstanceOf(String.class);
        Document payload = Document.parse((String)((String)value));
        Assertions.assertThat((Object)payload.get((Object)"_id")).isEqualTo((Object)new ObjectId("000000000000000000000000"));
        Assertions.assertThat((Object)payload.get((Object)"fullName")).isEqualTo((Object)"John Doe");
        Assertions.assertThat((Object)payload.get((Object)"asset")).isEqualTo((Object)100000L);
    }

    @Test
    public void shouldSendEventTypeAsValue() throws Exception {
        this.primary().execute("insert", client -> client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne((Object)new Document().append("aggregateid", (Object)123L).append("aggregatetype", (Object)"Order").append("type", (Object)"OrderCreated").append("payload", (Object)new Document().append("_id", (Object)new ObjectId("000000000000000000000000")).append("fullName", (Object)"John Doe").append("asset", (Object)100000L).append("age", (Object)42))));
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type:envelope:eventType");
        this.outboxEventRouter.configure(config);
        AbstractConnectorTest.SourceRecords actualRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)actualRecords.topics().size()).isEqualTo(1);
        SourceRecord newEventRecord = (SourceRecord)actualRecords.recordsForTopic(this.topicName()).get(0);
        SourceRecord routedEvent = (SourceRecord)this.outboxEventRouter.apply((ConnectRecord)newEventRecord);
        Assertions.assertThat((Object)routedEvent).isNotNull();
        Assertions.assertThat((String)routedEvent.topic()).isEqualTo((Object)"outbox.event.Order");
        Struct valueStruct = Requirements.requireStruct((Object)routedEvent.value(), (String)"test payload");
        Assertions.assertThat((String)valueStruct.getString("eventType")).isEqualTo((Object)"OrderCreated");
        Document payload = Document.parse((String)((String)valueStruct.get("payload")));
        Assertions.assertThat((Object)payload.get((Object)"_id")).isEqualTo((Object)new ObjectId("000000000000000000000000"));
        Assertions.assertThat((Object)payload.get((Object)"fullName")).isEqualTo((Object)"John Doe");
        Assertions.assertThat((Object)payload.get((Object)"asset")).isEqualTo((Object)100000L);
    }

    @Test
    public void shouldSupportAllFeatures() throws Exception {
        this.primary().execute("insert", client -> client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne((Object)new Document().append("_id", (Object)new ObjectId("111111111111111111111111")).append("aggregateid", (Object)123L).append("aggregatetype", (Object)"Order").append("type", (Object)"OrderCreated").append("somebooltype", (Object)true).append("version", (Object)2).append("createdat", (Object)12342452512L).append("is_deleted", (Object)false).append("payload", (Object)new Document().append("_id", (Object)new ObjectId("000000000000000000000000")).append("fullName", (Object)"John Doe").append("asset", (Object)100000L).append("age", (Object)42))));
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(MongoEventRouterConfigDefinition.FIELD_SCHEMA_VERSION.name(), "version");
        config.put(MongoEventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP.name(), "createdat");
        config.put(MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "version:envelope:eventVersion,aggregatetype:envelope:aggregateType,somebooltype:envelope:someBoolType,somebooltype:header,is_deleted:envelope:deleted");
        this.outboxEventRouter.configure(config);
        AbstractConnectorTest.SourceRecords actualRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)actualRecords.topics().size()).isEqualTo(1);
        SourceRecord newEventRecord = (SourceRecord)actualRecords.recordsForTopic(this.topicName()).get(0);
        SourceRecord eventRouted = (SourceRecord)this.outboxEventRouter.apply((ConnectRecord)newEventRecord);
        Assertions.assertThat((Long)eventRouted.timestamp()).isEqualTo(12342452512L);
        Assertions.assertThat((String)eventRouted.topic()).isEqualTo((Object)"outbox.event.Order");
        Headers headers = eventRouted.headers();
        Assertions.assertThat((int)headers.size()).isEqualTo(2);
        Header headerId = headers.lastWithName("id");
        Assertions.assertThat((Object)headerId.schema()).isEqualTo((Object)Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)headerId.value()).isEqualTo((Object)"111111111111111111111111");
        Header headerBool = headers.lastWithName("somebooltype");
        Assertions.assertThat((Object)headerBool.schema()).isEqualTo((Object)Schema.OPTIONAL_BOOLEAN_SCHEMA);
        Assertions.assertThat((Object)headerBool.value()).isEqualTo((Object)true);
        Assertions.assertThat((Object)eventRouted.keySchema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_INT64_SCHEMA);
        Assertions.assertThat((Object)eventRouted.key()).isEqualTo((Object)123L);
        Struct valueStruct = Requirements.requireStruct((Object)eventRouted.value(), (String)"test envelope");
        Assertions.assertThat((String)valueStruct.getString("aggregateType")).isEqualTo((Object)"Order");
        Assertions.assertThat((Integer)valueStruct.getInt32("eventVersion")).isEqualTo(2);
        Assertions.assertThat((Boolean)valueStruct.getBoolean("someBoolType")).isEqualTo(true);
        Assertions.assertThat((Boolean)valueStruct.getBoolean("deleted")).isEqualTo(false);
    }

    @Test
    public void shouldNotProduceTombstoneEventForNullPayload() throws Exception {
        this.primary().execute("insert", client -> client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne((Object)new Document().append("_id", (Object)new ObjectId("000000000000000000000000")).append("aggregateid", (Object)123L).append("aggregatetype", (Object)"Order").append("type", (Object)"OrderCreated").append("payload", null)));
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "aggregatetype:envelope:aggregateType");
        this.outboxEventRouter.configure(config);
        AbstractConnectorTest.SourceRecords actualRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)actualRecords.topics().size()).isEqualTo(1);
        SourceRecord newEventRecord = (SourceRecord)actualRecords.recordsForTopic(this.topicName()).get(0);
        SourceRecord eventRouted = (SourceRecord)this.outboxEventRouter.apply((ConnectRecord)newEventRecord);
        Assertions.assertThat((String)eventRouted.topic()).isEqualTo((Object)"outbox.event.Order");
        Headers headers = eventRouted.headers();
        Assertions.assertThat((int)headers.size()).isEqualTo(1);
        Header headerId = headers.lastWithName("id");
        Assertions.assertThat((Object)headerId.value()).isEqualTo((Object)"000000000000000000000000");
        Assertions.assertThat((Object)eventRouted.keySchema().type()).isEqualTo((Object)Schema.Type.INT64);
        Assertions.assertThat((Object)eventRouted.key()).isEqualTo((Object)123L);
        Assertions.assertThat((Object)eventRouted.valueSchema()).isNotNull();
        Assertions.assertThat((Object)eventRouted.value()).isNotNull();
        Assertions.assertThat((Object)((Struct)eventRouted.value()).get("payload")).isNull();
    }

    @Test
    public void shouldProduceTombstoneEventForNullPayload() throws Exception {
        this.primary().execute("insert", client -> client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne((Object)new Document().append("_id", (Object)new ObjectId("000000000000000000000000")).append("aggregateid", (Object)123L).append("aggregatetype", (Object)"Order").append("type", (Object)"OrderCreated").append("payload", null)));
        HashMap<String, String> config = new HashMap<String, String>();
        config.put("route.tombstone.on.empty.payload", "true");
        this.outboxEventRouter.configure(config);
        AbstractConnectorTest.SourceRecords actualRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)actualRecords.topics().size()).isEqualTo(1);
        SourceRecord newEventRecord = (SourceRecord)actualRecords.recordsForTopic(this.topicName()).get(0);
        SourceRecord eventRouted = (SourceRecord)this.outboxEventRouter.apply((ConnectRecord)newEventRecord);
        Assertions.assertThat((String)eventRouted.topic()).isEqualTo((Object)"outbox.event.Order");
        Headers headers = eventRouted.headers();
        Assertions.assertThat((int)headers.size()).isEqualTo(1);
        Header headerId = headers.lastWithName("id");
        Assertions.assertThat((Object)headerId.value()).isEqualTo((Object)"000000000000000000000000");
        Assertions.assertThat((Object)eventRouted.keySchema().type()).isEqualTo((Object)Schema.Type.INT64);
        Assertions.assertThat((Object)eventRouted.key()).isEqualTo((Object)123L);
        Assertions.assertThat((Object)eventRouted.valueSchema()).isNull();
        Assertions.assertThat((Object)eventRouted.value()).isNull();
    }
}

