/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb.transforms;

import io.debezium.connector.mongodb.transforms.AbstractExtractNewDocumentStateTestIT;
import io.debezium.data.Envelope;
import io.debezium.data.SchemaUtil;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Collect;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.Document;
import org.bson.RawBsonDocument;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.fest.assertions.Assertions;
import org.junit.Test;

public class ExtractNewDocumentStateTestIT
extends AbstractExtractNewDocumentStateTestIT {
    // Option keys passed to the transformation's configure(...) call in the tests below.

    // Same value as DROP_TOMBSTONE below; the two constants are duplicates of each other.
    private static final String CONFIG_DROP_TOMBSTONES = "drop.tombstones";
    // Delete handling mode; the visible tests set it to "none" or "rewrite".
    private static final String HANDLE_DELETES = "delete.handling.mode";
    // Not referenced in the visible part of this class.
    private static final String FLATTEN_STRUCT = "flatten.struct";
    // Not referenced in the visible part of this class.
    private static final String DELIMITER = "flatten.struct.delimiter";
    // When set to "true", the visible tests expect a "__op" header on the transformed record.
    private static final String OPERATION_HEADER = "operation.header";
    // Duplicate of CONFIG_DROP_TOMBSTONES (identical value).
    private static final String DROP_TOMBSTONE = "drop.tombstones";
    // Comma-separated source-block field names; the tests read them back from the value as "__<name>".
    private static final String ADD_SOURCE_FIELDS = "add.source.fields";
    // Not referenced in the visible part of this class.
    private static final String ADD_HEADERS = "add.headers";
    // Not referenced in the visible part of this class.
    private static final String ADD_FIELDS = "add.fields";

    /**
     * Names the MongoDB collection that every test in this class reads from and writes to.
     *
     * @return the fixed collection name {@code "functional"}
     */
    @Override
    protected String getCollectionName() {
        final String collectionName = "functional";
        return collectionName;
    }

    /**
     * Checks that, without any transformation options being set in this test, both the delete
     * event and the tombstone that follows it are dropped: the transformation returns
     * {@code null} for each of them.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for records
     */
    @Test
    @FixFor(value={"DBZ-563"})
    public void shouldDropTombstoneByDefault() throws InterruptedException {
        // Seed one document and drain its insert event so only delete-related records remain.
        this.primary().execute("insert", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne(Document.parse("{'_id': 1, 'dataStr': 'hello', 'dataInt': 123, 'dataLong': 80000000000}")));
        AbstractConnectorTest.SourceRecords insertRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat(insertRecords.recordsForTopic(this.topicName()).size()).isEqualTo(1);

        this.primary().execute("delete", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .deleteOne(RawBsonDocument.parse("{'_id' : 1}")));

        // The delete event itself must be swallowed by the transformation.
        SourceRecord deleteRecord = this.getRecordByOperation(Envelope.Operation.DELETE);
        SourceRecord transformedDelete = (SourceRecord) this.transformation.apply(deleteRecord);
        Assertions.assertThat(transformedDelete).isNull();

        // The connector still emits a tombstone, but the transformation must drop it as well.
        SourceRecord tombstoneRecord = this.getNextRecord();
        Assertions.assertThat(tombstoneRecord).isNotNull();
        SourceRecord transformedTombstone = (SourceRecord) this.transformation.apply(tombstoneRecord);
        Assertions.assertThat(transformedTombstone).isNull();
    }

    /**
     * Walks a single document through insert, {@code $set}, multi-field {@code $set},
     * {@code $unset}, a full-document update and finally a delete, checking the record the
     * transformation produces at every step.
     *
     * <p>The transformation is configured with {@code drop.tombstones=false} and
     * {@code delete.handling.mode=none}; the test then expects the delete event and the
     * tombstone to come out with a {@code null} value and matching keys.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for records
     * @throws IOException declared by the original signature
     */
    @Test
    public void shouldTransformEvents() throws InterruptedException, IOException {
        HashMap<String, String> transformationConfig = new HashMap<>();
        transformationConfig.put(CONFIG_DROP_TOMBSTONES, "false");
        transformationConfig.put(HANDLE_DELETES, "none");
        this.transformation.configure(transformationConfig);

        // ---- insert ----
        this.primary().execute("insert", client -> {
            long timestamp = ZonedDateTime.of(2020, 1, 28, 10, 0, 33, 0, ZoneId.of("UTC")).toEpochSecond();
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .insertOne(Document.parse("{  '_id': 1,   'dataStr': 'hello',   'dataInt': 123,   'dataLong': 80000000000,   'dataDate': ISODate(\"2020-01-27T10:47:12.311Z\"),   'dataTimestamp': Timestamp(" + timestamp + ", 1)}"));
        });
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        SourceRecord insertRecord = (SourceRecord) records.recordsForTopic(this.topicName()).get(0);
        SourceRecord transformedInsert = (SourceRecord) this.transformation.apply(insertRecord);
        Struct transformedInsertValue = (Struct) transformedInsert.value();
        Assertions.assertThat(transformedInsert.valueSchema().field("id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(transformedInsert.valueSchema().field("dataStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(transformedInsert.valueSchema().field("dataInt").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(transformedInsert.valueSchema().field("dataLong").schema()).isEqualTo(Schema.OPTIONAL_INT64_SCHEMA);
        Assertions.assertThat(transformedInsertValue.get("id")).isEqualTo(1);
        Assertions.assertThat(transformedInsertValue.get("dataStr")).isEqualTo("hello");
        Assertions.assertThat(transformedInsertValue.get("dataInt")).isEqualTo(123);
        Assertions.assertThat(transformedInsertValue.get("dataLong")).isEqualTo(80000000000L);
        Assertions.assertThat(transformedInsertValue.get("dataDate"))
                .isEqualTo(Date.from(Instant.from(ZonedDateTime.of(2020, 1, 27, 10, 47, 12, 311000000, ZoneId.of("UTC")))));
        Assertions.assertThat(transformedInsertValue.get("dataTimestamp"))
                .isEqualTo(Date.from(Instant.from(ZonedDateTime.of(2020, 1, 28, 10, 0, 33, 0, ZoneId.of("UTC")))));

        // ---- $set on a single field ----
        this.primary().execute("update", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .updateOne(RawBsonDocument.parse("{'_id' : 1}"), RawBsonDocument.parse("{'$set': {'dataStr': 'bye'}}")));
        records = this.consumeRecordsByTopic(1);
        SourceRecord candidateUpdateRecord = (SourceRecord) records.recordsForTopic(this.topicName()).get(0);
        // If the record just read is still a create event, read one more to reach the update.
        if (((Struct) candidateUpdateRecord.value()).get("op").equals("c")) {
            records = this.consumeRecordsByTopic(1);
        }
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        SourceRecord updateRecord = (SourceRecord) records.recordsForTopic(this.topicName()).get(0);
        SourceRecord transformedUpdate = (SourceRecord) this.transformation.apply(updateRecord);
        Struct transformedUpdateValue = (Struct) transformedUpdate.value();
        Assertions.assertThat(transformedUpdate.valueSchema().field("id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(transformedUpdate.valueSchema().field("dataStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(transformedUpdateValue.get("id")).isEqualTo(1);
        Assertions.assertThat(transformedUpdateValue.get("dataStr")).isEqualTo("bye");

        // ---- $set on several fields, one of them new ----
        this.primary().execute("update", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .updateOne(RawBsonDocument.parse("{'_id' : 1}"), RawBsonDocument.parse("{'$set': {'newStr': 'hello', 'dataInt': 456}}")));
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        SourceRecord updateMultipleRecord = (SourceRecord) records.recordsForTopic(this.topicName()).get(0);
        SourceRecord transformedMultipleUpdate = (SourceRecord) this.transformation.apply(updateMultipleRecord);
        Struct transformedMultipleUpdateValue = (Struct) transformedMultipleUpdate.value();
        Assertions.assertThat(transformedMultipleUpdate.valueSchema().field("id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(transformedMultipleUpdate.valueSchema().field("newStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(transformedMultipleUpdate.valueSchema().field("dataInt").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(transformedMultipleUpdateValue.get("id")).isEqualTo(1);
        Assertions.assertThat(transformedMultipleUpdateValue.get("newStr")).isEqualTo("hello");
        Assertions.assertThat(transformedMultipleUpdateValue.get("dataInt")).isEqualTo(456);

        // ---- $unset: the field stays in the schema but its value becomes null ----
        this.primary().execute("update", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .updateOne(RawBsonDocument.parse("{'_id' : 1}"), RawBsonDocument.parse("{'$unset': {'newStr': ''}}")));
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        SourceRecord updateUnsetRecord = (SourceRecord) records.recordsForTopic(this.topicName()).get(0);
        SourceRecord transformedUnsetUpdate = (SourceRecord) this.transformation.apply(updateUnsetRecord);
        Struct transformedUnsetUpdateValue = (Struct) transformedUnsetUpdate.value();
        Assertions.assertThat(transformedUnsetUpdate.valueSchema().field("id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(transformedUnsetUpdate.valueSchema().field("newStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(transformedUnsetUpdateValue.get("id")).isEqualTo(1);
        Assertions.assertThat(transformedUnsetUpdateValue.get("newStr")).isNull();

        // ---- full-document update (no update operator) ----
        this.primary().execute("update", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .updateOne(RawBsonDocument.parse("{'_id' : 1}"), RawBsonDocument.parse("{'dataStr': 'Hi again'}")));
        records = this.consumeRecordsByTopic(1);
        SourceRecord candidateFullUpdateRecord = (SourceRecord) records.recordsForTopic(this.topicName()).get(0);
        // Same guard as above: skip a create event if that is what was read.
        if (((Struct) candidateFullUpdateRecord.value()).get("op").equals("c")) {
            records = this.consumeRecordsByTopic(1);
        }
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        SourceRecord fullUpdateRecord = (SourceRecord) records.recordsForTopic(this.topicName()).get(0);
        SourceRecord transformedFullUpdate = (SourceRecord) this.transformation.apply(fullUpdateRecord);
        Struct transformedFullUpdateValue = (Struct) transformedFullUpdate.value();
        Assertions.assertThat(transformedFullUpdate.valueSchema().field("id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(transformedFullUpdate.valueSchema().field("dataStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(transformedFullUpdateValue.get("id")).isEqualTo(1);
        Assertions.assertThat(transformedFullUpdateValue.get("dataStr")).isEqualTo("Hi again");

        // ---- delete: expect the delete event followed by a tombstone ----
        this.primary().execute("delete", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .deleteOne(RawBsonDocument.parse("{'_id' : 1}")));
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(2);
        SourceRecord deleteRecord = (SourceRecord) records.recordsForTopic(this.topicName()).get(0);
        SourceRecord transformedDelete = (SourceRecord) this.transformation.apply(deleteRecord);
        Struct transformedDeleteValue = (Struct) transformedDelete.value();
        Assertions.assertThat(transformedDeleteValue).isNull();
        SourceRecord tombstoneRecord = (SourceRecord) records.recordsForTopic(this.topicName()).get(1);
        SourceRecord transformedTombstone = (SourceRecord) this.transformation.apply(tombstoneRecord);
        Assertions.assertThat(transformedTombstone.value()).isNull();
        // Delete and tombstone must carry the same key schema and key.
        Assertions.assertThat(SchemaUtil.asString(transformedDelete.keySchema()))
                .isEqualTo(SchemaUtil.asString(transformedTombstone.keySchema()));
        Assertions.assertThat(transformedDelete.key().toString()).isEqualTo(transformedTombstone.key().toString());
    }

    /**
     * Inserts a document whose {@code data} field is a DBRef-style sub-document
     * ({@code $ref}, {@code $id}, {@code $db}) and checks that, with field-name sanitizing
     * enabled, the transformed value exposes those members as {@code _ref}, {@code _id} and
     * {@code _db}.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for records
     * @throws IOException declared by the original signature
     */
    @Test
    @FixFor(value={"DBZ-1767"})
    public void shouldSupportDbRef() throws InterruptedException, IOException {
        HashMap<String, String> transformationConfig = new HashMap<>();
        transformationConfig.put("array.encoding", "array");
        transformationConfig.put(OPERATION_HEADER, "true");
        transformationConfig.put("sanitize.field.names", "true");
        this.transformation.configure(transformationConfig);

        this.primary().execute("insert", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne(Document.parse("{ '_id' : 2, 'data' : { '$ref' : 'a2', '$id' : 4, '$db' : 'b2' } }")));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);

        SourceRecord insertRecord = (SourceRecord) records.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord) this.transformation.apply(insertRecord);
        this.validate(transformed);

        // The '$'-prefixed DBRef member names are expected under '_'-prefixed names.
        Struct dbRef = ((Struct) transformed.value()).getStruct("data");
        Assertions.assertThat(dbRef.getString("_ref")).isEqualTo("a2");
        Assertions.assertThat(dbRef.getInt32("_id")).isEqualTo(4);
        Assertions.assertThat(dbRef.getString("_db")).isEqualTo("b2");
    }

    /**
     * Configures {@code add.source.fields} with a comma-separated list (one entry padded with
     * spaces) and checks that, for an update event, each listed member of the event's
     * {@code source} struct shows up in the transformed value under a {@code __}-prefixed name.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for records
     */
    @Test
    @FixFor(value={"DBZ-1442"})
    public void shouldAddSourceFields() throws InterruptedException {
        waitForStreamingRunning();
        HashMap<String, String> props = new HashMap<>();
        props.put(ADD_SOURCE_FIELDS, "h,ts_ms,ord , db,rs");
        this.transformation.configure(props);

        this.primary().execute("insert", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne(Document.parse("{ '_id' : 3, 'name' : 'Tim' }")));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        this.primary().execute("update", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .updateOne(RawBsonDocument.parse("{'_id' : 3}"), RawBsonDocument.parse("{'$set': {'name': 'Sally'}}")));
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        // Capture the untransformed source block so the added fields can be compared against it.
        SourceRecord updateRecord = (SourceRecord) records.allRecordsInOrder().get(0);
        Struct source = ((Struct) updateRecord.value()).getStruct("source");
        SourceRecord transformed = (SourceRecord) this.transformation.apply(updateRecord);
        this.validate(transformed);

        Struct value = (Struct) transformed.value();
        Assertions.assertThat(value.get("__h")).isEqualTo(source.getInt64("h"));
        Assertions.assertThat(value.get("__ts_ms")).isEqualTo(source.getInt64("ts_ms"));
        Assertions.assertThat(value.get("__ord")).isEqualTo(source.getInt32("ord"));
        Assertions.assertThat(value.get("__db")).isEqualTo(source.getString("db"));
        Assertions.assertThat(value.get("__rs")).isEqualTo(source.getString("rs"));
        Assertions.assertThat(value.get("__db")).isEqualTo("transform_operations");
        Assertions.assertThat(value.get("__rs")).isEqualTo("rs0");
    }

    /**
     * Same check as for updates, but for a delete event with delete handling set to
     * {@code rewrite}: the transformed delete record must still carry the configured
     * {@code source} members under {@code __}-prefixed names.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for records
     */
    @Test
    @FixFor(value={"DBZ-1442"})
    public void shouldAddSourceFieldsForRewriteDeleteEvent() throws InterruptedException {
        waitForStreamingRunning();
        HashMap<String, String> props = new HashMap<>();
        props.put(ADD_SOURCE_FIELDS, "h,ts_ms,ord,db,rs");
        props.put(HANDLE_DELETES, "rewrite");
        this.transformation.configure(props);

        this.primary().execute("insert", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne(Document.parse("{ '_id' : 4, 'name' : 'Sally' }")));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        this.primary().execute("delete", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .deleteOne(RawBsonDocument.parse("{ '_id' : 4 }")));
        // Two records are expected for the delete; only the first one is inspected below.
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(2);
        this.assertNoRecordsToConsume();

        SourceRecord deleteRecord = (SourceRecord) records.allRecordsInOrder().get(0);
        Struct source = ((Struct) deleteRecord.value()).getStruct("source");
        SourceRecord transformed = (SourceRecord) this.transformation.apply(deleteRecord);
        this.validate(transformed);

        Struct value = (Struct) transformed.value();
        Assertions.assertThat(value.get("__h")).isEqualTo(source.getInt64("h"));
        Assertions.assertThat(value.get("__ts_ms")).isEqualTo(source.getInt64("ts_ms"));
        Assertions.assertThat(value.get("__ord")).isEqualTo(source.getInt32("ord"));
        Assertions.assertThat(value.get("__db")).isEqualTo(source.getString("db"));
        Assertions.assertThat(value.get("__rs")).isEqualTo(source.getString("rs"));
        Assertions.assertThat(value.get("__db")).isEqualTo("transform_operations");
        Assertions.assertThat(value.get("__rs")).isEqualTo("rs0");
    }

    /**
     * Inserts a document with an {@link ObjectId} key plus string, long, boolean and
     * double-array fields, and checks the transformed record: the {@code __op} header carries
     * the create code, the key and value expose {@code id} as the ObjectId's string form, and
     * every field has the expected optional schema.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for records
     */
    @Test
    public void shouldTransformRecordForInsertEvent() throws InterruptedException {
        waitForStreamingRunning();
        HashMap<String, String> props = new HashMap<>();
        props.put(OPERATION_HEADER, "true");
        this.transformation.configure(props);

        ObjectId objId = new ObjectId();
        Document obj = new Document()
                .append("_id", objId)
                .append("name", "Sally")
                .append("phone", 123L)
                .append("active", true)
                .append("scores", Arrays.asList(1.2, 3.4, 5.6));
        this.primary().execute("insert", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne(obj));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        SourceRecord insertRecord = (SourceRecord) records.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord) this.transformation.apply(insertRecord);
        this.validate(transformed);

        // Operation header.
        Iterator<Header> operationHeader = transformed.headers().allWithName("__op");
        Assertions.assertThat(operationHeader.hasNext()).isTrue();
        Assertions.assertThat(operationHeader.next().value().toString()).isEqualTo(Envelope.Operation.CREATE.code());

        // Key.
        Struct key = (Struct) transformed.key();
        Struct value = (Struct) transformed.value();
        Assertions.assertThat(key.schema()).isSameAs(transformed.keySchema());
        Assertions.assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(key.get("id")).isEqualTo(objId.toString());

        // Value contents.
        Assertions.assertThat(value.schema().name()).isEqualTo("mongo.transform_operations." + this.getCollectionName());
        Assertions.assertThat(value.schema()).isSameAs(transformed.valueSchema());
        Assertions.assertThat(value.get("name")).isEqualTo("Sally");
        Assertions.assertThat(value.get("id")).isEqualTo(objId.toString());
        Assertions.assertThat(value.get("phone")).isEqualTo(123L);
        Assertions.assertThat(value.get("active")).isEqualTo(true);
        Assertions.assertThat(value.get("scores")).isEqualTo(Arrays.asList(1.2, 3.4, 5.6));

        // Value schema.
        Assertions.assertThat(value.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(value.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(value.schema().field("phone").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT64_SCHEMA);
        Assertions.assertThat(value.schema().field("active").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
        Assertions.assertThat(value.schema().field("scores").schema())
                .isEqualTo(SchemaBuilder.array(SchemaBuilder.OPTIONAL_FLOAT64_SCHEMA).optional().build());
        Assertions.assertThat(value.schema().fields()).hasSize(5);
    }

    /**
     * Inserts a document whose {@code _id} is itself a sub-document ({@code company},
     * {@code dept}) and checks that both the transformed key and value expose {@code id} as a
     * nested struct with matching member values and optional schemas.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for records
     */
    @Test
    public void shouldTransformRecordForInsertEventWithComplexIdType() throws InterruptedException {
        waitForStreamingRunning();
        Document compositeId = new Document()
                .append("company", 32)
                .append("dept", "home improvement");
        Document obj = new Document()
                .append("_id", compositeId)
                .append("name", "Sally");
        this.primary().execute("insert", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne(obj));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        SourceRecord insertRecord = (SourceRecord) records.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord) this.transformation.apply(insertRecord);
        this.validate(transformed);

        Struct key = (Struct) transformed.key();
        Struct value = (Struct) transformed.value();

        // Key: nested struct under "id".
        Assertions.assertThat(key.schema()).isSameAs(transformed.keySchema());
        Assertions.assertThat(key.schema().field("id").schema().field("company").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(key.schema().field("id").schema().field("dept").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(((Struct) key.get("id")).get("company")).isEqualTo(32);
        Assertions.assertThat(((Struct) key.get("id")).get("dept")).isEqualTo("home improvement");

        // Value: same nested id plus the "name" field.
        Assertions.assertThat(value.schema()).isSameAs(transformed.valueSchema());
        Assertions.assertThat(((Struct) value.get("id")).get("company")).isEqualTo(32);
        Assertions.assertThat(((Struct) value.get("id")).get("dept")).isEqualTo("home improvement");
        Assertions.assertThat(value.get("name")).isEqualTo("Sally");
        Assertions.assertThat(value.schema().field("id").schema().field("company").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(value.schema().field("id").schema().field("dept").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(value.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(value.schema().fields()).hasSize(2);
    }

    /**
     * Inserts a document, applies a {@code $set} update to its {@code name}, and checks the
     * transformed update record: the {@code __op} header carries the update code, and key and
     * value hold the ObjectId's string form as {@code id} together with the new name.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for records
     */
    @Test
    public void shouldGenerateRecordForUpdateEvent() throws InterruptedException {
        waitForStreamingRunning();
        HashMap<String, String> props = new HashMap<>();
        props.put(OPERATION_HEADER, "true");
        this.transformation.configure(props);

        ObjectId objId = new ObjectId();
        Document obj = new Document()
                .append("_id", objId)
                .append("name", "Tim");
        this.primary().execute("insert", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne(obj));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        Document updateObj = new Document().append("$set", new Document("name", "Sally"));
        this.primary().execute("update", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"), updateObj));
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        SourceRecord updateRecord = (SourceRecord) records.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord) this.transformation.apply(updateRecord);
        this.validate(transformed);

        // Operation header.
        Iterator<Header> operationHeader = transformed.headers().allWithName("__op");
        Assertions.assertThat(operationHeader.hasNext()).isTrue();
        Assertions.assertThat(operationHeader.next().value().toString()).isEqualTo(Envelope.Operation.UPDATE.code());

        // Key.
        Struct key = (Struct) transformed.key();
        Struct value = (Struct) transformed.value();
        Assertions.assertThat(key.schema()).isSameAs(transformed.keySchema());
        Assertions.assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(key.get("id")).isEqualTo(objId.toString());

        // Value.
        Assertions.assertThat(value.schema()).isSameAs(transformed.valueSchema());
        Assertions.assertThat(value.get("name")).isEqualTo("Sally");
        Assertions.assertThat(value.get("id")).isEqualTo(objId.toString());
        Assertions.assertThat(value.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(value.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(value.schema().fields()).hasSize(2);
    }

    /**
     * Applies an update that combines {@code $set} (on {@code name}) with {@code $unset}
     * (on {@code phone} and {@code active}) and checks the transformed value: the new name is
     * present, {@code phone} is {@code null} with an optional-string schema, and the value
     * schema has four fields.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for records
     */
    @Test
    @FixFor(value={"DBZ-612"})
    public void shouldGenerateRecordForUpdateEventWithUnset() throws InterruptedException {
        waitForStreamingRunning();
        ObjectId objId = new ObjectId();
        Document obj = new Document()
                .append("_id", objId)
                .append("name", "Tim")
                .append("phone", 123L)
                .append("active", false);
        this.primary().execute("insert", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne(obj));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        Document unsetFields = new Document()
                .append("phone", true)
                .append("active", false);
        Document updateObj = new Document()
                .append("$set", new Document("name", "Sally"))
                .append("$unset", unsetFields);
        this.primary().execute("update", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"), updateObj));
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        SourceRecord updateRecord = (SourceRecord) records.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord) this.transformation.apply(updateRecord);
        this.validate(transformed);

        Struct value = (Struct) transformed.value();
        Assertions.assertThat(value.schema()).isSameAs(transformed.valueSchema());
        Assertions.assertThat(value.get("name")).isEqualTo("Sally");
        // The unset field is still listed in the schema, but carries no value.
        Assertions.assertThat(value.get("phone")).isNull();
        Assertions.assertThat(value.schema().field("phone").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(value.schema().fields()).hasSize(4);
    }

    /**
     * Applies an update consisting solely of {@code $unset} (on {@code phone} and
     * {@code active}) and checks the transformed value: {@code phone} is {@code null} with an
     * optional-string schema, and the value schema has three fields.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for records
     */
    @Test
    @FixFor(value={"DBZ-612"})
    public void shouldGenerateRecordForUnsetOnlyUpdateEvent() throws InterruptedException {
        waitForStreamingRunning();
        ObjectId objId = new ObjectId();
        Document obj = new Document()
                .append("_id", objId)
                .append("name", "Sally")
                .append("phone", 123L)
                .append("active", false);
        this.primary().execute("insert", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne(obj));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        Document unsetFields = new Document()
                .append("phone", true)
                .append("active", false);
        Document updateObj = new Document().append("$unset", unsetFields);
        this.primary().execute("update", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"), updateObj));
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        SourceRecord updateRecord = (SourceRecord) records.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord) this.transformation.apply(updateRecord);
        this.validate(transformed);

        Struct value = (Struct) transformed.value();
        Assertions.assertThat(value.schema()).isSameAs(transformed.valueSchema());
        // The unset field is still listed in the schema, but carries no value.
        Assertions.assertThat(value.get("phone")).isNull();
        Assertions.assertThat(value.schema().field("phone").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(value.schema().fields()).hasSize(3);
    }

    /**
     * Restarts the connector via {@code restartConnectorWithoutEmittingTombstones()}, deletes a
     * previously inserted document, and checks that exactly one record is produced for the
     * delete. With delete handling set to {@code none}, the transformed record keeps the
     * ObjectId's string form as its key {@code id} and has a {@code null} value.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for records
     */
    @Test
    @FixFor(value={"DBZ-582"})
    public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws InterruptedException {
        this.restartConnectorWithoutEmittingTombstones();
        waitForStreamingRunning();
        HashMap<String, String> props = new HashMap<>();
        props.put(HANDLE_DELETES, "none");
        this.transformation.configure(props);

        ObjectId objId = new ObjectId();
        Document obj = new Document().append("_id", objId);
        this.primary().execute("insert", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne(obj));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        this.primary().execute("delete", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .deleteOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}")));
        // Only the delete event is expected here; no second (tombstone) record should follow.
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        SourceRecord deleteRecord = (SourceRecord) records.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord) this.transformation.apply(deleteRecord);
        this.validate(transformed);

        Struct key = (Struct) transformed.key();
        Struct value = (Struct) transformed.value();
        Assertions.assertThat(key.schema()).isSameAs(transformed.keySchema());
        Assertions.assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(key.get("id")).isEqualTo(objId.toString());
        Assertions.assertThat(value).isNull();
    }

    /**
     * Checks that the operation header is attached to the second record emitted for a
     * delete when the transformation runs with {@code operation.header=true} and
     * {@code drop.tombstones=false}.
     *
     * <p>A document is inserted and deleted; two records are expected after the delete
     * and the one at index 1 is transformed (presumably the tombstone that follows the
     * delete event). The result must keep the document id in its key, carry a
     * {@code __op} header equal to the delete operation code, and have a {@code null}
     * value.
     *
     * @throws InterruptedException propagated from the connector test helpers
     */
    @Test
    @FixFor(value={"DBZ-1032"})
    public void shouldGenerateRecordHeaderForTombstone() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> props = new HashMap<String, String>();
        props.put(OPERATION_HEADER, "true");
        // Use the shared option-key constant instead of repeating the literal.
        props.put(DROP_TOMBSTONE, "false");
        this.transformation.configure(props);

        ObjectId objId = new ObjectId();
        Document obj = new Document().append("_id", (Object)objId);
        this.primary().execute("insert", client -> client.getDatabase("transform_operations").getCollection(this.getCollectionName()).insertOne((Object)obj));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        this.primary().execute("delete", client -> client.getDatabase("transform_operations").getCollection(this.getCollectionName()).deleteOne((Bson)RawBsonDocument.parse((String)("{ '_id' : { '$oid' : '" + objId + "'}}"))));
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((int)records.recordsForTopic(this.topicName()).size()).isEqualTo(2);
        this.assertNoRecordsToConsume();

        // Index 1 is the record that follows the delete event.
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)records.allRecordsInOrder().get(1));
        this.validate(transformed);

        Struct key = (Struct)transformed.key();
        Struct value = (Struct)transformed.value();
        Assertions.assertThat((Object)key.schema()).isSameAs((Object)transformed.keySchema());
        Assertions.assertThat((Object)key.schema().field("id").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)key.get("id")).isEqualTo((Object)objId.toString());

        // Typed iterator: no raw type and no element cast needed.
        Iterator<Header> operationHeader = transformed.headers().allWithName("__op");
        Assertions.assertThat((boolean)operationHeader.hasNext()).isTrue();
        Assertions.assertThat((String)operationHeader.next().value().toString()).isEqualTo((Object)Envelope.Operation.DELETE.code());
        Assertions.assertThat((Object)value).isNull();
    }

    /**
     * Checks that, without this test setting any transformation option, applying the
     * transformation to the record produced by a delete returns {@code null}.
     *
     * <p>The connector is first restarted through
     * {@code restartConnectorWithoutEmittingTombstones()}, so a single record is
     * expected after the delete.
     *
     * @throws InterruptedException propagated from the connector test helpers
     */
    @Test
    @FixFor(value={"DBZ-583"})
    public void shouldDropDeleteMessagesByDefault() throws InterruptedException {
        this.restartConnectorWithoutEmittingTombstones();
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        // Insert a document and drain the single record produced for it.
        ObjectId documentId = new ObjectId();
        Document document = new Document().append("_id", (Object)documentId);
        this.primary().execute("insert", mongo -> mongo
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne((Object)document));
        AbstractConnectorTest.SourceRecords insertRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)insertRecords.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        // Delete the same document and pick up the resulting record.
        String idFilter = "{ '_id' : { '$oid' : '" + documentId + "'}}";
        this.primary().execute("delete", mongo -> mongo
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .deleteOne((Bson)RawBsonDocument.parse((String)idFilter)));
        AbstractConnectorTest.SourceRecords deleteRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)deleteRecords.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        SourceRecord deleteEvent = (SourceRecord)deleteRecords.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)deleteEvent);
        Assertions.assertThat((Object)transformed).isNull();
    }

    /**
     * Checks that with {@code delete.handling.mode=rewrite} the record produced by a
     * delete is transformed into a record whose value has a {@code __deleted} field
     * (optional boolean schema) set to {@code true}, while the key still holds the
     * document id.
     *
     * @throws InterruptedException propagated from the connector test helpers
     */
    @Test
    @FixFor(value={"DBZ-583"})
    public void shouldRewriteDeleteMessage() throws InterruptedException {
        this.restartConnectorWithoutEmittingTombstones();
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> smtConfig = new HashMap<>();
        smtConfig.put(HANDLE_DELETES, "rewrite");
        this.transformation.configure(smtConfig);

        // Insert a document and drain the single record produced for it.
        ObjectId documentId = new ObjectId();
        Document document = new Document().append("_id", (Object)documentId);
        this.primary().execute("insert", mongo -> mongo
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne((Object)document));
        AbstractConnectorTest.SourceRecords insertRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)insertRecords.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        // Delete the same document and pick up the resulting record.
        String idFilter = "{ '_id' : { '$oid' : '" + documentId + "'}}";
        this.primary().execute("delete", mongo -> mongo
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .deleteOne((Bson)RawBsonDocument.parse((String)idFilter)));
        AbstractConnectorTest.SourceRecords deleteRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)deleteRecords.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        SourceRecord deleteEvent = (SourceRecord)deleteRecords.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)deleteEvent);

        Struct transformedKey = (Struct)transformed.key();
        Struct transformedValue = (Struct)transformed.value();
        Assertions.assertThat((Object)transformedKey.schema()).isSameAs((Object)transformed.keySchema());
        Assertions.assertThat((Object)transformedKey.schema().field("id").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)transformedKey.get("id")).isEqualTo((Object)documentId.toString());
        Assertions.assertThat((Object)transformedValue.schema().field("__deleted").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
        Assertions.assertThat((Object)transformedValue.get("__deleted")).isEqualTo((Object)true);
    }

    /**
     * Checks that with {@code delete.handling.mode=rewrite} the record produced by an
     * update also gets a {@code __deleted} field (optional boolean schema), set to
     * {@code false}.
     *
     * @throws InterruptedException propagated from the connector test helpers
     */
    @Test
    @FixFor(value={"DBZ-583"})
    public void shouldRewriteMessagesWhichAreNotDeletes() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> smtConfig = new HashMap<>();
        smtConfig.put(HANDLE_DELETES, "rewrite");
        this.transformation.configure(smtConfig);

        // Insert a document and drain the single record produced for it.
        ObjectId documentId = new ObjectId();
        Document document = new Document().append("_id", (Object)documentId).append("name", (Object)"Tim");
        this.primary().execute("insert", mongo -> mongo
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne((Object)document));
        AbstractConnectorTest.SourceRecords insertRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)insertRecords.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        // Change the name with a $set update and pick up the resulting record.
        Document newName = new Document("name", (Object)"Sally");
        Document setName = new Document().append("$set", (Object)newName);
        String idFilter = "{ '_id' : { '$oid' : '" + documentId + "'}}";
        this.primary().execute("update", mongo -> mongo
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .updateOne((Bson)RawBsonDocument.parse((String)idFilter), (Bson)setName));
        AbstractConnectorTest.SourceRecords updateRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)updateRecords.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        SourceRecord updateEvent = (SourceRecord)updateRecords.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)updateEvent);

        Struct transformedValue = (Struct)transformed.value();
        Assertions.assertThat((Object)transformedValue.schema().field("__deleted").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
        Assertions.assertThat((Object)transformedValue.get("__deleted")).isEqualTo((Object)false);
    }

    /**
     * Checks the transformed form of a delete event when the transformation runs with
     * {@code operation.header=true} and {@code delete.handling.mode=none}.
     *
     * <p>A document is inserted and deleted; two records are expected after the delete
     * and the one at index 0 is transformed. The result must carry a {@code __op}
     * header equal to the delete operation code, keep the document id in its key, and
     * have a {@code null} value.
     *
     * @throws InterruptedException propagated from the connector test helpers
     */
    @Test
    public void shouldGenerateRecordForDeleteEvent() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> props = new HashMap<String, String>();
        props.put(OPERATION_HEADER, "true");
        props.put(HANDLE_DELETES, "none");
        this.transformation.configure(props);

        ObjectId objId = new ObjectId();
        Document obj = new Document().append("_id", (Object)objId);
        this.primary().execute("insert", client -> client.getDatabase("transform_operations").getCollection(this.getCollectionName()).insertOne((Object)obj));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        this.primary().execute("delete", client -> client.getDatabase("transform_operations").getCollection(this.getCollectionName()).deleteOne((Bson)RawBsonDocument.parse((String)("{ '_id' : { '$oid' : '" + objId + "'}}"))));
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((int)records.recordsForTopic(this.topicName()).size()).isEqualTo(2);
        this.assertNoRecordsToConsume();

        // Index 0 is the first of the two records emitted for the delete.
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)records.allRecordsInOrder().get(0));

        // Typed iterator: no raw type and no element cast needed.
        Iterator<Header> operationHeader = transformed.headers().allWithName("__op");
        Assertions.assertThat((boolean)operationHeader.hasNext()).isTrue();
        Assertions.assertThat((String)operationHeader.next().value().toString()).isEqualTo((Object)Envelope.Operation.DELETE.code());

        Struct key = (Struct)transformed.key();
        Struct value = (Struct)transformed.value();
        Assertions.assertThat((Object)key.schema()).isSameAs((Object)transformed.keySchema());
        Assertions.assertThat((Object)key.schema().field("id").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)key.get("id")).isEqualTo((Object)objId.toString());
        Assertions.assertThat((Object)value).isNull();
    }

    /**
     * Checks that a header already present on the incoming record is still present,
     * with the same value, on the transformed record.
     *
     * <p>A document is inserted and updated; a string header is added to the record
     * produced by the update before the transformation is applied. The transformed
     * record must have exactly one header, and it must be that header.
     *
     * @throws InterruptedException propagated from the connector test helpers
     */
    @Test
    @FixFor(value={"DBZ-971"})
    public void shouldPropagatePreviousRecordHeaders() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        ObjectId objId = new ObjectId();
        Document obj = new Document().append("_id", (Object)objId).append("name", (Object)"Tim");
        this.primary().execute("insert", client -> client.getDatabase("transform_operations").getCollection(this.getCollectionName()).insertOne((Object)obj));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        Document updateObj = new Document().append("$set", (Object)new Document("name", (Object)"Sally"));
        this.primary().execute("update", client -> client.getDatabase("transform_operations").getCollection(this.getCollectionName()).updateOne((Bson)RawBsonDocument.parse((String)("{ '_id' : { '$oid' : '" + objId + "'}}")), (Bson)updateObj));
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        // Attach a custom header to the update record before transforming it.
        SourceRecord record = (SourceRecord)records.allRecordsInOrder().get(0);
        record.headers().addString("application/debezium-test-header", "shouldPropagatePreviousRecordHeaders");
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)record);

        Assertions.assertThat((Iterable)transformed.headers()).hasSize(1);
        // Typed iterator: no raw type and no element cast needed.
        Iterator<Header> headers = transformed.headers().allWithName("application/debezium-test-header");
        Assertions.assertThat((boolean)headers.hasNext()).isTrue();
        Assertions.assertThat((String)headers.next().value().toString()).isEqualTo((Object)"shouldPropagatePreviousRecordHeaders");
    }

    /**
     * Checks that, when this test sets no transformation option, a nested
     * {@code address} sub-document of an inserted document is kept as a nested struct
     * in the transformed value.
     *
     * <p>The transformed value must have exactly three fields: {@code id},
     * {@code name} and the {@code address} struct with {@code street} and
     * {@code zipcode}.
     *
     * @throws InterruptedException propagated from the connector test helpers
     */
    @Test
    public void shouldNotFlattenTransformRecordForInsertEvent() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        // Insert a document with a nested address and pick up the resulting record.
        ObjectId documentId = new ObjectId();
        Document address = new Document().append("street", (Object)"Morris Park Ave").append("zipcode", (Object)"10462");
        Document document = new Document().append("_id", (Object)documentId).append("name", (Object)"Sally").append("address", (Object)address);
        this.primary().execute("insert", mongo -> mongo
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne((Object)document));
        AbstractConnectorTest.SourceRecords insertRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)insertRecords.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        SourceRecord insertEvent = (SourceRecord)insertRecords.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)insertEvent);

        Struct transformedKey = (Struct)transformed.key();
        Struct transformedValue = (Struct)transformed.value();

        // Key: the document id as an optional string.
        Assertions.assertThat((Object)transformedKey.schema()).isSameAs((Object)transformed.keySchema());
        Assertions.assertThat((Object)transformedKey.schema().field("id").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)transformedKey.get("id")).isEqualTo((Object)documentId.toString());

        // Value contents, including the nested address struct.
        Assertions.assertThat((Object)transformedValue.schema()).isSameAs((Object)transformed.valueSchema());
        Assertions.assertThat((Object)transformedValue.get("name")).isEqualTo((Object)"Sally");
        Assertions.assertThat((Object)transformedValue.get("id")).isEqualTo((Object)documentId.toString());
        Object actualAddress = transformedValue.get("address");
        Struct expectedAddress = new Struct(transformedValue.schema().field("address").schema()).put("street", (Object)"Morris Park Ave").put("zipcode", (Object)"10462");
        Assertions.assertThat(actualAddress).isEqualTo((Object)expectedAddress);

        // Value schema, field by field.
        Assertions.assertThat((Object)transformedValue.schema().field("id").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)transformedValue.schema().field("name").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Schema actualAddressSchema = transformedValue.schema().field("address").schema();
        Schema expectedAddressSchema = SchemaBuilder.struct().name("mongo.transform_operations." + this.getCollectionName() + ".address").optional().field("street", Schema.OPTIONAL_STRING_SCHEMA).field("zipcode", Schema.OPTIONAL_STRING_SCHEMA).build();
        Assertions.assertThat((Object)actualAddressSchema).isEqualTo((Object)expectedAddressSchema);
        Assertions.assertThat((List)transformedValue.schema().fields()).hasSize(3);
    }

    /**
     * Checks that with {@code flatten.struct=true} the nested {@code address}
     * sub-document of an inserted document is flattened into the top-level fields
     * {@code address_street} and {@code address_zipcode}.
     *
     * <p>The transformed value must have exactly four optional-string fields:
     * {@code id}, {@code name}, {@code address_street} and {@code address_zipcode}.
     *
     * @throws InterruptedException propagated from the connector test helpers
     */
    @Test
    public void shouldFlattenTransformRecordForInsertEvent() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> smtConfig = new HashMap<>();
        smtConfig.put(FLATTEN_STRUCT, "true");
        this.transformation.configure(smtConfig);

        // Insert a document with a nested address and pick up the resulting record.
        ObjectId documentId = new ObjectId();
        Document address = new Document().append("street", (Object)"Morris Park Ave").append("zipcode", (Object)"10462");
        Document document = new Document().append("_id", (Object)documentId).append("name", (Object)"Sally").append("address", (Object)address);
        this.primary().execute("insert", mongo -> mongo
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne((Object)document));
        AbstractConnectorTest.SourceRecords insertRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)insertRecords.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        SourceRecord insertEvent = (SourceRecord)insertRecords.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)insertEvent);

        Struct transformedKey = (Struct)transformed.key();
        Struct transformedValue = (Struct)transformed.value();

        // Key: the document id as an optional string.
        Assertions.assertThat((Object)transformedKey.schema()).isSameAs((Object)transformed.keySchema());
        Assertions.assertThat((Object)transformedKey.schema().field("id").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)transformedKey.get("id")).isEqualTo((Object)documentId.toString());

        // Value contents: address members appear as underscore-joined top-level fields.
        Assertions.assertThat((Object)transformedValue.schema()).isSameAs((Object)transformed.valueSchema());
        Assertions.assertThat((Object)transformedValue.get("name")).isEqualTo((Object)"Sally");
        Assertions.assertThat((Object)transformedValue.get("id")).isEqualTo((Object)documentId.toString());
        Assertions.assertThat((Object)transformedValue.get("address_street")).isEqualTo((Object)"Morris Park Ave");
        Assertions.assertThat((Object)transformedValue.get("address_zipcode")).isEqualTo((Object)"10462");

        // Value schema, field by field.
        Assertions.assertThat((Object)transformedValue.schema().field("id").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)transformedValue.schema().field("name").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)transformedValue.schema().field("address_street").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)transformedValue.schema().field("address_zipcode").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((List)transformedValue.schema().fields()).hasSize(4);
    }

    /**
     * Checks that with {@code flatten.struct=true} and
     * {@code flatten.struct.delimiter=-} the nested {@code address} sub-document of an
     * inserted document is flattened into {@code address-street} and
     * {@code address-zipcode}.
     *
     * <p>The transformed value must have exactly four optional-string fields:
     * {@code id}, {@code name}, {@code address-street} and {@code address-zipcode}.
     *
     * @throws InterruptedException propagated from the connector test helpers
     */
    @Test
    public void shouldFlattenWithDelimiterTransformRecordForInsertEvent() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> smtConfig = new HashMap<>();
        smtConfig.put(FLATTEN_STRUCT, "true");
        smtConfig.put(DELIMITER, "-");
        this.transformation.configure(smtConfig);

        // Insert a document with a nested address and pick up the resulting record.
        ObjectId documentId = new ObjectId();
        Document address = new Document().append("street", (Object)"Morris Park Ave").append("zipcode", (Object)"10462");
        Document document = new Document().append("_id", (Object)documentId).append("name", (Object)"Sally").append("address", (Object)address);
        this.primary().execute("insert", mongo -> mongo
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne((Object)document));
        AbstractConnectorTest.SourceRecords insertRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)insertRecords.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        SourceRecord insertEvent = (SourceRecord)insertRecords.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)insertEvent);

        Struct transformedKey = (Struct)transformed.key();
        Struct transformedValue = (Struct)transformed.value();

        // Key: the document id as an optional string.
        Assertions.assertThat((Object)transformedKey.schema()).isSameAs((Object)transformed.keySchema());
        Assertions.assertThat((Object)transformedKey.schema().field("id").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)transformedKey.get("id")).isEqualTo((Object)documentId.toString());

        // Value contents: address members appear as dash-joined top-level fields.
        Assertions.assertThat((Object)transformedValue.schema()).isSameAs((Object)transformed.valueSchema());
        Assertions.assertThat((Object)transformedValue.get("name")).isEqualTo((Object)"Sally");
        Assertions.assertThat((Object)transformedValue.get("id")).isEqualTo((Object)documentId.toString());
        Assertions.assertThat((Object)transformedValue.get("address-street")).isEqualTo((Object)"Morris Park Ave");
        Assertions.assertThat((Object)transformedValue.get("address-zipcode")).isEqualTo((Object)"10462");

        // Value schema, field by field.
        Assertions.assertThat((Object)transformedValue.schema().field("id").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)transformedValue.schema().field("name").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)transformedValue.schema().field("address-street").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)transformedValue.schema().field("address-zipcode").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((List)transformedValue.schema().fields()).hasSize(4);
    }

    /**
     * Checks that with {@code flatten.struct=true} and
     * {@code flatten.struct.delimiter=-} the dotted paths set by a {@code $set} update
     * show up in the transformed value as dash-joined field names.
     *
     * <p>The update sets {@code address.city}, {@code address.name} and
     * {@code address.city2.part}; the transformed value must have exactly four fields:
     * {@code id}, {@code address-city}, {@code address-name} (optional strings) and
     * {@code address-city2-part} (optional int32).
     *
     * @throws InterruptedException propagated from the connector test helpers
     */
    @Test
    public void shouldFlattenWithDelimiterTransformRecordForUpdateEvent() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> smtConfig = new HashMap<>();
        smtConfig.put(FLATTEN_STRUCT, "true");
        smtConfig.put(DELIMITER, "-");
        this.transformation.configure(smtConfig);

        // Insert a document with a nested address and drain the record produced for it.
        ObjectId documentId = new ObjectId();
        Document address = new Document().append("street", (Object)"Morris Park Ave").append("zipcode", (Object)"10462");
        Document document = new Document().append("_id", (Object)documentId).append("name", (Object)"Sally").append("address", (Object)address);
        this.primary().execute("insert", mongo -> mongo
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne((Object)document));
        AbstractConnectorTest.SourceRecords insertRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)insertRecords.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        // Set three nested paths and pick up the record produced by the update.
        Document setNestedFields = new Document().append("$set", (Object)new Document(Collect.hashMapOf((Object)"address.city", (Object)"Canberra", (Object)"address.name", (Object)"James", (Object)"address.city2.part", (Object)3)));
        String idFilter = "{ '_id' : { '$oid' : '" + documentId + "'}}";
        this.primary().execute("update", mongo -> mongo
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .updateOne((Bson)RawBsonDocument.parse((String)idFilter), (Bson)setNestedFields));
        AbstractConnectorTest.SourceRecords updateRecords = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)updateRecords.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        SourceRecord updateEvent = (SourceRecord)updateRecords.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)updateEvent);

        Struct transformedKey = (Struct)transformed.key();
        Struct transformedValue = (Struct)transformed.value();

        // Key: the document id as an optional string.
        Assertions.assertThat((Object)transformedKey.schema()).isSameAs((Object)transformed.keySchema());
        Assertions.assertThat((Object)transformedKey.schema().field("id").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)transformedKey.get("id")).isEqualTo((Object)documentId.toString());

        // Value contents: only the id and the fields touched by the update.
        Assertions.assertThat((Object)transformedValue.schema()).isSameAs((Object)transformed.valueSchema());
        Assertions.assertThat((Object)transformedValue.get("id")).isEqualTo((Object)documentId.toString());
        Assertions.assertThat((Object)transformedValue.get("address-city")).isEqualTo((Object)"Canberra");
        Assertions.assertThat((Object)transformedValue.get("address-name")).isEqualTo((Object)"James");
        Assertions.assertThat((Object)transformedValue.get("address-city2-part")).isEqualTo((Object)3);

        // Value schema, field by field.
        Assertions.assertThat((Object)transformedValue.schema().field("id").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)transformedValue.schema().field("address-city").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)transformedValue.schema().field("address-name").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)transformedValue.schema().field("address-city2-part").schema()).isEqualTo((Object)SchemaBuilder.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat((List)transformedValue.schema().fields()).hasSize(4);
    }

    /**
     * Checks that {@code add.headers=op} yields exactly one header on the transformed
     * record, named {@code __op} and equal to the create operation code.
     *
     * <p>The input is the record returned by {@code createCreateRecord()}.
     *
     * @throws Exception propagated from the test helpers
     */
    @Test
    @FixFor(value={"DBZ-1791"})
    public void testAddHeader() throws Exception {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> smtConfig = new HashMap<>();
        smtConfig.put(ADD_HEADERS, "op");
        this.transformation.configure(smtConfig);

        SourceRecord created = this.createCreateRecord();
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)created);

        Assertions.assertThat((Iterable)transformed.headers()).hasSize(1);
        String opHeader = (String)this.getSourceRecordHeaderByKey(transformed, "__op");
        Assertions.assertThat(opHeader).isEqualTo((Object)Envelope.Operation.CREATE.code());
    }

    /**
     * Checks that {@code add.headers=op,id} yields two headers on the transformed
     * record: {@code __op} equal to the create operation code, and {@code __id} whose
     * value is {@code null}.
     *
     * <p>The input is the record returned by {@code createCreateRecord()}.
     *
     * @throws Exception propagated from the test helpers
     */
    @Test
    @FixFor(value={"DBZ-1791"})
    public void testAddHeadersForMissingOrInvalidFields() throws Exception {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> smtConfig = new HashMap<>();
        smtConfig.put(ADD_HEADERS, "op,id");
        this.transformation.configure(smtConfig);

        SourceRecord created = this.createCreateRecord();
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)created);

        Assertions.assertThat((Iterable)transformed.headers()).hasSize(2);
        String opHeader = (String)this.getSourceRecordHeaderByKey(transformed, "__op");
        Assertions.assertThat(opHeader).isEqualTo((Object)Envelope.Operation.CREATE.code());
        String idHeader = (String)this.getSourceRecordHeaderByKey(transformed, "__id");
        Assertions.assertThat(idHeader).isNull();
    }

    /**
     * Checks that {@code add.headers=op,source.rs,source.collection} yields three
     * headers on the transformed record: {@code __op} (create operation code),
     * {@code __source_rs} ({@code "rs0"}) and {@code __source_collection} (the
     * collection name used by this test).
     *
     * <p>The input is the record returned by {@code createCreateRecord()}.
     *
     * @throws Exception propagated from the test helpers
     */
    @Test
    @FixFor(value={"DBZ-1791"})
    public void testAddHeadersSpecifyingStruct() throws Exception {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> smtConfig = new HashMap<>();
        smtConfig.put(ADD_HEADERS, "op,source.rs,source.collection");
        this.transformation.configure(smtConfig);

        SourceRecord created = this.createCreateRecord();
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)created);

        Assertions.assertThat((Iterable)transformed.headers()).hasSize(3);
        String opHeader = (String)this.getSourceRecordHeaderByKey(transformed, "__op");
        Assertions.assertThat(opHeader).isEqualTo((Object)Envelope.Operation.CREATE.code());
        String replicaSetHeader = (String)this.getSourceRecordHeaderByKey(transformed, "__source_rs");
        Assertions.assertThat(replicaSetHeader).isEqualTo((Object)"rs0");
        String collectionHeader = (String)this.getSourceRecordHeaderByKey(transformed, "__source_collection");
        Assertions.assertThat(collectionHeader).isEqualTo((Object)this.getCollectionName());
    }

    /**
     * Checks that {@code add.fields=op} adds an {@code __op} field, equal to the
     * create operation code, to the transformed value.
     *
     * <p>The input is the record returned by {@code createCreateRecord()}.
     *
     * @throws Exception propagated from the test helpers
     */
    @Test
    @FixFor(value={"DBZ-1791"})
    public void testAddField() throws Exception {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> smtConfig = new HashMap<>();
        smtConfig.put(ADD_FIELDS, "op");
        this.transformation.configure(smtConfig);

        SourceRecord created = this.createCreateRecord();
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)created);

        Struct transformedValue = (Struct)transformed.value();
        Assertions.assertThat((Object)transformedValue.get("__op")).isEqualTo((Object)Envelope.Operation.CREATE.code());
    }

    /**
     * Checks that {@code add.fields} given as {@code "op , ts_ms"} (with spaces around
     * the comma) adds both an {@code __op} field equal to the create operation code
     * and a non-null {@code __ts_ms} field to the transformed value.
     *
     * <p>The input is the record returned by {@code createCreateRecord()}.
     *
     * @throws Exception propagated from the test helpers
     */
    @Test
    @FixFor(value={"DBZ-1791"})
    public void testAddFields() throws Exception {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> smtConfig = new HashMap<>();
        smtConfig.put(ADD_FIELDS, "op , ts_ms");
        this.transformation.configure(smtConfig);

        SourceRecord created = this.createCreateRecord();
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)created);

        Struct transformedValue = (Struct)transformed.value();
        Assertions.assertThat((Object)transformedValue.get("__op")).isEqualTo((Object)Envelope.Operation.CREATE.code());
        Assertions.assertThat((Object)transformedValue.get("__ts_ms")).isNotNull();
    }

    /**
     * Checks that {@code add.fields=op,id} adds an {@code __op} field equal to the
     * create operation code and an {@code __id} field whose value is {@code null}.
     *
     * <p>The input is the record returned by {@code createCreateRecord()}.
     *
     * @throws Exception propagated from the test helpers
     */
    @Test
    @FixFor(value={"DBZ-1791"})
    public void testAddFieldsForMissingOptionalField() throws Exception {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> smtConfig = new HashMap<>();
        smtConfig.put(ADD_FIELDS, "op,id");
        this.transformation.configure(smtConfig);

        SourceRecord created = this.createCreateRecord();
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)created);

        Struct transformedValue = (Struct)transformed.value();
        Assertions.assertThat((Object)transformedValue.get("__op")).isEqualTo((Object)Envelope.Operation.CREATE.code());
        Assertions.assertThat((Object)transformedValue.get("__id")).isNull();
    }

    /**
     * Checks that {@code add.fields=op,source.rs,source.collection} adds
     * {@code __op} (create operation code), {@code __source_rs} ({@code "rs0"}) and
     * {@code __source_collection} (the collection name used by this test) to the
     * transformed value.
     *
     * <p>The input is the record returned by {@code createCreateRecord()}.
     *
     * @throws Exception propagated from the test helpers
     */
    @Test
    @FixFor(value={"DBZ-1791"})
    public void testAddFieldsSpecifyStruct() throws Exception {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> smtConfig = new HashMap<>();
        smtConfig.put(ADD_FIELDS, "op,source.rs,source.collection");
        this.transformation.configure(smtConfig);

        SourceRecord created = this.createCreateRecord();
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)created);

        Struct transformedValue = (Struct)transformed.value();
        Assertions.assertThat((Object)transformedValue.get("__op")).isEqualTo((Object)Envelope.Operation.CREATE.code());
        Assertions.assertThat((Object)transformedValue.get("__source_rs")).isEqualTo((Object)"rs0");
        Assertions.assertThat((Object)transformedValue.get("__source_collection")).isEqualTo((Object)this.getCollectionName());
    }

    /**
     * Checks that {@code delete.handling.mode=rewrite} combined with
     * {@code add.fields=op} produces a value with {@code __deleted=true} and
     * {@code __op} equal to the delete operation code.
     *
     * <p>The input is the first record returned by
     * {@code createDeleteRecordWithTombstone()} (presumably the delete event itself).
     *
     * @throws Exception propagated from the test helpers
     */
    @Test
    @FixFor(value={"DBZ-1791"})
    public void testAddFieldHandleDeleteRewrite() throws Exception {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> smtConfig = new HashMap<>();
        smtConfig.put(HANDLE_DELETES, "rewrite");
        smtConfig.put(ADD_FIELDS, "op");
        this.transformation.configure(smtConfig);

        AbstractConnectorTest.SourceRecords deleteRecords = this.createDeleteRecordWithTombstone();
        SourceRecord deleteEvent = (SourceRecord)deleteRecords.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)deleteEvent);

        Struct transformedValue = (Struct)transformed.value();
        Assertions.assertThat((Object)transformedValue.get("__deleted")).isEqualTo((Object)true);
        Assertions.assertThat((Object)transformedValue.get("__op")).isEqualTo((Object)Envelope.Operation.DELETE.code());
    }

    /**
     * Checks that {@code delete.handling.mode=rewrite} combined with
     * {@code add.fields=op,ts_ms} produces a value with {@code __deleted=true},
     * {@code __op} equal to the delete operation code, and a non-null
     * {@code __ts_ms}.
     *
     * <p>The input is the first record returned by
     * {@code createDeleteRecordWithTombstone()} (presumably the delete event itself).
     *
     * <p>NOTE(review): the method name is missing a "t" ("tes..."); it is left
     * unchanged here so the test keeps its existing identifier.
     *
     * @throws Exception propagated from the test helpers
     */
    @Test
    @FixFor(value={"DBZ-1791"})
    public void tesAddFieldsHandleDeleteRewrite() throws Exception {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> smtConfig = new HashMap<>();
        smtConfig.put(HANDLE_DELETES, "rewrite");
        smtConfig.put(ADD_FIELDS, "op,ts_ms");
        this.transformation.configure(smtConfig);

        AbstractConnectorTest.SourceRecords deleteRecords = this.createDeleteRecordWithTombstone();
        SourceRecord deleteEvent = (SourceRecord)deleteRecords.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord)this.transformation.apply((ConnectRecord)deleteEvent);

        Struct transformedValue = (Struct)transformed.value();
        Assertions.assertThat((Object)transformedValue.get("__deleted")).isEqualTo((Object)true);
        Assertions.assertThat((Object)transformedValue.get("__op")).isEqualTo((Object)Envelope.Operation.DELETE.code());
        Assertions.assertThat((Object)transformedValue.get("__ts_ms")).isNotNull();
    }

    /**
     * Configures the transformation with {@code delete.handling.mode=rewrite} and
     * {@code add.fields=op,source.rs,source.collection}, applies it to a delete event and checks
     * the added fields: {@code __deleted} is {@code true}, {@code __op} is the DELETE operation
     * code, {@code __source_rs} is {@code "rs0"} and {@code __source_collection} matches
     * {@link #getCollectionName()}.
     */
    @Test
    @FixFor(value={"DBZ-1791"})
    public void testAddFieldsSpecifyStructHandleDeleteRewrite() throws Exception {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> config = new HashMap<>();
        config.put(HANDLE_DELETES, "rewrite");
        config.put(ADD_FIELDS, "op,source.rs,source.collection");
        this.transformation.configure(config);

        // Only the first record returned by the helper is transformed here.
        AbstractConnectorTest.SourceRecords produced = this.createDeleteRecordWithTombstone();
        SourceRecord deleteEvent = (SourceRecord) produced.allRecordsInOrder().get(0);
        SourceRecord result = (SourceRecord) this.transformation.apply(deleteEvent);

        Struct value = (Struct) result.value();
        Assertions.assertThat(value.get("__deleted")).isEqualTo(true);
        Assertions.assertThat(value.get("__op")).isEqualTo(Envelope.Operation.DELETE.code());
        Assertions.assertThat(value.get("__source_rs")).isEqualTo("rs0");
        Assertions.assertThat(value.get("__source_collection")).isEqualTo(this.getCollectionName());
    }

    /**
     * Configures the transformation with {@code delete.handling.mode=rewrite},
     * {@code add.fields=op,ts_ms} and {@code drop.tombstones=false}, then applies it to both
     * records produced by a delete.
     *
     * <p>The first record must come out with {@code __deleted = true}, {@code __op} equal to the
     * DELETE operation code and a non-null {@code __ts_ms}. The second record must come out of
     * the transformation with a {@code null} value.
     */
    @Test
    @FixFor(value={"DBZ-1791"})
    public void testAddFieldsHandleDeleteRewriteAndTombstone() throws Exception {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> props = new HashMap<>();
        props.put(HANDLE_DELETES, "rewrite");
        props.put(ADD_FIELDS, "op,ts_ms");
        // Use the class constant rather than a repeated literal, like the other options above.
        props.put(DROP_TOMBSTONE, "false");
        this.transformation.configure(props);

        // Fetch the ordered list once and index into it for both records.
        List<SourceRecord> ordered = this.createDeleteRecordWithTombstone().allRecordsInOrder();

        SourceRecord deleteRecord = ordered.get(0);
        SourceRecord deleteTransformed = (SourceRecord) this.transformation.apply(deleteRecord);
        Struct deleteValue = (Struct) deleteTransformed.value();
        Assertions.assertThat(deleteValue.get("__deleted")).isEqualTo(true);
        Assertions.assertThat(deleteValue.get("__op")).isEqualTo(Envelope.Operation.DELETE.code());
        Assertions.assertThat(deleteValue.get("__ts_ms")).isNotNull();

        SourceRecord tombstoneRecord = ordered.get(1);
        SourceRecord tombstoneTransformed = (SourceRecord) this.transformation.apply(tombstoneRecord);
        Assertions.assertThat(tombstoneTransformed.value()).isNull();
    }

    /**
     * Inserts one sample document into the {@code transform_operations} database, in the
     * collection named by {@link #getCollectionName()}, and returns the single record consumed
     * for it.
     *
     * @return the first (and only) record consumed after the insert
     * @throws Exception if the insert or the record consumption fails
     */
    private SourceRecord createCreateRecord() throws Exception {
        ObjectId id = new ObjectId();
        Document address = new Document()
                .append("struct", "Morris Park Ave")
                .append("zipcode", "10462");
        Document person = new Document()
                .append("_id", id)
                .append("name", "Sally")
                .append("address", address);

        this.primary().execute("insert", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne(person));

        // Exactly one record is expected on the topic, with nothing left over afterwards.
        AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(1);
        int onTopic = consumed.recordsForTopic(this.topicName()).size();
        Assertions.assertThat(onTopic).isEqualTo(1);
        this.assertNoRecordsToConsume();

        return (SourceRecord) consumed.allRecordsInOrder().get(0);
    }

    /**
     * Inserts one sample document, consumes the single record produced by the insert, deletes the
     * document by its {@code _id}, and returns the two records consumed after the delete.
     *
     * <p>Callers in this class treat the first returned record as the delete event and the second
     * as its tombstone.
     *
     * @return the records consumed after the delete (asserted to be exactly two on the topic)
     * @throws Exception if a database operation or the record consumption fails
     */
    private AbstractConnectorTest.SourceRecords createDeleteRecordWithTombstone() throws Exception {
        ObjectId id = new ObjectId();
        Document address = new Document()
                .append("struct", "Morris Park Ave")
                .append("zipcode", "10462");
        Document person = new Document()
                .append("_id", id)
                .append("name", "Sally")
                .append("address", address);

        this.primary().execute("insert", client -> client
                .getDatabase("transform_operations")
                .getCollection(this.getCollectionName())
                .insertOne(person));

        // Drain the record for the insert so that only delete-related records remain.
        AbstractConnectorTest.SourceRecords inserted = this.consumeRecordsByTopic(1);
        int insertedOnTopic = inserted.recordsForTopic(this.topicName()).size();
        Assertions.assertThat(insertedOnTopic).isEqualTo(1);
        this.assertNoRecordsToConsume();

        this.primary().execute("delete", client -> {
            String filterJson = "{\"_id\": {\"$oid\": \"" + id + "\"}}";
            Document filter = Document.parse(filterJson);
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .deleteOne(filter);
        });

        AbstractConnectorTest.SourceRecords deleted = this.consumeRecordsByTopic(2);
        int deletedOnTopic = deleted.recordsForTopic(this.topicName()).size();
        Assertions.assertThat(deletedOnTopic).isEqualTo(2);
        this.assertNoRecordsToConsume();

        return deleted;
    }

    /**
     * Convenience overload that delegates to the two-argument {@code waitForStreamingRunning}
     * with the fixed arguments {@code "mongodb"} and {@code "mongo"}.
     *
     * @throws InterruptedException propagated from the delegated call
     */
    private static void waitForStreamingRunning() throws InterruptedException {
        // NOTE(review): presumably the connector type and the logical server name; confirm
        // against the inherited method's signature.
        waitForStreamingRunning("mongodb", "mongo");
    }

    /**
     * Returns the string form of the first header on {@code record} named {@code headerKey}.
     *
     * @param record the record whose headers are searched
     * @param headerKey the header name to look up
     * @return {@code toString()} of the first matching header's value, or {@code null} when no
     *         header has that name or the first matching header's value is {@code null}
     */
    private String getSourceRecordHeaderByKey(SourceRecord record, String headerKey) {
        // A typed iterator avoids the raw type and the manual cast on each element.
        Iterator<Header> headers = record.headers().allWithName(headerKey);
        if (!headers.hasNext()) {
            return null;
        }
        Object value = headers.next().value();
        return value != null ? value.toString() : null;
    }
}

