/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb.transforms.outbox;

import io.debezium.connector.mongodb.MongoDbSchema;
import io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter;
import io.debezium.connector.mongodb.transforms.outbox.MongoEventRouterConfigDefinition;
import io.debezium.data.Envelope;
import io.debezium.data.Json;
import io.debezium.data.VerifyRecord;
import io.debezium.pipeline.txmetadata.TransactionMonitor;
import io.debezium.transforms.outbox.EventRouterConfigDefinition;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.assertj.core.api.Assertions;
import org.bson.Document;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;
import org.bson.types.ObjectId;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class MongoEventRouterTest {
    // JSON writer settings used in this class to serialize an outbox Document into the
    // envelope's "after" value: EXTENDED output mode, indentation enabled, "\n" line separator.
    // NOTE(review): despite the COMPACT name, indentation is switched on here - confirm that is intended.
    JsonWriterSettings COMPACT_JSON_SETTINGS = JsonWriterSettings.builder().outputMode(JsonMode.EXTENDED).indent(true).newLineCharacters("\n").build();
    // Transformation under test; beforeEach() assigns a new instance before every test method.
    MongoEventRouter<SourceRecord> router;

    /**
     * Assigns a brand-new, not yet configured router to {@link #router} before each test.
     */
    @Before
    public void beforeEach() {
        // Diamond instead of the raw type so the assignment to MongoEventRouter<SourceRecord> is checked.
        this.router = new MongoEventRouter<>();
    }

    /**
     * A record with a {@code null} value and {@code null} value schema is expected to be
     * dropped, i.e. the router returns {@code null} for it.
     */
    @Test
    public void canSkipTombstone() {
        final Map<String, String> config = new HashMap<>();
        router.configure(config);

        final SourceRecord tombstone = new SourceRecord(
                new HashMap<>(),
                new HashMap<>(),
                "db.outbox",
                SchemaBuilder.STRING_SCHEMA,
                "123123",
                null,
                null);

        final SourceRecord routed = router.apply(tombstone);

        Assertions.assertThat(routed).isNull();
    }

    /**
     * A delete envelope is expected to be dropped: the router returns {@code null}.
     */
    @Test
    public void canSkipDeletion() {
        final Map<String, String> config = new HashMap<>();
        router.configure(config);

        final Envelope envelope = Envelope.defineSchema()
                .withName("dummy.Envelope")
                .withRecord(Schema.STRING_SCHEMA)
                .withSource(SchemaBuilder.struct().build())
                .build();
        final Struct deletePayload = envelope.delete(
                "{\"_id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}}",
                null,
                Instant.now());
        final SourceRecord deleteRecord = new SourceRecord(
                new HashMap<>(),
                new HashMap<>(),
                "db.outbox",
                SchemaBuilder.STRING_SCHEMA,
                "da8d6de63b7745ff8f4457db",
                envelope.schema(),
                deletePayload);

        final SourceRecord routed = router.apply(deleteRecord);

        Assertions.assertThat(routed).isNull();
    }

    /**
     * A struct value whose schema has no name is expected to pass through untouched:
     * the router returns the very same record instance.
     */
    @Test
    public void canSkipMessagesWithoutDebeziumCdcEnvelopeDueToMissingSchemaName() {
        final Map<String, String> config = new HashMap<>();
        router.configure(config);

        final Schema unnamedSchema = SchemaBuilder.struct()
                .field("ts_ms", Schema.INT64_SCHEMA)
                .build();
        final Struct message = new Struct(unnamedSchema);
        message.put("ts_ms", Instant.now().toEpochMilli());

        final SourceRecord original = new SourceRecord(
                new HashMap<>(),
                new HashMap<>(),
                "db.outbox",
                SchemaBuilder.STRING_SCHEMA,
                "772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5",
                unnamedSchema,
                message);

        final SourceRecord routed = router.apply(original);

        Assertions.assertThat(routed).isSameAs(original);
    }

    /**
     * A value schema whose name ends in {@code .Envelope} but which lacks the {@code op}
     * field is expected to make {@code apply} throw a {@link DataException} with the
     * message {@code "op is not a valid field name"}.
     */
    @Test
    public void shouldFailWhenTheSchemaLooksValidButDoesNotHaveTheCorrectFields() {
        final Map<String, String> config = new HashMap<>();
        router.configure(config);

        final Schema envelopeLikeSchema = SchemaBuilder.struct()
                .name("io.debezium.connector.common.Heartbeat.Envelope")
                .field("ts_ms", Schema.INT64_SCHEMA)
                .build();
        final Struct message = new Struct(envelopeLikeSchema);
        message.put("ts_ms", Instant.now().toEpochMilli());

        final SourceRecord eventRecord = new SourceRecord(
                new HashMap<>(),
                new HashMap<>(),
                "db.outbox",
                SchemaBuilder.STRING_SCHEMA,
                "772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5",
                envelopeLikeSchema,
                message);

        final DataException thrown = Assert.assertThrows(DataException.class, () -> router.apply(eventRecord));

        Assertions.assertThat(thrown).hasMessage("op is not a valid field name");
    }

    /**
     * A value schema that is named but whose name does not end in {@code .Envelope} is
     * expected to pass through untouched: the router returns the same record instance.
     */
    @Test
    public void canSkipMessagesWithoutDebeziumCdcEnvelopeDueToMissingSchemaNameSuffix() {
        final Map<String, String> config = new HashMap<>();
        router.configure(config);

        final Schema heartbeatSchema = SchemaBuilder.struct()
                .name("io.debezium.connector.common.Heartbeat")
                .field("ts_ms", Schema.INT64_SCHEMA)
                .build();
        final Struct heartbeat = new Struct(heartbeatSchema);
        heartbeat.put("ts_ms", Instant.now().toEpochMilli());

        final SourceRecord original = new SourceRecord(
                new HashMap<>(),
                new HashMap<>(),
                "db.outbox",
                SchemaBuilder.STRING_SCHEMA,
                "772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5",
                heartbeatSchema,
                heartbeat);

        final SourceRecord routed = router.apply(original);

        Assertions.assertThat(routed).isSameAs(original);
    }

    /**
     * A record carrying a struct value but a {@code null} value schema is expected to pass
     * through untouched: the router returns the same record instance.
     */
    @Test
    public void canSkipMessagesWithoutDebeziumCdcEnvelopeDueToMissingValueSchema() {
        final Map<String, String> config = new HashMap<>();
        router.configure(config);

        // The schema is only used to build the struct; the record itself is created without it.
        final Schema heartbeatSchema = SchemaBuilder.struct()
                .name("io.debezium.connector.common.Heartbeat")
                .field("ts_ms", Schema.INT64_SCHEMA)
                .build();
        final Struct heartbeat = new Struct(heartbeatSchema);
        heartbeat.put("ts_ms", Instant.now().toEpochMilli());

        final SourceRecord original = new SourceRecord(
                new HashMap<>(),
                new HashMap<>(),
                "db.outbox",
                SchemaBuilder.STRING_SCHEMA,
                "772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5",
                null,
                heartbeat);

        final SourceRecord routed = router.apply(original);

        Assertions.assertThat(routed).isSameAs(original);
    }

    /**
     * With the default configuration an update envelope is expected to be dropped:
     * the router returns {@code null}.
     */
    @Test
    public void canSkipUpdates() {
        final Map<String, String> config = new HashMap<>();
        router.configure(config);

        final Schema recordSchema = SchemaBuilder.struct()
                .field("id", SchemaBuilder.string())
                .build();
        final Envelope envelope = Envelope.defineSchema()
                .withName("dummy.Envelope")
                .withRecord(recordSchema)
                .withSource(SchemaBuilder.struct().build())
                .build();

        // The same struct serves as both the "before" and the "after" state of the update.
        final Struct row = new Struct(recordSchema);
        row.put("id", "772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5");
        final Struct updatePayload = envelope.update(row, row, null, Instant.now());

        final SourceRecord updateRecord = new SourceRecord(
                new HashMap<>(),
                new HashMap<>(),
                "db.outbox",
                SchemaBuilder.STRING_SCHEMA,
                "772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5",
                envelope.schema(),
                updatePayload);

        final SourceRecord routed = router.apply(updateRecord);

        Assertions.assertThat(routed).isNull();
    }

    /**
     * When the invalid-operation behavior is configured as FATAL, applying the router to an
     * update envelope must throw an {@link IllegalStateException}.
     *
     * <p>The expectation is scoped to the {@code apply} call only, so an exception raised
     * while configuring the router or building the fixture fails the test instead of
     * satisfying it.
     */
    @Test
    public void canFailOnUpdates() {
        final Map<String, String> config = new HashMap<>();
        config.put(
                MongoEventRouterConfigDefinition.OPERATION_INVALID_BEHAVIOR.name(),
                EventRouterConfigDefinition.InvalidOperationBehavior.FATAL.getValue());
        router.configure(config);

        final Schema recordSchema = SchemaBuilder.struct()
                .field("id", SchemaBuilder.string())
                .build();
        final Envelope envelope = Envelope.defineSchema()
                .withName("dummy.Envelope")
                .withRecord(recordSchema)
                .withSource(SchemaBuilder.struct().build())
                .build();

        // The same struct serves as both the "before" and the "after" state of the update.
        final Struct row = new Struct(recordSchema);
        row.put("id", "772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5");
        final Struct updatePayload = envelope.update(row, row, null, Instant.now());

        final SourceRecord updateRecord = new SourceRecord(
                new HashMap<>(),
                new HashMap<>(),
                "db.outbox",
                SchemaBuilder.STRING_SCHEMA,
                "772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5",
                envelope.schema(),
                updatePayload);

        Assert.assertThrows(IllegalStateException.class, () -> router.apply(updateRecord));
    }

    /**
     * With the default configuration the routed record is expected to carry the value
     * {@code "{}"} and a value schema without a version.
     */
    @Test
    public void canExtractTableFields() {
        final Map<String, String> config = new HashMap<>();
        router.configure(config);

        final SourceRecord source = createEventRecord();
        final SourceRecord routed = router.apply(source);

        Assertions.assertThat(routed).isNotNull();
        Assertions.assertThat(routed.value()).isEqualTo("{}");
        Assertions.assertThat(routed.valueSchema().version()).isNull();
    }

    /**
     * With the default configuration the routed record is expected to have a STRING key
     * equal to {@code "000000000000000000000001"}.
     */
    @Test
    public void canSetDefaultMessageKey() {
        final Map<String, String> config = new HashMap<>();
        router.configure(config);

        final SourceRecord source = createEventRecord();
        final SourceRecord routed = router.apply(source);

        Assertions.assertThat(routed).isNotNull();
        Assertions.assertThat(routed.keySchema().type()).isEqualTo(Schema.Type.STRING);
        Assertions.assertThat(routed.key()).isEqualTo("000000000000000000000001");
    }

    /**
     * When the event-key field is configured as {@code customField}, the routed record's
     * key is expected to be that field's value ({@code "dummy"}) with a STRING schema.
     */
    @Test
    public void canSetMessageKey() {
        final Map<String, String> config = new HashMap<>();
        config.put(MongoEventRouterConfigDefinition.FIELD_EVENT_KEY.name(), "customField");
        router.configure(config);

        final SourceRecord source = createEventRecord(
                "da8d6de63b7745ff8f4457db",
                "eventType",
                "payloadId",
                "payloadType",
                new Document(),
                Collections.singletonMap("customField", "dummy"));
        final SourceRecord routed = router.apply(source);

        Assertions.assertThat(routed).isNotNull();
        Assertions.assertThat(routed.keySchema().type()).isEqualTo(Schema.Type.STRING);
        Assertions.assertThat(routed.key()).isEqualTo("dummy");
    }

    /**
     * When the event-key field is configured as {@code fakefield}, applying the router to
     * the default event record must throw a {@link DataException}.
     *
     * <p>The expectation is scoped to the {@code apply} call only, so a
     * {@link DataException} raised while configuring the router or building the fixture
     * fails the test instead of satisfying it.
     */
    @Test
    public void failsOnInvalidSetMessageKey() {
        final Map<String, String> config = new HashMap<>();
        config.put(MongoEventRouterConfigDefinition.FIELD_EVENT_KEY.name(), "fakefield");
        router.configure(config);

        final SourceRecord eventRecord = createEventRecord();

        Assert.assertThrows(DataException.class, () -> router.apply(eventRecord));
    }

    /**
     * With the default configuration the routed record's timestamp is expected to equal
     * the {@code ts_ms} field of the incoming envelope, while the incoming record itself
     * has no timestamp.
     */
    @Test
    public void canSetTimestampFromDebeziumEnvelopeByDefault() {
        final Map<String, String> config = new HashMap<>();
        router.configure(config);

        final SourceRecord source = createEventRecord();
        final SourceRecord routed = router.apply(source);

        final Struct envelope = Requirements.requireStruct(source.value(), "Test timestamp");
        final Long envelopeTimestamp = envelope.getInt64("ts_ms");

        Assertions.assertThat(source.timestamp()).isNull();
        Assertions.assertThat(routed.timestamp()).isEqualTo(envelopeTimestamp);
    }

    /**
     * When the event-timestamp field is configured as {@code event_timestamp}, the routed
     * record's timestamp is expected to equal that field's value.
     */
    @Test
    public void canSetTimestampByUserDefinedConfiguration() {
        final Map<String, String> config = new HashMap<>();
        config.put(MongoEventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP.name(), "event_timestamp");
        router.configure(config);

        final Long customTimestamp = 14222264625338L;
        final Map<String, Object> extras = new HashMap<>();
        extras.put("event_timestamp", customTimestamp);

        final SourceRecord source = createEventRecord(
                "da8d6de63b7745ff8f4457db",
                "UserCreated",
                "420b186d",
                "User",
                new Document(),
                extras);
        final SourceRecord routed = router.apply(source);

        Assertions.assertThat(source.timestamp()).isNull();
        Assertions.assertThat(routed.timestamp()).isEqualTo(customTimestamp);
    }

    /**
     * When routing by the {@code aggregatetype} field, events of type "User" are expected
     * on topic {@code outbox.event.User} and events of type "Address" on
     * {@code outbox.event.Address}.
     */
    @Test
    public void canRouteBasedOnField() {
        final Map<String, String> config = new HashMap<>();
        config.put(MongoEventRouterConfigDefinition.ROUTE_BY_FIELD.name(), "aggregatetype");
        router.configure(config);

        // Default event record.
        final SourceRecord userCreated = createEventRecord();
        final SourceRecord userCreatedRouted = router.apply(userCreated);
        Assertions.assertThat(userCreatedRouted).isNotNull();
        Assertions.assertThat(userCreatedRouted.topic()).isEqualTo("outbox.event.User");

        // A second "User" event lands on the same topic.
        final SourceRecord userUpdated = createEventRecord(
                "da8d6de63b7745ff8f4457db",
                "UserUpdate",
                new ObjectId("000000000000000000000001"),
                "User",
                new Document());
        final SourceRecord userUpdatedRouted = router.apply(userUpdated);
        Assertions.assertThat(userUpdatedRouted).isNotNull();
        Assertions.assertThat(userUpdatedRouted.topic()).isEqualTo("outbox.event.User");

        // An "Address" event lands on its own topic.
        final SourceRecord addressCreated = createEventRecord(
                "1a8d6de63b7745ff8f4451db",
                "AddressCreated",
                new ObjectId("000000000000000000000001"),
                "Address",
                new Document());
        final SourceRecord addressCreatedRouted = router.apply(addressCreated);
        Assertions.assertThat(addressCreatedRouted).isNotNull();
        Assertions.assertThat(addressCreatedRouted.topic()).isEqualTo("outbox.event.Address");
    }

    /**
     * With id, key, type, payload and route-by fields all renamed through configuration,
     * the routed record is expected to carry exactly one header, {@code id}, holding the
     * event id as a hex string.
     */
    @Test
    public void canConfigureEveryTableField() {
        final Map<String, String> config = new HashMap<>();
        config.put(MongoEventRouterConfigDefinition.FIELD_EVENT_ID.name(), "event_id");
        config.put(MongoEventRouterConfigDefinition.FIELD_EVENT_KEY.name(), "payload_id");
        config.put(MongoEventRouterConfigDefinition.FIELD_EVENT_TYPE.name(), "event_type");
        config.put(MongoEventRouterConfigDefinition.FIELD_PAYLOAD.name(), "payload_body");
        config.put(MongoEventRouterConfigDefinition.ROUTE_BY_FIELD.name(), "payload_id");
        router.configure(config);

        final Document outboxEvent = new Document()
                .append("event_id", new ObjectId("1a8d6de63b7745ff8f4451db"))
                .append("payload_id", "10711fa5")
                .append("event_type", "UserCreated")
                .append("payload_body", new Document());
        final String afterJson = outboxEvent.toJson(COMPACT_JSON_SETTINGS);

        final Schema envelopeSchema = SchemaBuilder.struct()
                .name("event.Envelope")
                .field("after", Json.builder().optional().build())
                .field("patch", Json.builder().optional().build())
                .field("filter", Json.builder().optional().build())
                .field("updateDescription", MongoDbSchema.UPDATED_DESCRIPTION_SCHEMA)
                .field("op", Schema.OPTIONAL_STRING_SCHEMA)
                .field("ts_ms", Schema.OPTIONAL_INT64_SCHEMA)
                .field("transaction", TransactionMonitor.TRANSACTION_BLOCK_SCHEMA)
                .build();
        final Envelope envelope = Envelope.fromSchema(envelopeSchema);
        final Struct createPayload = envelope.create(afterJson, null, Instant.now());

        final SourceRecord source = new SourceRecord(
                new HashMap<>(),
                new HashMap<>(),
                "db.outbox",
                envelope.schema(),
                createPayload);
        final SourceRecord routed = router.apply(source);

        Assertions.assertThat(routed).isNotNull();

        final Headers routedHeaders = routed.headers();
        Assertions.assertThat(routedHeaders.size()).isEqualTo(1);

        final Header idHeader = routedHeaders.iterator().next();
        Assertions.assertThat(idHeader.key()).isEqualTo("id");
        Assertions.assertThat(idHeader.value()).isEqualTo("1a8d6de63b7745ff8f4451db");
    }

    /**
     * Verifies that the types of the outbox document's fields carry over to the routed
     * record: an int event id ends up as the {@code id} header, a long becomes the key,
     * a binary payload yields a BYTES {@code payload} field and a boolean additional field
     * yields a BOOLEAN {@code bool} field. The topic is derived from {@code my_route_field}.
     *
     * <p>All string-to-bytes conversions name UTF-8 explicitly so the fixture and the
     * expected value do not depend on the platform default charset.
     */
    @Test
    public void canInfluenceDocumentFieldTypes() {
        final Map<String, String> config = new HashMap<>();
        config.put(MongoEventRouterConfigDefinition.FIELD_EVENT_ID.name(), "event_id");
        config.put(MongoEventRouterConfigDefinition.FIELD_EVENT_KEY.name(), "payload_id");
        config.put(MongoEventRouterConfigDefinition.FIELD_EVENT_TYPE.name(), "event_type");
        config.put(MongoEventRouterConfigDefinition.FIELD_PAYLOAD.name(), "payload_body");
        config.put(MongoEventRouterConfigDefinition.ROUTE_BY_FIELD.name(), "my_route_field");
        config.put(MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "some_boolean:envelope:bool");
        router.configure(config);

        final byte[] payloadBytes = "{}".getBytes(StandardCharsets.UTF_8);
        final Document outboxEvent = new Document()
                .append("event_id", 2)
                .append("payload_id", 1232L)
                .append("event_type", "CoolSchemaCreated".getBytes(StandardCharsets.UTF_8))
                .append("payload_body", payloadBytes)
                .append("my_route_field", "routename")
                .append("some_boolean", true);

        // Named differently from the COMPACT_JSON_SETTINGS field to avoid shadowing it:
        // STRICT mode with empty indent and newline characters.
        final JsonWriterSettings strictSingleLineSettings = JsonWriterSettings.builder()
                .outputMode(JsonMode.STRICT)
                .indent(true)
                .indentCharacters("")
                .newLineCharacters("")
                .build();
        final String afterJson = outboxEvent.toJson(strictSingleLineSettings);

        final Schema envelopeSchema = SchemaBuilder.struct()
                .name("event.Envelope")
                .field("after", Json.builder().optional().build())
                .field("patch", Json.builder().optional().build())
                .field("filter", Json.builder().optional().build())
                .field("updateDescription", MongoDbSchema.UPDATED_DESCRIPTION_SCHEMA)
                .field("op", Schema.OPTIONAL_STRING_SCHEMA)
                .field("ts_ms", Schema.OPTIONAL_INT64_SCHEMA)
                .field("transaction", TransactionMonitor.TRANSACTION_BLOCK_SCHEMA)
                .build();
        final Envelope envelope = Envelope.fromSchema(envelopeSchema);
        final Struct createPayload = envelope.create(afterJson, null, Instant.now());

        final SourceRecord eventRecord = new SourceRecord(
                new HashMap<>(),
                new HashMap<>(),
                "db.outbox",
                envelope.schema(),
                createPayload);
        final SourceRecord eventRouted = router.apply(eventRecord);

        Assertions.assertThat(eventRouted).isNotNull();
        Assertions.assertThat(eventRouted.topic()).isEqualTo("outbox.event.routename");

        final Schema valueSchema = eventRouted.valueSchema();
        Assertions.assertThat(valueSchema.field("payload").schema().type()).isEqualTo(SchemaBuilder.bytes().type());
        Assertions.assertThat(valueSchema.field("bool").schema().type()).isEqualTo(SchemaBuilder.bool().type());

        Assertions.assertThat(((Struct) eventRouted.value()).get("payload")).isEqualTo(payloadBytes);
        Assertions.assertThat(eventRouted.key()).isEqualTo(1232L);

        final Headers headers = eventRouted.headers();
        Assertions.assertThat(headers.size()).isEqualTo(1);

        final Header header = headers.iterator().next();
        Assertions.assertThat(header.key()).isEqualTo("id");
        Assertions.assertThat(header.value()).isEqualTo(2);
    }

    /**
     * With a schema-version field configured and an additional envelope field placed, the
     * routed value schema is expected to carry the event's version, and two events with
     * the same version are expected to share the same schema instance.
     */
    @Test
    public void canSetSchemaVersionWhenMoreThanPayloadIsInEnvelope() {
        final Map<String, String> config = new HashMap<>();
        config.put(MongoEventRouterConfigDefinition.FIELD_SCHEMA_VERSION.name(), "version");
        config.put(MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type:envelope:eventType");
        router.configure(config);

        // First event, version 1.
        final Map<String, Object> versionOne = new HashMap<>();
        versionOne.put("version", 1);
        final SourceRecord firstV1 = createEventRecord(
                "000000000000000000000000", "UserCreated", "420b186d", "User", new Document(), versionOne);
        final SourceRecord firstV1Routed = router.apply(firstV1);
        Assertions.assertThat(firstV1Routed.valueSchema().version()).isEqualTo(1);

        // Second event, version 3.
        final Map<String, Object> versionThree = new HashMap<>();
        versionThree.put("version", 3);
        final SourceRecord v3 = createEventRecord(
                "000000000000000000000001", "UserCreated", "420b186d", "User", new Document(), versionThree);
        final SourceRecord v3Routed = router.apply(v3);
        Assertions.assertThat(v3Routed.valueSchema().version()).isEqualTo(3);

        // Third event, version 1 again: the schema instance of the first event is reused.
        final SourceRecord secondV1 = createEventRecord(
                "000000000000000000000002", "UserCreated", "1b10b70b", "User", new Document(), versionOne);
        final SourceRecord secondV1Routed = router.apply(secondV1);
        Assertions.assertThat(secondV1Routed.valueSchema().version()).isEqualTo(1);

        Assertions.assertThat(firstV1Routed.valueSchema()).isSameAs(secondV1Routed.valueSchema());
    }

    /**
     * With a schema-version field configured but no additional envelope fields, the
     * routed value schema is expected to have no version, whatever version the event carries.
     */
    @Test
    public void shouldNotSetSchemaVersionByDefault() {
        final Map<String, String> config = new HashMap<>();
        config.put(MongoEventRouterConfigDefinition.FIELD_SCHEMA_VERSION.name(), "version");
        router.configure(config);

        // First event, version 1.
        final Map<String, Object> versionOne = new HashMap<>();
        versionOne.put("version", 1);
        final SourceRecord firstV1 = createEventRecord(
                "000000000000000000000000", "UserCreated", "420b186d", "User", new Document(), versionOne);
        final SourceRecord firstV1Routed = router.apply(firstV1);
        Assertions.assertThat(firstV1Routed.valueSchema().version()).isNull();

        // Second event, version 3.
        final Map<String, Object> versionThree = new HashMap<>();
        versionThree.put("version", 3);
        final SourceRecord v3 = createEventRecord(
                "000000000000000000000001", "UserCreated", "420b186d", "User", new Document(), versionThree);
        final SourceRecord v3Routed = router.apply(v3);
        Assertions.assertThat(v3Routed.valueSchema().version()).isNull();

        // Third event, version 1 again.
        final SourceRecord secondV1 = createEventRecord(
                "000000000000000000000002", "UserCreated", "1b10b70b", "User", new Document(), versionOne);
        final SourceRecord secondV1Routed = router.apply(secondV1);
        Assertions.assertThat(secondV1Routed.valueSchema().version()).isNull();
    }

    /**
     * With placement {@code type:envelope}, the routed value struct is expected to expose
     * a {@code type} field holding {@code "UserCreated"}.
     */
    @Test
    public void canSetPayloadTypeIntoTheEnvelope() {
        final Map<String, String> config = new HashMap<>();
        config.put(MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type:envelope");
        router.configure(config);

        final SourceRecord source = createEventRecord();
        final SourceRecord routed = router.apply(source);

        final Struct routedValue = (Struct) routed.value();
        Assertions.assertThat(routedValue.get("type")).isEqualTo("UserCreated");
    }

    /**
     * With placement {@code type:envelope:aggregateType}, the routed value struct is
     * expected to expose the type under the alias {@code aggregateType}.
     */
    @Test
    public void canSetPayloadTypeIntoTheEnvelopeWithAlias() {
        final Map<String, String> config = new HashMap<>();
        config.put(MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type:envelope:aggregateType");
        router.configure(config);

        final SourceRecord source = createEventRecord();
        final SourceRecord routed = router.apply(source);

        final Struct routedValue = (Struct) routed.value();
        Assertions.assertThat(routedValue.get("aggregateType")).isEqualTo("UserCreated");
    }

    /**
     * With several placements configured at once, the routed record is expected to carry
     * {@code payloadType} and {@code payloadId} in the value struct and {@code payloadType}
     * as a header.
     */
    @Test
    public void canSetMultipleFieldsIntoTheEnvelope() {
        final Map<String, String> config = new HashMap<>();
        config.put(
                MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(),
                "type:envelope:payloadType,aggregateid:envelope:payloadId,type:header:payloadType");
        router.configure(config);

        final SourceRecord source = createEventRecord();
        final SourceRecord routed = router.apply(source);

        final Struct routedValue = (Struct) routed.value();
        Assertions.assertThat(routedValue.get("payloadType")).isEqualTo("UserCreated");
        Assertions.assertThat(routedValue.get("payloadId")).isEqualTo("000000000000000000000001");
        Assertions.assertThat(routed.headers().lastWithName("payloadType").value()).isEqualTo("UserCreated");
    }

    /**
     * Configuring the router with the topic regex {@code " [[a-z]"} is expected to raise a
     * {@link ConfigException}.
     */
    @Test(expected = ConfigException.class)
    public void shouldFailOnInvalidConfigurationForTopicRegex() {
        final Map<String, String> invalidConfig = new HashMap<>();
        invalidConfig.put(MongoEventRouterConfigDefinition.ROUTE_TOPIC_REGEX.name(), " [[a-z]");

        router.configure(invalidConfig);
    }

    /**
     * Configuring the additional-fields placement as just {@code "type"} is expected to
     * raise a {@link ConfigException}.
     */
    @Test(expected = ConfigException.class)
    public void shouldFailOnInvalidConfigurationForAdditionalFields() {
        final Map<String, String> invalidConfig = new HashMap<>();
        invalidConfig.put(MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type");

        router.configure(invalidConfig);
    }

    /**
     * Configuring the additional-fields placement as an empty string is expected to raise
     * a {@link ConfigException}.
     */
    @Test(expected = ConfigException.class)
    public void shouldFailOnInvalidConfigurationForAdditionalFieldsEmpty() {
        final Map<String, String> invalidConfig = new HashMap<>();
        invalidConfig.put(MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "");

        router.configure(invalidConfig);
    }

    /**
     * Configuring the invalid-operation behavior as {@code "invalidOption"} is expected to
     * raise a {@link ConfigException}.
     */
    @Test(expected = ConfigException.class)
    public void shouldFailOnInvalidConfigurationForOperationBehavior() {
        final Map<String, String> invalidConfig = new HashMap<>();
        invalidConfig.put(MongoEventRouterConfigDefinition.OPERATION_INVALID_BEHAVIOR.name(), "invalidOption");

        router.configure(invalidConfig);
    }

    /**
     * When the event key is taken from the {@code type} field and that field holds a byte
     * array, the routed key is expected to be those bytes with a BYTES schema.
     */
    @Test
    public void canSetBinaryMessageKey() {
        final byte[] binaryEventType = "a UserCreated".getBytes(StandardCharsets.UTF_8);

        final Map<String, String> config = new HashMap<>();
        config.put(MongoEventRouterConfigDefinition.FIELD_EVENT_KEY.name(), "type");
        router.configure(config);

        final SourceRecord source = createEventRecord(
                "da8d6de63b7745ff8f4457db",
                binaryEventType,
                "Some other payload id",
                "User",
                new Document(),
                new HashMap<String, Object>());
        final SourceRecord routed = router.apply(source);

        Assertions.assertThat(routed).isNotNull();
        Assertions.assertThat(routed.keySchema().type()).isEqualTo(Schema.Type.BYTES);
        Assertions.assertThat(routed.key()).isEqualTo(binaryEventType);
    }

    /**
     * Runs the shared key round-trip check with a UTF-8 encoded byte-array key and the
     * BYTES schema type.
     */
    @Test
    public void canPassBinaryKey() {
        final byte[] binaryKey = "a binary key".getBytes(StandardCharsets.UTF_8);

        canPassKeyByType(SchemaBuilder.bytes(), binaryKey);
    }

    /**
     * Runs the shared key round-trip check with an {@code int} key and the INT32 schema type.
     */
    @Test
    public void canPassIntKey() {
        final int key = 54321;

        // Pass the local rather than repeating the literal, so the value is defined once.
        canPassKeyByType(SchemaBuilder.int32(), key);
    }

    /**
     * Shared check: routes an event whose payload id is {@code key} with the default
     * configuration and asserts that the routed record's key equals {@code key} and that
     * its key schema has the same type as {@code keyType}.
     *
     * @param keyType schema builder whose type the routed key schema must match
     * @param key     value used as the event's payload id and expected as the routed key
     */
    private void canPassKeyByType(SchemaBuilder keyType, Object key) {
        final Map<String, String> config = new HashMap<>();
        router.configure(config);

        final SourceRecord source = createEventRecord(
                "da8d6de63b7745ff8f4457db",
                "UserCreated",
                key,
                "User",
                new Document(),
                new HashMap<String, Object>());
        final SourceRecord routed = router.apply(source);

        Assertions.assertThat(routed).isNotNull();
        Assertions.assertThat(routed.keySchema().type()).isEqualTo(keyType.type());
        Assertions.assertThat(routed.key()).isEqualTo(key);
    }

    /**
     * A byte-array payload is expected to be routed as-is: the routed value equals the
     * original bytes with a BYTES schema, and the key is the string payload id with a
     * STRING schema.
     */
    @Test
    public void canPassBinaryMessage() {
        final byte[] value = "a binary message".getBytes(StandardCharsets.UTF_8);
        final String key = "a key";

        final Map<String, String> config = new HashMap<>();
        router.configure(config);

        // The key local is used for both the fixture and the expectation instead of repeated literals.
        final SourceRecord eventRecord = createEventRecord(
                "da8d6de63b7745ff8f4457db",
                "UserCreated",
                key,
                "User",
                value,
                new HashMap<String, Object>());
        final SourceRecord eventRouted = router.apply(eventRecord);

        Assertions.assertThat(eventRouted).isNotNull();
        Assertions.assertThat(eventRouted.keySchema().type()).isEqualTo(Schema.Type.STRING);
        Assertions.assertThat(eventRouted.key()).isEqualTo(key);
        Assertions.assertThat(eventRouted.valueSchema().type()).isEqualTo(Schema.Type.BYTES);
        Assertions.assertThat(eventRouted.value()).isEqualTo(value);
    }

    /**
     * With tombstone-on-empty-payload enabled, an event with a payload is expected to be
     * routed with the {@code deleted} envelope field set, and an event with a {@code null}
     * payload is expected to be routed as a valid tombstone with a {@code null} value.
     */
    @Test
    public void canMarkAnEventAsDeleted() {
        final Map<String, String> config = new HashMap<>();
        config.put(MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "is_deleted:envelope:deleted");
        config.put(MongoEventRouterConfigDefinition.ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD.name(), "true");
        router.configure(config);

        final Map<String, Object> extras = new HashMap<>();
        extras.put("is_deleted", true);

        // Event with an (empty) document payload.
        final SourceRecord withPayload = createEventRecord(
                "000000000000000000000000", "UserCreated", "10711fa5", "User", new Document(), extras);
        final SourceRecord withPayloadRouted = router.apply(withPayload);
        final Struct routedValue = (Struct) withPayloadRouted.value();
        Assertions.assertThat(routedValue).isNotNull();
        Assertions.assertThat(routedValue.get("deleted")).isEqualTo(true);

        // Event with a null payload.
        final SourceRecord withoutPayload = createEventRecord(
                "000000000000000000000000", "UserCreated", "10711fa5", "User", null, extras);
        final SourceRecord tombstoneRouted = router.apply(withoutPayload);
        final Struct tombstoneValue = (Struct) tombstoneRouted.value();
        Assertions.assertThat(tombstoneValue).isNull();
        VerifyRecord.isValidTombstone(tombstoneRouted);
    }

    /**
     * Without tombstone-on-empty-payload configured, an event with a {@code null} payload
     * is still expected to be routed with a non-null key, key schema, value and value
     * schema, and with the {@code deleted} envelope field set.
     */
    @Test
    public void noTombstoneIfNotConfigured() {
        final Map<String, String> config = new HashMap<>();
        config.put(MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "is_deleted:envelope:deleted");
        router.configure(config);

        final Map<String, Object> extras = new HashMap<>();
        extras.put("is_deleted", true);

        // Event with an (empty) document payload.
        final SourceRecord withPayload = createEventRecord(
                "000000000000000000000000", "UserCreated", "10711fa5", "User", new Document(), extras);
        final SourceRecord withPayloadRouted = router.apply(withPayload);
        final Struct routedValue = (Struct) withPayloadRouted.value();
        Assertions.assertThat(routedValue).isNotNull();
        Assertions.assertThat(routedValue.get("deleted")).isEqualTo(true);

        // Event with a null payload: no tombstone is produced.
        final SourceRecord withoutPayload = createEventRecord(
                "000000000000000000000001", "UserCreated", "10711fa5", "User", null, extras);
        final SourceRecord withoutPayloadRouted = router.apply(withoutPayload);
        final Struct emptyPayloadValue = (Struct) withoutPayloadRouted.value();
        Assertions.assertThat(withoutPayloadRouted.key()).isNotNull();
        Assertions.assertThat(withoutPayloadRouted.keySchema()).isNotNull();
        Assertions.assertThat(emptyPayloadValue).isNotNull();
        Assertions.assertThat(emptyPayloadValue.get("deleted")).isEqualTo(true);
        Assertions.assertThat(withoutPayloadRouted.valueSchema()).isNotNull();
    }

    /**
     * With {@code EXPAND_JSON_PAYLOAD} set to {@code "true"}, routes an event whose
     * payload document holds a string, a boolean, a double, a long and a list of
     * strings, then checks that the routed value is a struct with exactly those five
     * fields, each with the expected schema type name and the original value.
     */
    @Test
    public void canExpandJsonPayloadIfConfigured() {
        Map<String, String> config = new HashMap<>();
        config.put(MongoEventRouterConfigDefinition.EXPAND_JSON_PAYLOAD.name(), "true");
        this.router.configure(config);

        Document payload = new Document()
                .append("fullName", "John Doe")
                .append("enabled", true)
                .append("rating", 4.9)
                .append("age", 42L)
                .append("pets", Arrays.asList("dog", "cat"));
        SourceRecord eventRecord = this.createEventRecord("000000000000000000000000", "UserCreated", new ObjectId("000000000000000000000001"), "User", payload);

        // The router field is typed MongoEventRouter<SourceRecord>, so apply() accepts and
        // returns SourceRecord directly; no raw ConnectRecord cast is needed (or legal).
        SourceRecord eventRouted = this.router.apply(eventRecord);
        Assertions.assertThat(eventRouted).isNotNull();

        // Schema of the routed value: a struct with one field per payload entry.
        Schema valueSchema = eventRouted.valueSchema();
        Assertions.assertThat(valueSchema.type()).isEqualTo(SchemaBuilder.struct().type());
        Assertions.assertThat(valueSchema.fields().size()).isEqualTo(5);
        Assertions.assertThat(valueSchema.field("fullName").schema().type().getName()).isEqualTo("string");
        Assertions.assertThat(valueSchema.field("enabled").schema().type().getName()).isEqualTo("boolean");
        Assertions.assertThat(valueSchema.field("rating").schema().type().getName()).isEqualTo("float64");
        Assertions.assertThat(valueSchema.field("age").schema().type().getName()).isEqualTo("int64");
        Assertions.assertThat(valueSchema.field("pets").schema().type().getName()).isEqualTo("array");

        // Values of the routed struct must match what was put into the payload.
        Struct valueStruct = (Struct) eventRouted.value();
        Assertions.assertThat(valueStruct.get("fullName")).isEqualTo("John Doe");
        Assertions.assertThat(valueStruct.get("enabled")).isEqualTo(true);
        Assertions.assertThat(valueStruct.get("rating")).isEqualTo(4.9);
        Assertions.assertThat(valueStruct.get("age")).isEqualTo(42L);
        Assertions.assertThat(valueStruct.getArray("pets").size()).isEqualTo(2);
        Assertions.assertThat(valueStruct.getArray("pets").get(1)).isEqualTo("cat");
    }

    /**
     * With an empty router configuration, routes an event whose payload document
     * holds a string, a double and an int, then checks that the routed value keeps
     * the {@link Schema#OPTIONAL_STRING_SCHEMA} schema and that parsing that string
     * as a {@link Document} yields the original payload entries.
     */
    @Test
    public void shouldNotExpandJSONPayloadIfNotConfigured() {
        this.router.configure(new HashMap<String, String>());

        Document sentPayload = new Document()
                .append("fullName", "John Doe")
                .append("rating", 4.9)
                .append("age", 42);
        SourceRecord eventRecord = this.createEventRecord("000000000000000000000000", "UserCreated", new ObjectId("000000000000000000000001"), "User", sentPayload);

        // The router field is typed MongoEventRouter<SourceRecord>, so apply() accepts and
        // returns SourceRecord directly; no raw ConnectRecord cast is needed (or legal).
        SourceRecord eventRouted = this.router.apply(eventRecord);
        Assertions.assertThat(eventRouted).isNotNull();
        Assertions.assertThat(eventRouted.valueSchema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);

        // The value is a JSON string; parse it back to compare individual entries.
        Document payload = Document.parse((String) eventRouted.value());
        Assertions.assertThat(payload.get("fullName")).isEqualTo("John Doe");
        Assertions.assertThat(payload.get("rating")).isEqualTo(4.9);
        Assertions.assertThat(payload.get("age")).isEqualTo(42);
    }

    /**
     * Creates an event record with fixed sample values: event id
     * {@code da8d6de63b7745ff8f4457db}, type {@code UserCreated}, aggregate type
     * {@code User}, aggregate id {@code 000000000000000000000001} and an empty payload
     * document.
     *
     * @return the record produced by the five-argument {@code createEventRecord} overload
     */
    private SourceRecord createEventRecord() {
        ObjectId aggregateId = new ObjectId("000000000000000000000001");
        Document emptyPayload = new Document();
        return this.createEventRecord("da8d6de63b7745ff8f4457db", "UserCreated", aggregateId, "User", emptyPayload);
    }

    /**
     * Creates an event record without any extra top-level fields by delegating to the
     * six-argument overload with a fresh, empty map.
     *
     * @param eventId     hex string used to build the event's {@code _id}
     * @param eventType   value stored under {@code type}
     * @param payloadId   value stored under {@code aggregateid}
     * @param payloadType value stored under {@code aggregatetype}
     * @param payload     value stored under {@code payload}; may be {@code null}
     * @return the record produced by the six-argument overload
     */
    private SourceRecord createEventRecord(String eventId, String eventType, ObjectId payloadId, String payloadType, Object payload) {
        Map<String, Object> noExtraValues = new HashMap<String, Object>();
        return this.createEventRecord(eventId, eventType, payloadId, payloadType, payload, noExtraValues);
    }

    /**
     * Builds a {@link SourceRecord} for topic {@code db.outbox} whose value is an
     * {@code event.Envelope} struct obtained from {@link Envelope#create}, with the
     * outbox document serialized to JSON as the record passed to it.
     *
     * @param eventId     hex string converted to an {@link ObjectId} and stored under {@code _id}
     * @param eventType   value stored under {@code type}
     * @param payloadId   value stored under {@code aggregateid}
     * @param payloadType value stored under {@code aggregatetype}
     * @param payload     value stored under {@code payload}; may be {@code null}
     * @param extraValues additional entries appended to the outbox document after the
     *                    standard ones; must not be {@code null}
     * @return a record with empty source partition and offset maps, the envelope schema
     *         as value schema and the created envelope struct as value
     */
    private SourceRecord createEventRecord(String eventId, Object eventType, Object payloadId, Object payloadType, Object payload, Map<String, Object> extraValues) {
        // Assemble the outbox document field by field, standard fields first.
        Document outboxEvent = new Document();
        outboxEvent.append("_id", new ObjectId(eventId));
        outboxEvent.append("aggregatetype", payloadType);
        outboxEvent.append("aggregateid", payloadId);
        outboxEvent.append("type", eventType);
        outboxEvent.append("payload", payload);
        for (Map.Entry<String, Object> extra : extraValues.entrySet()) {
            outboxEvent.append(extra.getKey(), extra.getValue());
        }
        String afterJson = outboxEvent.toJson(this.COMPACT_JSON_SETTINGS);

        // Describe the envelope that wraps the serialized document.
        SchemaBuilder envelopeSchema = SchemaBuilder.struct().name("event.Envelope");
        envelopeSchema.field("after", Json.builder().optional().build());
        envelopeSchema.field("patch", Json.builder().optional().build());
        envelopeSchema.field("filter", Json.builder().optional().build());
        envelopeSchema.field("updateDescription", MongoDbSchema.UPDATED_DESCRIPTION_SCHEMA);
        envelopeSchema.field("op", Schema.OPTIONAL_STRING_SCHEMA);
        envelopeSchema.field("ts_ms", Schema.OPTIONAL_INT64_SCHEMA);
        envelopeSchema.field("transaction", TransactionMonitor.TRANSACTION_BLOCK_SCHEMA);
        Envelope envelope = Envelope.fromSchema(envelopeSchema.build());

        Struct body = envelope.create(afterJson, null, Instant.now());
        return new SourceRecord(new HashMap<String, Object>(), new HashMap<String, Object>(), "db.outbox", envelope.schema(), body);
    }
}

