/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb.transforms;

import io.debezium.connector.mongodb.TestHelper;
import io.debezium.connector.mongodb.transforms.AbstractExtractNewDocumentStateTestIT;
import io.debezium.data.Envelope;
import io.debezium.data.Json;
import io.debezium.data.SchemaUtil;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Collect;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.Document;
import org.bson.RawBsonDocument;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.fest.assertions.Assertions;
import org.junit.Test;

public class ExtractNewDocumentStateTestIT
extends AbstractExtractNewDocumentStateTestIT {
    // Option keys for the map that the tests hand to this.transformation.configure(...).
    // NOTE(review): CONFIG_DROP_TOMBSTONES and DROP_TOMBSTONE hold the same key; in the part of
    // the file reviewed the tests use the literal "drop.tombstones" rather than either constant.
    private static final String CONFIG_DROP_TOMBSTONES = "drop.tombstones";
    private static final String HANDLE_DELETES = "delete.handling.mode";
    // NOTE(review): FLATTEN_STRUCT and DELIMITER are not referenced in the reviewed part of the
    // file; presumably used by tests further down - confirm before removing.
    private static final String FLATTEN_STRUCT = "flatten.struct";
    private static final String DELIMITER = "flatten.struct.delimiter";
    // Tests that set this to "true" expect an "__op" header on the transformed record.
    private static final String OPERATION_HEADER = "operation.header";
    private static final String DROP_TOMBSTONE = "drop.tombstones";
    // Comma-separated source-block field names; the tests expect each one to show up in the
    // transformed value under a "__"-prefixed name.
    private static final String ADD_SOURCE_FIELDS = "add.source.fields";
    // NOTE(review): the four ADD_* keys below are not referenced in the reviewed part of the file.
    private static final String ADD_HEADERS = "add.headers";
    private static final String ADD_FIELDS = "add.fields";
    private static final String ADD_FIELDS_PREFIX = "add.fields.prefix";
    private static final String ADD_HEADERS_PREFIX = "add.headers.prefix";
    private static final String ARRAY_ENCODING = "array.encoding";

    /**
     * Supplies the name of the collection that the tests in this class insert into, update and
     * delete from.
     *
     * @return the fixed collection name {@code "functional"}
     */
    @Override
    protected String getCollectionName() {
        final String collectionName = "functional";
        return collectionName;
    }

    /**
     * Inserts and then deletes a document and checks that, without this test configuring the
     * transformation, both the delete event and the record that follows it are transformed to
     * {@code null}.
     */
    @Test
    @FixFor("DBZ-563")
    public void shouldDropTombstoneByDefault() throws InterruptedException {
        final String insertJson = "{'_id': 1, 'dataStr': 'hello', 'dataInt': 123, 'dataLong': 80000000000}";
        this.primary().execute("insert", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .insertOne(Document.parse(insertJson));
        });

        // Exactly one record (the insert) must be on the topic before the delete is issued.
        AbstractConnectorTest.SourceRecords inserted = this.consumeRecordsByTopic(1);
        Assertions.assertThat(inserted.recordsForTopic(this.topicName()).size()).isEqualTo(1);

        this.primary().execute("delete", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .deleteOne(RawBsonDocument.parse("{'_id' : 1}"));
        });

        // The delete event itself is dropped by the transformation ...
        SourceRecord deleteEvent = this.getRecordByOperation(Envelope.Operation.DELETE);
        SourceRecord deleteOut = (SourceRecord) this.transformation.apply(deleteEvent);
        Assertions.assertThat(deleteOut).isNull();

        // ... and so is the record emitted right after it, which must exist on the input side.
        SourceRecord tombstone = this.getNextRecord();
        Assertions.assertThat(tombstone).isNotNull();
        SourceRecord tombstoneOut = (SourceRecord) this.transformation.apply(tombstone);
        Assertions.assertThat(tombstoneOut).isNull();
    }

    /**
     * Drives a single document through an insert, three operator updates ({@code $set},
     * multi-field {@code $set}, {@code $unset}), a non-operator update and a delete, applying the
     * transformation to every emitted record and checking the resulting schema and values.
     * The transformation is configured with {@code drop.tombstones=false} and
     * {@code delete.handling.mode=none}.
     */
    @Test
    public void shouldTransformEvents() throws InterruptedException, IOException {
        HashMap<String, String> config = new HashMap<String, String>();
        config.put("drop.tombstones", "false");
        config.put(HANDLE_DELETES, "none");
        this.transformation.configure(config);

        final String byId = "{'_id' : 1}";

        // ---- insert ----
        this.primary().execute("insert", client -> {
            long timestamp = ZonedDateTime.of(2020, 1, 28, 10, 0, 33, 0, ZoneId.of("UTC")).toEpochSecond();
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .insertOne(Document.parse("{  '_id': 1,   'dataStr': 'hello',   'dataInt': 123,   'dataLong': 80000000000,   'dataDate': ISODate(\"2020-01-27T10:47:12.311Z\"),   'dataTimestamp': Timestamp(" + timestamp + ", 1)}"));
        });
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        SourceRecord insertEvent = (SourceRecord) records.recordsForTopic(this.topicName()).get(0);
        SourceRecord insertOut = (SourceRecord) this.transformation.apply(insertEvent);
        Struct insertValue = (Struct) insertOut.value();
        Assertions.assertThat(insertOut.valueSchema().field("_id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(insertOut.valueSchema().field("dataStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(insertOut.valueSchema().field("dataInt").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(insertOut.valueSchema().field("dataLong").schema()).isEqualTo(Schema.OPTIONAL_INT64_SCHEMA);
        Assertions.assertThat(insertValue.get("_id")).isEqualTo(1);
        Assertions.assertThat(insertValue.get("dataStr")).isEqualTo("hello");
        Assertions.assertThat(insertValue.get("dataInt")).isEqualTo(123);
        Assertions.assertThat(insertValue.get("dataLong")).isEqualTo(80000000000L);
        Date expectedDate = Date.from(ZonedDateTime.of(2020, 1, 27, 10, 47, 12, 311000000, ZoneId.of("UTC")).toInstant());
        Assertions.assertThat(insertValue.get("dataDate")).isEqualTo(expectedDate);
        Date expectedTimestamp = Date.from(ZonedDateTime.of(2020, 1, 28, 10, 0, 33, 0, ZoneId.of("UTC")).toInstant());
        Assertions.assertThat(insertValue.get("dataTimestamp")).isEqualTo(expectedTimestamp);

        // ---- $set of a single field ----
        this.primary().execute("update", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .updateOne(RawBsonDocument.parse(byId), RawBsonDocument.parse("{'$set': {'dataStr': 'bye'}}"));
        });
        records = this.consumeRecordsByTopic(1);
        SourceRecord firstAfterSet = (SourceRecord) records.recordsForTopic(this.topicName()).get(0);
        // If the record just read is a create ("c") event, read one more record and use that.
        if (((Struct) firstAfterSet.value()).get("op").equals("c")) {
            records = this.consumeRecordsByTopic(1);
        }
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        SourceRecord setEvent = (SourceRecord) records.recordsForTopic(this.topicName()).get(0);
        SourceRecord setOut = (SourceRecord) this.transformation.apply(setEvent);
        Struct setValue = (Struct) setOut.value();
        Assertions.assertThat(setOut.valueSchema().field("_id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(setOut.valueSchema().field("dataStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(setValue.get("_id")).isEqualTo(1);
        Assertions.assertThat(setValue.get("dataStr")).isEqualTo("bye");

        // ---- $set of several fields, one of them new ----
        this.primary().execute("update", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .updateOne(RawBsonDocument.parse(byId), RawBsonDocument.parse("{'$set': {'newStr': 'hello', 'dataInt': 456}}"));
        });
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        SourceRecord multiSetEvent = (SourceRecord) records.recordsForTopic(this.topicName()).get(0);
        SourceRecord multiSetOut = (SourceRecord) this.transformation.apply(multiSetEvent);
        Struct multiSetValue = (Struct) multiSetOut.value();
        Assertions.assertThat(multiSetOut.valueSchema().field("_id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(multiSetOut.valueSchema().field("newStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(multiSetOut.valueSchema().field("dataInt").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(multiSetValue.get("_id")).isEqualTo(1);
        Assertions.assertThat(multiSetValue.get("newStr")).isEqualTo("hello");
        Assertions.assertThat(multiSetValue.get("dataInt")).isEqualTo(456);

        // ---- $unset ----
        this.primary().execute("update", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .updateOne(RawBsonDocument.parse(byId), RawBsonDocument.parse("{'$unset': {'newStr': ''}}"));
        });
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        SourceRecord unsetEvent = (SourceRecord) records.recordsForTopic(this.topicName()).get(0);
        SourceRecord unsetOut = (SourceRecord) this.transformation.apply(unsetEvent);
        Struct unsetValue = (Struct) unsetOut.value();
        Assertions.assertThat(unsetOut.valueSchema().field("_id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(unsetValue.get("_id")).isEqualTo(1);
        // The expected shape of the removed field depends on the capture mode.
        if (TestHelper.isOplogCaptureMode()) {
            Assertions.assertThat(unsetOut.valueSchema().field("newStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        } else {
            Assertions.assertThat(unsetValue.schema().field("newStr")).isNull();
        }

        // ---- update whose second argument has no update operator ----
        this.primary().execute("update", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .updateOne(RawBsonDocument.parse(byId), RawBsonDocument.parse("{'dataStr': 'Hi again'}"));
        });
        records = this.consumeRecordsByTopic(1);
        SourceRecord firstAfterFullUpdate = (SourceRecord) records.recordsForTopic(this.topicName()).get(0);
        // Same handling as above: a create ("c") event here means one more record is read.
        if (((Struct) firstAfterFullUpdate.value()).get("op").equals("c")) {
            records = this.consumeRecordsByTopic(1);
        }
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        SourceRecord fullUpdateEvent = (SourceRecord) records.recordsForTopic(this.topicName()).get(0);
        SourceRecord fullUpdateOut = (SourceRecord) this.transformation.apply(fullUpdateEvent);
        Struct fullUpdateValue = (Struct) fullUpdateOut.value();
        Assertions.assertThat(fullUpdateOut.valueSchema().field("_id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(fullUpdateOut.valueSchema().field("dataStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(fullUpdateValue.get("_id")).isEqualTo(1);
        Assertions.assertThat(fullUpdateValue.get("dataStr")).isEqualTo("Hi again");

        // ---- delete: two records are expected, the delete event and then the tombstone ----
        this.primary().execute("delete", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .deleteOne(RawBsonDocument.parse(byId));
        });
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(2);
        SourceRecord deleteEvent = (SourceRecord) records.recordsForTopic(this.topicName()).get(0);
        SourceRecord deleteOut = (SourceRecord) this.transformation.apply(deleteEvent);
        Struct deleteValue = (Struct) deleteOut.value();
        Assertions.assertThat(deleteValue).isNull();
        SourceRecord tombstone = (SourceRecord) records.recordsForTopic(this.topicName()).get(1);
        SourceRecord tombstoneOut = (SourceRecord) this.transformation.apply(tombstone);
        Assertions.assertThat(tombstoneOut.value()).isNull();

        // Both transformed records must carry the same key schema and the same key.
        String deleteKeySchema = SchemaUtil.asString(deleteOut.keySchema());
        String tombstoneKeySchema = SchemaUtil.asString(tombstoneOut.keySchema());
        Assertions.assertThat(deleteKeySchema).isEqualTo(tombstoneKeySchema);
        Assertions.assertThat(deleteOut.key().toString()).isEqualTo(tombstoneOut.key().toString());
    }

    /**
     * Inserts a document whose {@code data} field has {@code $ref}, {@code $id} and {@code $db}
     * members and, with {@code sanitize.field.names=true}, expects them to be readable from the
     * transformed value as {@code _ref}, {@code _id} and {@code _db}.
     */
    @Test
    @FixFor("DBZ-1767")
    public void shouldSupportDbRef() throws InterruptedException, IOException {
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(ARRAY_ENCODING, "array");
        config.put(OPERATION_HEADER, "true");
        config.put("sanitize.field.names", "true");
        this.transformation.configure(config);

        final String dbRefJson = "{ '_id' : 2, 'data' : { '$ref' : 'a2', '$id' : 4, '$db' : 'b2' } }";
        this.primary().execute("insert", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .insertOne(Document.parse(dbRefJson));
        });

        AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.recordsForTopic(this.topicName()).size()).isEqualTo(1);

        SourceRecord insertEvent = (SourceRecord) consumed.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord) this.transformation.apply(insertEvent);
        this.validate(transformed);

        Struct transformedValue = (Struct) transformed.value();
        Struct data = transformedValue.getStruct("data");
        Assertions.assertThat(data.getString("_ref")).isEqualTo("a2");
        Assertions.assertThat(data.getInt32("_id")).isEqualTo(4);
        Assertions.assertThat(data.getString("_db")).isEqualTo("b2");
    }

    /**
     * Inserts a document with a nested field named {@code metric::fct} and, with
     * {@code sanitize.field.names=true}, expects that nested struct to be reachable in the
     * transformed value as {@code metric__fct} with its {@code min}/{@code max} values intact.
     */
    @Test
    @FixFor("DBZ-2680")
    public void shouldSupportSubSanitizeFieldName() throws InterruptedException, IOException {
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(ARRAY_ENCODING, "array");
        config.put(OPERATION_HEADER, "true");
        config.put("sanitize.field.names", "true");
        this.transformation.configure(config);

        // Single definition of the test document; the insert below parses this exact string.
        final String doc = "{  \"_id\": \"222\",  \"metrics\": {    \"metric::fct\": {      \"min\": 0,      \"max\": 1,    },  }}";
        this.primary().execute("insert", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .insertOne(Document.parse(doc));
        });

        AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.recordsForTopic(this.topicName()).size()).isEqualTo(1);

        SourceRecord insertEvent = (SourceRecord) consumed.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord) this.transformation.apply(insertEvent);
        this.validate(transformed);

        Struct transformedValue = (Struct) transformed.value();
        Struct metric = transformedValue.getStruct("metrics").getStruct("metric__fct");
        Assertions.assertThat(metric.getInt32("min")).isEqualTo(0);
        Assertions.assertThat(metric.getInt32("max")).isEqualTo(1);
    }

    /**
     * Configures {@code add.source.fields} with {@code h}, {@code ts_ms}, {@code ord}, {@code db}
     * and {@code rs}, then checks on an update event that the transformed value carries each of
     * them under a {@code __}-prefixed name, equal to the value in the event's {@code source}
     * struct.
     */
    @Test
    @FixFor("DBZ-1442")
    public void shouldAddSourceFields() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> config = new HashMap<String, String>();
        // The list contains blanks around "ord" and "db" - presumably to exercise trimming.
        config.put(ADD_SOURCE_FIELDS, "h,ts_ms,ord , db,rs");
        this.transformation.configure(config);

        this.primary().execute("insert", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .insertOne(Document.parse("{ '_id' : 3, 'name' : 'Tim' }"));
        });
        AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        this.primary().execute("update", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .updateOne(RawBsonDocument.parse("{'_id' : 3}"), RawBsonDocument.parse("{'$set': {'name': 'Sally'}}"));
        });
        consumed = this.consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        // Read the source struct from the untransformed event, then compare against the output.
        SourceRecord updateEvent = (SourceRecord) consumed.allRecordsInOrder().get(0);
        Struct envelope = (Struct) updateEvent.value();
        Struct source = envelope.getStruct("source");
        SourceRecord transformed = (SourceRecord) this.transformation.apply(updateEvent);
        this.validate(transformed);

        Struct flattened = (Struct) transformed.value();
        Assertions.assertThat(flattened.get("__h")).isEqualTo(source.getInt64("h"));
        Assertions.assertThat(flattened.get("__ts_ms")).isEqualTo(source.getInt64("ts_ms"));
        Assertions.assertThat(flattened.get("__ord")).isEqualTo(source.getInt32("ord"));
        Assertions.assertThat(flattened.get("__db")).isEqualTo(source.getString("db"));
        Assertions.assertThat(flattened.get("__rs")).isEqualTo(source.getString("rs"));
        Assertions.assertThat(flattened.get("__db")).isEqualTo("transform_operations");
        Assertions.assertThat(flattened.get("__rs")).isEqualTo("rs0");
    }

    /**
     * Same check as for added source fields on an update, but on a delete event with
     * {@code delete.handling.mode=rewrite}: the transformed delete must still carry the
     * {@code __}-prefixed copies of {@code h}, {@code ts_ms}, {@code ord}, {@code db} and
     * {@code rs}.
     */
    @Test
    @FixFor("DBZ-1442")
    public void shouldAddSourceFieldsForRewriteDeleteEvent() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> config = new HashMap<String, String>();
        config.put(ADD_SOURCE_FIELDS, "h,ts_ms,ord,db,rs");
        config.put(HANDLE_DELETES, "rewrite");
        this.transformation.configure(config);

        this.primary().execute("insert", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .insertOne(Document.parse("{ '_id' : 4, 'name' : 'Sally' }"));
        });
        AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        this.primary().execute("delete", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .deleteOne(RawBsonDocument.parse("{ '_id' : 4 }"));
        });
        // Two records are expected for the delete; only the first one is inspected below.
        consumed = this.consumeRecordsByTopic(2);
        Assertions.assertThat(consumed.recordsForTopic(this.topicName()).size()).isEqualTo(2);
        this.assertNoRecordsToConsume();

        SourceRecord deleteEvent = (SourceRecord) consumed.allRecordsInOrder().get(0);
        Struct envelope = (Struct) deleteEvent.value();
        Struct source = envelope.getStruct("source");
        SourceRecord transformed = (SourceRecord) this.transformation.apply(deleteEvent);
        this.validate(transformed);

        Struct flattened = (Struct) transformed.value();
        Assertions.assertThat(flattened.get("__h")).isEqualTo(source.getInt64("h"));
        Assertions.assertThat(flattened.get("__ts_ms")).isEqualTo(source.getInt64("ts_ms"));
        Assertions.assertThat(flattened.get("__ord")).isEqualTo(source.getInt32("ord"));
        Assertions.assertThat(flattened.get("__db")).isEqualTo(source.getString("db"));
        Assertions.assertThat(flattened.get("__rs")).isEqualTo(source.getString("rs"));
        Assertions.assertThat(flattened.get("__db")).isEqualTo("transform_operations");
        Assertions.assertThat(flattened.get("__rs")).isEqualTo("rs0");
    }

    /**
     * Inserts a document with an {@code ObjectId} key, a string, a long, a boolean and a list of
     * doubles, then checks the transformed record: the {@code __op} header carries the create
     * code, the key holds the id as a string, and the value has exactly five fields with the
     * expected optional schemas.
     */
    @Test
    public void shouldTransformRecordForInsertEvent() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> config = new HashMap<String, String>();
        config.put(OPERATION_HEADER, "true");
        this.transformation.configure(config);

        ObjectId objId = new ObjectId();
        Document obj = new Document()
                .append("_id", objId)
                .append("name", "Sally")
                .append("phone", 123L)
                .append("active", true)
                .append("scores", Arrays.asList(1.2, 3.4, 5.6));
        this.primary().execute("insert", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .insertOne(obj);
        });

        AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        SourceRecord insertEvent = (SourceRecord) consumed.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord) this.transformation.apply(insertEvent);
        this.validate(transformed);

        // Typed iterator: no raw type and no per-element cast needed.
        Iterator<Header> opHeaders = transformed.headers().allWithName("__op");
        Assertions.assertThat(opHeaders.hasNext()).isTrue();
        Assertions.assertThat(opHeaders.next().value().toString()).isEqualTo(Envelope.Operation.CREATE.code());

        Struct key = (Struct) transformed.key();
        Struct value = (Struct) transformed.value();

        // Key: same schema instance as the record's key schema, id rendered as a string.
        Assertions.assertThat(key.schema()).isSameAs(transformed.keySchema());
        Assertions.assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(key.get("id")).isEqualTo(objId.toString());

        // Value: schema name, identity with the record's value schema, and field contents.
        Assertions.assertThat(value.schema().name()).isEqualTo("mongo.transform_operations." + this.getCollectionName());
        Assertions.assertThat(value.schema()).isSameAs(transformed.valueSchema());
        Assertions.assertThat(value.get("name")).isEqualTo("Sally");
        Assertions.assertThat(value.get("_id")).isEqualTo(objId.toString());
        Assertions.assertThat(value.get("phone")).isEqualTo(123L);
        Assertions.assertThat(value.get("active")).isEqualTo(true);
        Assertions.assertThat(value.get("scores")).isEqualTo(Arrays.asList(1.2, 3.4, 5.6));

        // Value: per-field schemas and total field count.
        Assertions.assertThat(value.schema().field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(value.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(value.schema().field("phone").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT64_SCHEMA);
        Assertions.assertThat(value.schema().field("active").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
        Assertions.assertThat(value.schema().field("scores").schema()).isEqualTo(SchemaBuilder.array(SchemaBuilder.OPTIONAL_FLOAT64_SCHEMA).optional().build());
        Assertions.assertThat(value.schema().fields()).hasSize(5);
    }

    /**
     * Inserts a document whose {@code _id} is itself a document ({@code company}, {@code dept})
     * and checks that the transformed key and value both expose that id as a nested struct with
     * the expected field schemas and values, and that the value has exactly two fields.
     */
    @Test
    public void shouldTransformRecordForInsertEventWithComplexIdType() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        Document compositeId = new Document()
                .append("company", 32)
                .append("dept", "home improvement");
        Document obj = new Document()
                .append("_id", compositeId)
                .append("name", "Sally");
        this.primary().execute("insert", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .insertOne(obj);
        });

        AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        SourceRecord insertEvent = (SourceRecord) consumed.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord) this.transformation.apply(insertEvent);
        this.validate(transformed);

        Struct key = (Struct) transformed.key();
        Struct value = (Struct) transformed.value();

        // Key side: nested id struct with company/dept.
        Assertions.assertThat(key.schema()).isSameAs(transformed.keySchema());
        Assertions.assertThat(key.schema().field("id").schema().field("company").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(key.schema().field("id").schema().field("dept").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(((Struct) key.get("id")).get("company")).isEqualTo(32);
        Assertions.assertThat(((Struct) key.get("id")).get("dept")).isEqualTo("home improvement");

        // Value side: the same id under "_id", plus the name field.
        Assertions.assertThat(value.schema()).isSameAs(transformed.valueSchema());
        Assertions.assertThat(((Struct) value.get("_id")).get("company")).isEqualTo(32);
        Assertions.assertThat(((Struct) value.get("_id")).get("dept")).isEqualTo("home improvement");
        Assertions.assertThat(value.get("name")).isEqualTo("Sally");
        Assertions.assertThat(value.schema().field("_id").schema().field("company").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(value.schema().field("_id").schema().field("dept").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(value.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(value.schema().fields()).hasSize(2);
    }

    /**
     * Inserts a document, renames it with a {@code $set} update, and checks the transformed
     * update record: the {@code __op} header carries the update code, the key holds the id as a
     * string, and the value consists of exactly {@code _id} and the new {@code name}.
     */
    @Test
    public void shouldGenerateRecordForUpdateEvent() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        HashMap<String, String> config = new HashMap<String, String>();
        config.put(OPERATION_HEADER, "true");
        this.transformation.configure(config);

        ObjectId objId = new ObjectId();
        Document obj = new Document()
                .append("_id", objId)
                .append("name", "Tim");
        this.primary().execute("insert", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .insertOne(obj);
        });
        AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        Document updateObj = new Document().append("$set", new Document("name", "Sally"));
        this.primary().execute("update", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"), updateObj);
        });
        consumed = this.consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        SourceRecord updateEvent = (SourceRecord) consumed.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord) this.transformation.apply(updateEvent);
        this.validate(transformed);

        // Typed iterator: no raw type and no per-element cast needed.
        Iterator<Header> opHeaders = transformed.headers().allWithName("__op");
        Assertions.assertThat(opHeaders.hasNext()).isTrue();
        Assertions.assertThat(opHeaders.next().value().toString()).isEqualTo(Envelope.Operation.UPDATE.code());

        Struct key = (Struct) transformed.key();
        Struct value = (Struct) transformed.value();

        Assertions.assertThat(key.schema()).isSameAs(transformed.keySchema());
        Assertions.assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(key.get("id")).isEqualTo(objId.toString());

        Assertions.assertThat(value.schema()).isSameAs(transformed.valueSchema());
        Assertions.assertThat(value.get("name")).isEqualTo("Sally");
        Assertions.assertThat(value.get("_id")).isEqualTo(objId.toString());
        Assertions.assertThat(value.schema().field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(value.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(value.schema().fields()).hasSize(2);
    }

    /**
     * Inserts a document, then applies one update that {@code $set}s {@code name} and
     * {@code $unset}s {@code phone} and {@code active}. The transformed value must carry the new
     * name; what is expected for the removed {@code phone} field and the total field count
     * depends on {@code TestHelper.isOplogCaptureMode()}.
     */
    @Test
    @FixFor("DBZ-612")
    public void shouldGenerateRecordForUpdateEventWithUnset() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        ObjectId objId = new ObjectId();
        Document obj = new Document()
                .append("_id", objId)
                .append("name", "Tim")
                .append("phone", 123L)
                .append("active", false);
        this.primary().execute("insert", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .insertOne(obj);
        });
        AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        Document fieldsToUnset = new Document()
                .append("phone", true)
                .append("active", false);
        Document updateObj = new Document()
                .append("$set", new Document("name", "Sally"))
                .append("$unset", fieldsToUnset);
        this.primary().execute("update", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(this.getCollectionName())
                    .updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"), updateObj);
        });
        consumed = this.consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        this.assertNoRecordsToConsume();

        SourceRecord updateEvent = (SourceRecord) consumed.allRecordsInOrder().get(0);
        SourceRecord transformed = (SourceRecord) this.transformation.apply(updateEvent);
        this.validate(transformed);

        Struct value = (Struct) transformed.value();
        Assertions.assertThat(value.schema()).isSameAs(transformed.valueSchema());
        Assertions.assertThat(value.get("name")).isEqualTo("Sally");

        if (!TestHelper.isOplogCaptureMode()) {
            // Outside oplog capture mode the removed field is absent from the schema altogether.
            Assertions.assertThat(value.schema().field("phone")).isNull();
            Assertions.assertThat(value.schema().fields()).hasSize(2);
        } else {
            // In oplog capture mode the removed field stays in the schema as an optional string
            // with a null value, giving four fields in total.
            Assertions.assertThat(value.get("phone")).isNull();
            Assertions.assertThat(value.schema().field("phone").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
            Assertions.assertThat(value.schema().fields()).hasSize(4);
        }
    }

    /**
     * Inserts a document, then removes two of its fields with an update that contains only
     * {@code $unset}, and checks the record the transformation produces for that update.
     * In oplog capture mode the {@code phone} field must be present with a {@code null} value
     * and an optional string schema (three fields in total); otherwise {@code phone} must be
     * absent from the value schema (two fields in total).
     */
    @Test
    @FixFor("DBZ-612")
    public void shouldGenerateRecordForUnsetOnlyUpdateEvent() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        final ObjectId documentId = new ObjectId();
        final Document document = new Document()
                .append("_id", documentId)
                .append("name", "Sally")
                .append("phone", 123L)
                .append("active", false);
        primary().execute("insert", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .insertOne(document));

        // Drain the insert event so that only the update event is inspected below.
        final AbstractConnectorTest.SourceRecords insertRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final Document unsetFields = new Document().append("phone", true).append("active", false);
        final Document update = new Document().append("$unset", unsetFields);
        final String idFilter = "{ '_id' : { '$oid' : '" + documentId + "'}}";
        primary().execute("update", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .updateOne(RawBsonDocument.parse(idFilter), update));

        final AbstractConnectorTest.SourceRecords updateRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(updateRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final SourceRecord updateRecord = updateRecords.allRecordsInOrder().get(0);
        final SourceRecord result = transformation.apply(updateRecord);
        validate(result);

        final Struct valueStruct = (Struct) result.value();
        Assertions.assertThat(valueStruct.schema()).isSameAs(result.valueSchema());
        if (!TestHelper.isOplogCaptureMode()) {
            Assertions.assertThat(valueStruct.schema().field("phone")).isNull();
            Assertions.assertThat(valueStruct.schema().fields()).hasSize(2);
        }
        else {
            Assertions.assertThat(valueStruct.get("phone")).isNull();
            Assertions.assertThat(valueStruct.schema().field("phone").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
            Assertions.assertThat(valueStruct.schema().fields()).hasSize(3);
        }
    }

    /**
     * Restarts the connector without tombstone emission, configures the transformation with
     * {@code delete.handling.mode=none}, and checks that the single record consumed after a
     * delete is transformed into a record carrying the document id in its key and a
     * {@code null} value.
     */
    @Test
    @FixFor("DBZ-582")
    public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws InterruptedException {
        restartConnectorWithoutEmittingTombstones();
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(HANDLE_DELETES, "none");
        transformation.configure(config);

        final ObjectId documentId = new ObjectId();
        final Document document = new Document().append("_id", documentId);
        primary().execute("insert", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .insertOne(document));

        // Drain the insert event first.
        final AbstractConnectorTest.SourceRecords insertRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final String idFilter = "{ '_id' : { '$oid' : '" + documentId + "'}}";
        primary().execute("delete", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .deleteOne(RawBsonDocument.parse(idFilter)));

        // Exactly one record is expected for the delete.
        final AbstractConnectorTest.SourceRecords deleteRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(deleteRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final SourceRecord deleteRecord = deleteRecords.allRecordsInOrder().get(0);
        final SourceRecord result = transformation.apply(deleteRecord);
        validate(result);

        final Struct keyStruct = (Struct) result.key();
        final Struct valueStruct = (Struct) result.value();
        Assertions.assertThat(keyStruct.schema()).isSameAs(result.keySchema());
        Assertions.assertThat(keyStruct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(keyStruct.get("id")).isEqualTo(documentId.toString());
        Assertions.assertThat(valueStruct).isNull();
    }

    /**
     * Enables the operation header and disables tombstone dropping on the transformation, then
     * checks the second of the two records consumed after a delete: its key must carry the
     * document id, it must have an {@code __op} header equal to the delete operation code, and
     * its value must be {@code null}.
     */
    @Test
    @FixFor("DBZ-1032")
    public void shouldGenerateRecordHeaderForTombstone() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(OPERATION_HEADER, "true");
        config.put("drop.tombstones", "false");
        transformation.configure(config);

        final ObjectId documentId = new ObjectId();
        final Document document = new Document().append("_id", documentId);
        primary().execute("insert", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .insertOne(document));

        // Drain the insert event first.
        final AbstractConnectorTest.SourceRecords insertRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final String idFilter = "{ '_id' : { '$oid' : '" + documentId + "'}}";
        primary().execute("delete", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .deleteOne(RawBsonDocument.parse(idFilter)));

        // Two records are expected for the delete; the second one is the record under test.
        final AbstractConnectorTest.SourceRecords deleteRecords = consumeRecordsByTopic(2);
        Assertions.assertThat(deleteRecords.recordsForTopic(topicName()).size()).isEqualTo(2);
        assertNoRecordsToConsume();

        final SourceRecord secondRecord = deleteRecords.allRecordsInOrder().get(1);
        final SourceRecord result = transformation.apply(secondRecord);
        validate(result);

        final Struct keyStruct = (Struct) result.key();
        final Struct valueStruct = (Struct) result.value();
        Assertions.assertThat(keyStruct.schema()).isSameAs(result.keySchema());
        Assertions.assertThat(keyStruct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(keyStruct.get("id")).isEqualTo(documentId.toString());

        final Iterator<Header> opHeaders = result.headers().allWithName("__op");
        Assertions.assertThat(opHeaders.hasNext()).isTrue();
        Assertions.assertThat(opHeaders.next().value().toString()).isEqualTo(Envelope.Operation.DELETE.code());
        Assertions.assertThat(valueStruct).isNull();
    }

    /**
     * With the connector restarted without tombstone emission and the transformation left at
     * its current configuration, checks that applying the transformation to the record
     * consumed after a delete yields {@code null}.
     */
    @Test
    @FixFor("DBZ-583")
    public void shouldDropDeleteMessagesByDefault() throws InterruptedException {
        restartConnectorWithoutEmittingTombstones();
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        final ObjectId documentId = new ObjectId();
        final Document document = new Document().append("_id", documentId);
        primary().execute("insert", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .insertOne(document));

        // Drain the insert event first.
        final AbstractConnectorTest.SourceRecords insertRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final String idFilter = "{ '_id' : { '$oid' : '" + documentId + "'}}";
        primary().execute("delete", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .deleteOne(RawBsonDocument.parse(idFilter)));

        final AbstractConnectorTest.SourceRecords deleteRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(deleteRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final SourceRecord deleteRecord = deleteRecords.allRecordsInOrder().get(0);
        final SourceRecord result = transformation.apply(deleteRecord);
        Assertions.assertThat(result).isNull();
    }

    /**
     * Configures the transformation with {@code delete.handling.mode=rewrite} and checks that
     * the record consumed after a delete is transformed into a record whose key carries the
     * document id and whose value has an optional boolean {@code __deleted} field set to
     * {@code true}.
     */
    @Test
    @FixFor("DBZ-583")
    public void shouldRewriteDeleteMessage() throws InterruptedException {
        restartConnectorWithoutEmittingTombstones();
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(HANDLE_DELETES, "rewrite");
        transformation.configure(config);

        final ObjectId documentId = new ObjectId();
        final Document document = new Document().append("_id", documentId);
        primary().execute("insert", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .insertOne(document));

        // Drain the insert event first.
        final AbstractConnectorTest.SourceRecords insertRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final String idFilter = "{ '_id' : { '$oid' : '" + documentId + "'}}";
        primary().execute("delete", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .deleteOne(RawBsonDocument.parse(idFilter)));

        final AbstractConnectorTest.SourceRecords deleteRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(deleteRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final SourceRecord deleteRecord = deleteRecords.allRecordsInOrder().get(0);
        final SourceRecord result = transformation.apply(deleteRecord);

        final Struct keyStruct = (Struct) result.key();
        final Struct valueStruct = (Struct) result.value();
        Assertions.assertThat(keyStruct.schema()).isSameAs(result.keySchema());
        Assertions.assertThat(keyStruct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(keyStruct.get("id")).isEqualTo(documentId.toString());
        Assertions.assertThat(valueStruct.schema().field("__deleted").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
        Assertions.assertThat(valueStruct.get("__deleted")).isEqualTo(true);
    }

    /**
     * Configures the transformation with {@code delete.handling.mode=rewrite} and checks that
     * the record for a {@code $set} update is transformed into a value with an optional boolean
     * {@code __deleted} field set to {@code false}.
     */
    @Test
    @FixFor("DBZ-583")
    public void shouldRewriteMessagesWhichAreNotDeletes() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(HANDLE_DELETES, "rewrite");
        transformation.configure(config);

        final ObjectId documentId = new ObjectId();
        final Document document = new Document()
                .append("_id", documentId)
                .append("name", "Tim");
        primary().execute("insert", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .insertOne(document));

        // Drain the insert event first.
        final AbstractConnectorTest.SourceRecords insertRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final Document update = new Document().append("$set", new Document("name", "Sally"));
        final String idFilter = "{ '_id' : { '$oid' : '" + documentId + "'}}";
        primary().execute("update", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .updateOne(RawBsonDocument.parse(idFilter), update));

        final AbstractConnectorTest.SourceRecords updateRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(updateRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final SourceRecord updateRecord = updateRecords.allRecordsInOrder().get(0);
        final SourceRecord result = transformation.apply(updateRecord);

        final Struct valueStruct = (Struct) result.value();
        Assertions.assertThat(valueStruct.schema().field("__deleted").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
        Assertions.assertThat(valueStruct.get("__deleted")).isEqualTo(false);
    }

    /**
     * Enables the operation header and {@code delete.handling.mode=none}, then checks the first
     * of the two records consumed after a delete: it must carry an {@code __op} header equal to
     * the delete operation code, the document id in its key, and a {@code null} value.
     */
    @Test
    public void shouldGenerateRecordForDeleteEvent() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(OPERATION_HEADER, "true");
        config.put(HANDLE_DELETES, "none");
        transformation.configure(config);

        final ObjectId documentId = new ObjectId();
        final Document document = new Document().append("_id", documentId);
        primary().execute("insert", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .insertOne(document));

        // Drain the insert event first.
        final AbstractConnectorTest.SourceRecords insertRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final String idFilter = "{ '_id' : { '$oid' : '" + documentId + "'}}";
        primary().execute("delete", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .deleteOne(RawBsonDocument.parse(idFilter)));

        // Two records are expected for the delete; the first one is the record under test.
        final AbstractConnectorTest.SourceRecords deleteRecords = consumeRecordsByTopic(2);
        Assertions.assertThat(deleteRecords.recordsForTopic(topicName()).size()).isEqualTo(2);
        assertNoRecordsToConsume();

        final SourceRecord firstRecord = deleteRecords.allRecordsInOrder().get(0);
        final SourceRecord result = transformation.apply(firstRecord);

        final Iterator<Header> opHeaders = result.headers().allWithName("__op");
        Assertions.assertThat(opHeaders.hasNext()).isTrue();
        Assertions.assertThat(opHeaders.next().value().toString()).isEqualTo(Envelope.Operation.DELETE.code());

        final Struct keyStruct = (Struct) result.key();
        final Struct valueStruct = (Struct) result.value();
        Assertions.assertThat(keyStruct.schema()).isSameAs(result.keySchema());
        Assertions.assertThat(keyStruct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(keyStruct.get("id")).isEqualTo(documentId.toString());
        Assertions.assertThat(valueStruct).isNull();
    }

    /**
     * Adds a string header to an update record before transforming it and checks that the
     * transformed record carries exactly that one header with the same value.
     */
    @Test
    @FixFor("DBZ-971")
    public void shouldPropagatePreviousRecordHeaders() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        final ObjectId documentId = new ObjectId();
        final Document document = new Document()
                .append("_id", documentId)
                .append("name", "Tim");
        primary().execute("insert", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .insertOne(document));

        // Drain the insert event first.
        final AbstractConnectorTest.SourceRecords insertRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final Document update = new Document().append("$set", new Document("name", "Sally"));
        final String idFilter = "{ '_id' : { '$oid' : '" + documentId + "'}}";
        primary().execute("update", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .updateOne(RawBsonDocument.parse(idFilter), update));

        final AbstractConnectorTest.SourceRecords updateRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(updateRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final String headerName = "application/debezium-test-header";
        final String headerValue = "shouldPropagatePreviousRecordHeaders";
        final SourceRecord updateRecord = updateRecords.allRecordsInOrder().get(0);
        updateRecord.headers().addString(headerName, headerValue);

        final SourceRecord result = transformation.apply(updateRecord);
        Assertions.assertThat(result.headers()).hasSize(1);

        final Iterator<Header> matchingHeaders = result.headers().allWithName(headerName);
        Assertions.assertThat(matchingHeaders.hasNext()).isTrue();
        Assertions.assertThat(matchingHeaders.next().value().toString()).isEqualTo(headerValue);
    }

    /**
     * Without any flatten configuration applied in this test, checks that an inserted document
     * with a nested {@code address} sub-document is transformed into a value with three fields,
     * where {@code address} is an optional struct holding {@code street} and {@code zipcode}.
     */
    @Test
    public void shouldNotFlattenTransformRecordForInsertEvent() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        final ObjectId documentId = new ObjectId();
        final Document address = new Document()
                .append("street", "Morris Park Ave")
                .append("zipcode", "10462");
        final Document document = new Document()
                .append("_id", documentId)
                .append("name", "Sally")
                .append("address", address);
        primary().execute("insert", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .insertOne(document));

        final AbstractConnectorTest.SourceRecords insertRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final SourceRecord insertRecord = insertRecords.allRecordsInOrder().get(0);
        final SourceRecord result = transformation.apply(insertRecord);
        final Struct keyStruct = (Struct) result.key();
        final Struct valueStruct = (Struct) result.value();

        // Key assertions.
        Assertions.assertThat(keyStruct.schema()).isSameAs(result.keySchema());
        Assertions.assertThat(keyStruct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(keyStruct.get("id")).isEqualTo(documentId.toString());

        // Value content assertions.
        Assertions.assertThat(valueStruct.schema()).isSameAs(result.valueSchema());
        Assertions.assertThat(valueStruct.get("name")).isEqualTo("Sally");
        Assertions.assertThat(valueStruct.get("_id")).isEqualTo(documentId.toString());
        Assertions.assertThat(valueStruct.get("address")).isEqualTo(
                new Struct(valueStruct.schema().field("address").schema())
                        .put("street", "Morris Park Ave")
                        .put("zipcode", "10462"));

        // Value schema assertions.
        Assertions.assertThat(valueStruct.schema().field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(valueStruct.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(valueStruct.schema().field("address").schema()).isEqualTo(
                SchemaBuilder.struct()
                        .name("mongo.transform_operations." + getCollectionName() + ".address")
                        .optional()
                        .field("street", Schema.OPTIONAL_STRING_SCHEMA)
                        .field("zipcode", Schema.OPTIONAL_STRING_SCHEMA)
                        .build());
        Assertions.assertThat(valueStruct.schema().fields()).hasSize(3);
    }

    /**
     * Configures the transformation with {@code flatten.struct=true} and checks that the nested
     * {@code address} sub-document of an inserted document appears in the transformed value as
     * the optional string fields {@code address_street} and {@code address_zipcode}
     * (four fields in total).
     */
    @Test
    public void shouldFlattenTransformRecordForInsertEvent() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(FLATTEN_STRUCT, "true");
        transformation.configure(config);

        final ObjectId documentId = new ObjectId();
        final Document address = new Document()
                .append("street", "Morris Park Ave")
                .append("zipcode", "10462");
        final Document document = new Document()
                .append("_id", documentId)
                .append("name", "Sally")
                .append("address", address);
        primary().execute("insert", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .insertOne(document));

        final AbstractConnectorTest.SourceRecords insertRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final SourceRecord insertRecord = insertRecords.allRecordsInOrder().get(0);
        final SourceRecord result = transformation.apply(insertRecord);
        final Struct keyStruct = (Struct) result.key();
        final Struct valueStruct = (Struct) result.value();

        // Key assertions.
        Assertions.assertThat(keyStruct.schema()).isSameAs(result.keySchema());
        Assertions.assertThat(keyStruct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(keyStruct.get("id")).isEqualTo(documentId.toString());

        // Value content assertions.
        Assertions.assertThat(valueStruct.schema()).isSameAs(result.valueSchema());
        Assertions.assertThat(valueStruct.get("name")).isEqualTo("Sally");
        Assertions.assertThat(valueStruct.get("_id")).isEqualTo(documentId.toString());
        Assertions.assertThat(valueStruct.get("address_street")).isEqualTo("Morris Park Ave");
        Assertions.assertThat(valueStruct.get("address_zipcode")).isEqualTo("10462");

        // Value schema assertions.
        Assertions.assertThat(valueStruct.schema().field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(valueStruct.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(valueStruct.schema().field("address_street").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(valueStruct.schema().field("address_zipcode").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(valueStruct.schema().fields()).hasSize(4);
    }

    /**
     * Configures the transformation with {@code flatten.struct=true} and the delimiter
     * {@code "-"}, and checks that the nested {@code address} sub-document of an inserted
     * document appears in the transformed value as the optional string fields
     * {@code address-street} and {@code address-zipcode} (four fields in total).
     */
    @Test
    public void shouldFlattenWithDelimiterTransformRecordForInsertEvent() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(FLATTEN_STRUCT, "true");
        config.put(DELIMITER, "-");
        transformation.configure(config);

        final ObjectId documentId = new ObjectId();
        final Document address = new Document()
                .append("street", "Morris Park Ave")
                .append("zipcode", "10462");
        final Document document = new Document()
                .append("_id", documentId)
                .append("name", "Sally")
                .append("address", address);
        primary().execute("insert", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .insertOne(document));

        final AbstractConnectorTest.SourceRecords insertRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final SourceRecord insertRecord = insertRecords.allRecordsInOrder().get(0);
        final SourceRecord result = transformation.apply(insertRecord);
        final Struct keyStruct = (Struct) result.key();
        final Struct valueStruct = (Struct) result.value();

        // Key assertions.
        Assertions.assertThat(keyStruct.schema()).isSameAs(result.keySchema());
        Assertions.assertThat(keyStruct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(keyStruct.get("id")).isEqualTo(documentId.toString());

        // Value content assertions.
        Assertions.assertThat(valueStruct.schema()).isSameAs(result.valueSchema());
        Assertions.assertThat(valueStruct.get("name")).isEqualTo("Sally");
        Assertions.assertThat(valueStruct.get("_id")).isEqualTo(documentId.toString());
        Assertions.assertThat(valueStruct.get("address-street")).isEqualTo("Morris Park Ave");
        Assertions.assertThat(valueStruct.get("address-zipcode")).isEqualTo("10462");

        // Value schema assertions.
        Assertions.assertThat(valueStruct.schema().field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(valueStruct.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(valueStruct.schema().field("address-street").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(valueStruct.schema().field("address-zipcode").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(valueStruct.schema().fields()).hasSize(4);
    }

    /**
     * Configures the transformation with {@code flatten.struct=true} and the delimiter
     * {@code "-"}, applies a {@code $set} on three dotted {@code address.*} paths, and checks
     * that the transformed update record exposes them as {@code address-city},
     * {@code address-name} and {@code address-city2-part}. The expected number of value fields
     * is four in oplog capture mode and seven otherwise.
     */
    @Test
    public void shouldFlattenWithDelimiterTransformRecordForUpdateEvent() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(FLATTEN_STRUCT, "true");
        config.put(DELIMITER, "-");
        transformation.configure(config);

        final ObjectId documentId = new ObjectId();
        final Document address = new Document()
                .append("street", "Morris Park Ave")
                .append("zipcode", "10462");
        final Document document = new Document()
                .append("_id", documentId)
                .append("name", "Sally")
                .append("address", address);
        primary().execute("insert", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .insertOne(document));

        // Drain the insert event first.
        final AbstractConnectorTest.SourceRecords insertRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(insertRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final Document update = new Document().append("$set", new Document(
                Collect.hashMapOf("address.city", "Canberra", "address.name", "James", "address.city2.part", 3)));
        final String idFilter = "{ '_id' : { '$oid' : '" + documentId + "'}}";
        primary().execute("update", client -> client.getDatabase("transform_operations")
                .getCollection(getCollectionName())
                .updateOne(RawBsonDocument.parse(idFilter), update));

        final AbstractConnectorTest.SourceRecords updateRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(updateRecords.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final SourceRecord updateRecord = updateRecords.allRecordsInOrder().get(0);
        final SourceRecord result = transformation.apply(updateRecord);
        final Struct keyStruct = (Struct) result.key();
        final Struct valueStruct = (Struct) result.value();

        // Key assertions.
        Assertions.assertThat(keyStruct.schema()).isSameAs(result.keySchema());
        Assertions.assertThat(keyStruct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(keyStruct.get("id")).isEqualTo(documentId.toString());

        // Value content assertions.
        Assertions.assertThat(valueStruct.schema()).isSameAs(result.valueSchema());
        Assertions.assertThat(valueStruct.get("_id")).isEqualTo(documentId.toString());
        Assertions.assertThat(valueStruct.get("address-city")).isEqualTo("Canberra");
        Assertions.assertThat(valueStruct.get("address-name")).isEqualTo("James");
        Assertions.assertThat(valueStruct.get("address-city2-part")).isEqualTo(3);

        // Value schema assertions.
        Assertions.assertThat(valueStruct.schema().field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(valueStruct.schema().field("address-city").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(valueStruct.schema().field("address-name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(valueStruct.schema().field("address-city2-part").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(valueStruct.schema().fields()).hasSize(TestHelper.isOplogCaptureMode() ? 4 : 7);
    }

    /**
     * Configures {@code add.headers=op} and checks that a transformed create record carries
     * exactly one header, {@code __op}, equal to the create operation code.
     */
    @Test
    @FixFor("DBZ-1791")
    public void testAddHeader() throws Exception {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(ADD_HEADERS, "op");
        transformation.configure(config);

        final SourceRecord source = createCreateRecord();
        final SourceRecord result = transformation.apply(source);

        Assertions.assertThat(result.headers()).hasSize(1);
        Assertions.assertThat((String) getSourceRecordHeaderByKey(result, "__op"))
                .isEqualTo(Envelope.Operation.CREATE.code());
    }

    /**
     * Configures {@code add.headers=op,id} and checks that a transformed create record carries
     * two headers: {@code __op} equal to the create operation code, and {@code __id} for which
     * the header lookup helper returns {@code null}.
     */
    @Test
    @FixFor("DBZ-1791")
    public void testAddHeadersForMissingOrInvalidFields() throws Exception {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(ADD_HEADERS, "op,id");
        transformation.configure(config);

        final SourceRecord source = createCreateRecord();
        final SourceRecord result = transformation.apply(source);

        Assertions.assertThat(result.headers()).hasSize(2);
        Assertions.assertThat((String) getSourceRecordHeaderByKey(result, "__op"))
                .isEqualTo(Envelope.Operation.CREATE.code());
        Assertions.assertThat((String) getSourceRecordHeaderByKey(result, "__id")).isNull();
    }

    /**
     * Configures {@code add.headers=op,source.rs,source.collection} together with the header
     * prefix {@code "prefix."} and checks that a transformed create record carries the three
     * headers {@code prefix.op}, {@code prefix.source_rs} and {@code prefix.source_collection}
     * with the expected values.
     */
    @Test
    @FixFor({"DBZ-1791", "DBZ-2504"})
    public void testAddHeadersSpecifyingStruct() throws Exception {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(ADD_HEADERS, "op,source.rs,source.collection");
        config.put(ADD_HEADERS_PREFIX, "prefix.");
        transformation.configure(config);

        final SourceRecord source = createCreateRecord();
        final SourceRecord result = transformation.apply(source);

        Assertions.assertThat(result.headers()).hasSize(3);
        Assertions.assertThat((String) getSourceRecordHeaderByKey(result, "prefix.op"))
                .isEqualTo(Envelope.Operation.CREATE.code());
        Assertions.assertThat((String) getSourceRecordHeaderByKey(result, "prefix.source_rs"))
                .isEqualTo("rs0");
        Assertions.assertThat((String) getSourceRecordHeaderByKey(result, "prefix.source_collection"))
                .isEqualTo(getCollectionName());
    }

    /**
     * Configures {@code add.fields=op} and checks that the value of a transformed create record
     * has an {@code __op} field equal to the create operation code.
     */
    @Test
    @FixFor("DBZ-1791")
    public void testAddField() throws Exception {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(ADD_FIELDS, "op");
        transformation.configure(config);

        final SourceRecord source = createCreateRecord();
        final SourceRecord result = transformation.apply(source);

        final Struct valueStruct = (Struct) result.value();
        Assertions.assertThat(valueStruct.get("__op")).isEqualTo(Envelope.Operation.CREATE.code());
    }

    /**
     * Configures {@code add.fields} with the whitespace-padded list {@code "op , ts_ms"} and
     * the field prefix {@code "prefix."}, then checks that the value of a transformed create
     * record has {@code prefix.op} equal to the create operation code and a non-null
     * {@code prefix.ts_ms}.
     */
    @Test
    @FixFor({"DBZ-1791", "DBZ-2504"})
    public void testAddFields() throws Exception {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(ADD_FIELDS, "op , ts_ms");
        config.put(ADD_FIELDS_PREFIX, "prefix.");
        transformation.configure(config);

        final SourceRecord source = createCreateRecord();
        final SourceRecord result = transformation.apply(source);

        Assertions.assertThat(((Struct) result.value()).get("prefix.op"))
                .isEqualTo(Envelope.Operation.CREATE.code());
        Assertions.assertThat(((Struct) result.value()).get("prefix.ts_ms")).isNotNull();
    }

    /**
     * Configures {@code add.fields=op,id} and checks that the value of a transformed create
     * record has {@code __op} equal to the create operation code and a {@code null}
     * {@code __id}.
     */
    @Test
    @FixFor("DBZ-1791")
    public void testAddFieldsForMissingOptionalField() throws Exception {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(ADD_FIELDS, "op,id");
        transformation.configure(config);

        final SourceRecord source = createCreateRecord();
        final SourceRecord result = transformation.apply(source);

        Assertions.assertThat(((Struct) result.value()).get("__op"))
                .isEqualTo(Envelope.Operation.CREATE.code());
        Assertions.assertThat(((Struct) result.value()).get("__id")).isNull();
    }

    /**
     * Checks that dotted {@code add.fields} entries are added under underscore-joined names.
     * <p>
     * With {@code add.fields=op,source.rs,source.collection}, the transformed create event
     * must contain {@code __op}, {@code __source_rs} (expected to be {@code "rs0"}) and
     * {@code __source_collection} (expected to equal the test collection name).
     */
    @Test
    @FixFor(value={"DBZ-1791"})
    public void testAddFieldsSpecifyStruct() throws Exception {
        waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(ADD_FIELDS, "op,source.rs,source.collection");
        transformation.configure(config);

        final SourceRecord source = createCreateRecord();
        final SourceRecord result = (SourceRecord) transformation.apply((ConnectRecord) source);
        final Struct resultValue = (Struct) result.value();

        Assertions.assertThat(resultValue.get("__op")).isEqualTo((Object) Envelope.Operation.CREATE.code());
        Assertions.assertThat(resultValue.get("__source_rs")).isEqualTo((Object) "rs0");
        Assertions.assertThat(resultValue.get("__source_collection")).isEqualTo((Object) getCollectionName());
    }

    /**
     * Checks that {@code add.fields=op} is honoured for delete events in {@code rewrite} mode.
     * <p>
     * The first record produced by {@link #createDeleteRecordWithTombstone()} is transformed;
     * the result must have {@code __deleted == true} and {@code __op} equal to the delete
     * operation code.
     */
    @Test
    @FixFor(value={"DBZ-1791"})
    public void testAddFieldHandleDeleteRewrite() throws Exception {
        waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(HANDLE_DELETES, "rewrite");
        config.put(ADD_FIELDS, "op");
        transformation.configure(config);

        final SourceRecord deleteEvent = (SourceRecord) createDeleteRecordWithTombstone().allRecordsInOrder().get(0);
        final SourceRecord result = (SourceRecord) transformation.apply((ConnectRecord) deleteEvent);
        final Struct resultValue = (Struct) result.value();

        Assertions.assertThat(resultValue.get("__deleted")).isEqualTo((Object) true);
        Assertions.assertThat(resultValue.get("__op")).isEqualTo((Object) Envelope.Operation.DELETE.code());
    }

    /**
     * Checks that multiple {@code add.fields} entries are honoured for delete events in
     * {@code rewrite} mode.
     * <p>
     * With {@code add.fields=op,ts_ms}, the transformed delete event must have
     * {@code __deleted == true}, {@code __op} equal to the delete operation code and a
     * non-null {@code __ts_ms}.
     */
    @Test
    @FixFor(value={"DBZ-1791"})
    public void tesAddFieldsHandleDeleteRewrite() throws Exception {
        waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(HANDLE_DELETES, "rewrite");
        config.put(ADD_FIELDS, "op,ts_ms");
        transformation.configure(config);

        final SourceRecord deleteEvent = (SourceRecord) createDeleteRecordWithTombstone().allRecordsInOrder().get(0);
        final SourceRecord result = (SourceRecord) transformation.apply((ConnectRecord) deleteEvent);
        final Struct resultValue = (Struct) result.value();

        Assertions.assertThat(resultValue.get("__deleted")).isEqualTo((Object) true);
        Assertions.assertThat(resultValue.get("__op")).isEqualTo((Object) Envelope.Operation.DELETE.code());
        Assertions.assertThat(resultValue.get("__ts_ms")).isNotNull();
    }

    /**
     * Checks that dotted {@code add.fields} entries are honoured for delete events in
     * {@code rewrite} mode.
     * <p>
     * With {@code add.fields=op,source.rs,source.collection}, the transformed delete event
     * must have {@code __deleted == true}, {@code __op} equal to the delete operation code,
     * {@code __source_rs == "rs0"} and {@code __source_collection} equal to the test
     * collection name.
     */
    @Test
    @FixFor(value={"DBZ-1791"})
    public void testAddFieldsSpecifyStructHandleDeleteRewrite() throws Exception {
        waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(HANDLE_DELETES, "rewrite");
        config.put(ADD_FIELDS, "op,source.rs,source.collection");
        transformation.configure(config);

        final SourceRecord deleteEvent = (SourceRecord) createDeleteRecordWithTombstone().allRecordsInOrder().get(0);
        final SourceRecord result = (SourceRecord) transformation.apply((ConnectRecord) deleteEvent);
        final Struct resultValue = (Struct) result.value();

        Assertions.assertThat(resultValue.get("__deleted")).isEqualTo((Object) true);
        Assertions.assertThat(resultValue.get("__op")).isEqualTo((Object) Envelope.Operation.DELETE.code());
        Assertions.assertThat(resultValue.get("__source_rs")).isEqualTo((Object) "rs0");
        Assertions.assertThat(resultValue.get("__source_collection")).isEqualTo((Object) getCollectionName());
    }

    /**
     * Checks {@code add.fields} handling for a delete event and the record that follows it
     * when deletes are rewritten and tombstones are not dropped.
     * <p>
     * The first record returned by {@link #createDeleteRecordWithTombstone()} must be
     * transformed into a value with {@code __deleted == true}, {@code __op} equal to the
     * delete operation code and a non-null {@code __ts_ms}. The second record must still
     * have a {@code null} value after the transformation.
     */
    @Test
    @FixFor(value={"DBZ-1791"})
    public void testAddFieldsHandleDeleteRewriteAndTombstone() throws Exception {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();
        HashMap<String, String> props = new HashMap<>();
        props.put(HANDLE_DELETES, "rewrite");
        props.put(ADD_FIELDS, "op,ts_ms");
        // Use the class constant instead of repeating the raw option key.
        props.put(DROP_TOMBSTONE, "false");
        this.transformation.configure(props);

        AbstractConnectorTest.SourceRecords records = this.createDeleteRecordWithTombstone();

        // First record: the delete event itself.
        SourceRecord deleteRecord = (SourceRecord)records.allRecordsInOrder().get(0);
        SourceRecord deleteTransformed = (SourceRecord)this.transformation.apply((ConnectRecord)deleteRecord);
        Struct deleteValue = (Struct)deleteTransformed.value();
        Assertions.assertThat(deleteValue.get("__deleted")).isEqualTo((Object)true);
        Assertions.assertThat(deleteValue.get("__op")).isEqualTo((Object)Envelope.Operation.DELETE.code());
        Assertions.assertThat(deleteValue.get("__ts_ms")).isNotNull();

        // Second record: must pass through with a null value.
        SourceRecord tombstoneRecord = (SourceRecord)records.allRecordsInOrder().get(1);
        SourceRecord tombstoneTransformed = (SourceRecord)this.transformation.apply((ConnectRecord)tombstoneRecord);
        Assertions.assertThat(tombstoneTransformed.value()).isNull();
    }

    /**
     * Checks that an empty array does not produce a field in the transformed value schema.
     * <p>
     * A document whose only user-supplied field is an empty array is inserted. The
     * transformation runs with {@code ARRAY_ENCODING=array} and
     * {@code sanitize.field.names=true}. The resulting value schema must have no
     * {@code empty_array} field and the record must pass {@code VerifyRecord.isValid}.
     */
    @Test
    @FixFor(value={"DBZ-2585"})
    public void testEmptyArray() throws InterruptedException, IOException {
        final HashMap<String, String> smtConfig = new HashMap<>();
        smtConfig.put(ARRAY_ENCODING, "array");
        smtConfig.put("sanitize.field.names", "true");
        transformation.configure(smtConfig);

        primary().execute("insert", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(getCollectionName())
                    .insertOne(Document.parse("{'empty_array': [] }"));
        });

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.recordsForTopic(topicName()).size()).isEqualTo(1);

        final SourceRecord inserted = (SourceRecord) consumed.recordsForTopic(topicName()).get(0);
        final SourceRecord result = (SourceRecord) transformation.apply((ConnectRecord) inserted);

        Assertions.assertThat((Object) result.valueSchema().field("empty_array")).isNull();
        VerifyRecord.isValid((SourceRecord) result);
    }

    /**
     * Checks the output of an update event when {@code add.fields=patch} is configured.
     * <p>
     * A document {@code {_id, a: 1, b: 2, c: 3}} is inserted and then updated with
     * {@code $set: {a: 22}}. The update event is transformed and the test asserts:
     * <ul>
     * <li>the key holds the object id as an optional string;</li>
     * <li>the value holds {@code _id} and the updated {@code a == 22};</li>
     * <li>when {@code TestHelper.isOplogCaptureMode()} is true: {@code __patch} contains the
     * {@code $set} document and the value schema has 3 fields;</li>
     * <li>otherwise: the value schema has 5 fields, {@code __patch} is {@code null} and
     * {@code b}/{@code c} keep their inserted values.</li>
     * </ul>
     */
    @Test
    @FixFor(value={"DBZ-2455"})
    public void testAddPatchFieldAfterUpdate() throws Exception {
        waitForStreamingRunning();

        // Insert the initial document and drain its create event.
        final ObjectId documentId = new ObjectId();
        final Document initial = new Document()
                .append("_id", documentId)
                .append("a", 1)
                .append("b", 2)
                .append("c", 3);
        primary().execute("insert", client -> {
            client.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(initial);
        });
        AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // Apply the update and capture the resulting event.
        final Document setA = new Document().append("$set", new Document(Collect.hashMapOf("a", 22)));
        primary().execute("update", client -> {
            client.getDatabase("transform_operations")
                    .getCollection(getCollectionName())
                    .updateOne((Bson) RawBsonDocument.parse("{ '_id' : { '$oid' : '" + documentId + "'}}"), (Bson) setA);
        });
        consumed = consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        final HashMap<String, String> config = new HashMap<>();
        config.put(ADD_FIELDS, "patch");
        transformation.configure(config);

        final SourceRecord updateEvent = (SourceRecord) consumed.allRecordsInOrder().get(0);
        final SourceRecord result = (SourceRecord) transformation.apply((ConnectRecord) updateEvent);
        final Struct resultKey = (Struct) result.key();
        final Struct resultValue = (Struct) result.value();

        // Key assertions.
        Assertions.assertThat((Object) resultKey.schema()).isSameAs((Object) result.keySchema());
        Assertions.assertThat((Object) resultKey.schema().field("id").schema()).isEqualTo((Object) SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(resultKey.get("id")).isEqualTo((Object) documentId.toString());

        // Value assertions common to both capture modes.
        Assertions.assertThat((Object) resultValue.schema()).isSameAs((Object) result.valueSchema());
        Assertions.assertThat(resultValue.get("_id")).isEqualTo((Object) documentId.toString());
        Assertions.assertThat(resultValue.get("a")).isEqualTo((Object) 22);
        Assertions.assertThat((Object) resultValue.schema().field("_id").schema()).isEqualTo((Object) SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object) resultValue.schema().field("a").schema()).isEqualTo((Object) SchemaBuilder.OPTIONAL_INT32_SCHEMA);

        if (TestHelper.isOplogCaptureMode()) {
            final String patchJson = TestHelper.getDocumentWithoutLanguageVersion(resultValue.getString("__patch")).toJson();
            Assertions.assertThat((String) patchJson).isEqualTo((Object) "{\"$set\": {\"a\": 22}}");
            Assertions.assertThat((Object) resultValue.schema().field("__patch").schema()).isEqualTo((Object) Json.builder().optional().build());
            Assertions.assertThat((List) resultValue.schema().fields()).hasSize(3);
        }
        else {
            Assertions.assertThat((List) resultValue.schema().fields()).hasSize(5);
            Assertions.assertThat((Object) resultValue.schema().field("__patch").schema()).isEqualTo((Object) Json.builder().optional().build());
            Assertions.assertThat(resultValue.get("__patch")).isNull();
            Assertions.assertThat(resultValue.get("b")).isEqualTo((Object) 2);
            Assertions.assertThat((Object) resultValue.schema().field("b").schema()).isEqualTo((Object) SchemaBuilder.OPTIONAL_INT32_SCHEMA);
            Assertions.assertThat(resultValue.get("c")).isEqualTo((Object) 3);
            Assertions.assertThat((Object) resultValue.schema().field("c").schema()).isEqualTo((Object) SchemaBuilder.OPTIONAL_INT32_SCHEMA);
        }
    }

    /**
     * Expects a {@link DataException} while transforming the records created from
     * {@code dbz-2316.json}.
     * <p>
     * The transformation is configured with {@code ARRAY_ENCODING=array} and
     * {@code add.fields=op,source.ts_ms}, then applied to every loaded record in order.
     */
    @Test(expected=DataException.class)
    @FixFor(value={"DBZ-2316"})
    public void testShouldThrowExceptionWithElementsDifferingStructures() throws Exception {
        waitForStreamingRunning();

        final HashMap<String, String> config = new HashMap<>();
        config.put(ARRAY_ENCODING, "array");
        config.put(ADD_FIELDS, "op,source.ts_ms");
        transformation.configure(config);

        final AbstractConnectorTest.SourceRecords loaded = createCreateRecordFromJson("dbz-2316.json");
        loaded.allRecordsInOrder().forEach(event -> transformation.apply((ConnectRecord) event));
    }

    /**
     * Checks schema and value conversion of nested arrays when {@code ARRAY_ENCODING=array}.
     * <p>
     * A single document is inserted with three fields:
     * <ul>
     * <li>{@code matrix} - an array of integer arrays;</li>
     * <li>{@code array_complex} - an array of sub-documents with keys {@code k1}/{@code k2};</li>
     * <li>{@code matrix_complex} - an array of arrays of sub-documents ({@code k3}/{@code k4}),
     * where {@code k4} is itself an integer array.</li>
     * </ul>
     * For each field the test asserts the schema derived by the transformation and then
     * compares the transformed value against an expected structure built from that schema.
     */
    @Test
    @FixFor(value={"DBZ-2569"})
    public void testMatrixType() throws InterruptedException, IOException {
        HashMap<String, String> transformationConfig = new HashMap<>();
        transformationConfig.put(ARRAY_ENCODING, "array");
        // Use the class constant instead of repeating the raw option key.
        transformationConfig.put(CONFIG_DROP_TOMBSTONES, "false");
        transformationConfig.put(HANDLE_DELETES, "none");
        this.transformation.configure(transformationConfig);

        this.primary().execute("insert", client -> client.getDatabase("transform_operations").getCollection(this.getCollectionName()).insertOne(Document.parse("{  'matrix': [    [1,2,3],    [4,5,6],    [7,8,9],  ]  ,'array_complex': [    {'k1' : 'v1','k2' : 1},    {'k1' : 'v2','k2' : 2},  ]  ,'matrix_complex': [    [      {'k3' : 'v111',       'k4' : [1,2,3]},      {'k3' : 'v211',       'k4' : [4,5,6]}    ],    [      {'k3' : 'v112',       'k4' : [7,8]},      {'k3' : 'v212',       'k4' : [8]}    ],  ]}")));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        SourceRecord insertRecord = (SourceRecord)records.recordsForTopic(this.topicName()).get(0);
        SourceRecord transformedInsert = (SourceRecord)this.transformation.apply((ConnectRecord)insertRecord);
        Struct transformedInsertValue = (Struct)transformedInsert.value();

        // 'matrix': ARRAY of ARRAY of optional int32.
        Schema matrixSchema = transformedInsert.valueSchema().field("matrix").schema();
        Assertions.assertThat((Object)matrixSchema.type()).isEqualTo((Object)Schema.Type.ARRAY);
        Schema subMatrixSchema = matrixSchema.valueSchema().schema();
        Assertions.assertThat((Object)subMatrixSchema.type()).isEqualTo((Object)Schema.Type.ARRAY);
        Assertions.assertThat((Object)subMatrixSchema.valueSchema()).isEqualTo((Object)Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat((Object)transformedInsertValue.get("matrix")).isEqualTo(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9)));

        // 'array_complex': ARRAY of STRUCT {k1: optional string, k2: optional int32}.
        Schema arrayComplexSchema = transformedInsert.valueSchema().field("array_complex").schema();
        Assertions.assertThat((Object)arrayComplexSchema.type()).isEqualTo((Object)Schema.Type.ARRAY);
        Schema subArrayComplexSchema = arrayComplexSchema.valueSchema().schema();
        Assertions.assertThat((Object)subArrayComplexSchema.type()).isEqualTo((Object)Schema.Type.STRUCT);
        Assertions.assertThat((Object)subArrayComplexSchema.field("k1").schema()).isEqualTo((Object)Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)subArrayComplexSchema.field("k2").schema()).isEqualTo((Object)Schema.OPTIONAL_INT32_SCHEMA);
        Field k1 = subArrayComplexSchema.field("k1");
        Field k2 = subArrayComplexSchema.field("k2");
        Struct subStruct1 = new Struct(subArrayComplexSchema);
        subStruct1.put(k1, (Object)"v1");
        subStruct1.put(k2, (Object)1);
        Struct subStruct2 = new Struct(subArrayComplexSchema);
        subStruct2.put(k1, (Object)"v2");
        subStruct2.put(k2, (Object)2);
        Assertions.assertThat((Object)transformedInsertValue.get("array_complex")).isEqualTo(Arrays.asList(subStruct1, subStruct2));

        // 'matrix_complex': ARRAY of ARRAY of STRUCT {k3: optional string, k4: ARRAY of optional int32}.
        Schema matrixComplexSchema = transformedInsert.valueSchema().field("matrix_complex").schema();
        Assertions.assertThat((Object)matrixComplexSchema.type()).isEqualTo((Object)Schema.Type.ARRAY);
        Schema subMatrixComplexSchema = matrixComplexSchema.valueSchema().schema();
        Assertions.assertThat((Object)subMatrixComplexSchema.type()).isEqualTo((Object)Schema.Type.ARRAY);
        Schema strucSchema = subMatrixComplexSchema.valueSchema();
        Assertions.assertThat((Object)strucSchema.schema().type()).isEqualTo((Object)Schema.Type.STRUCT);
        Assertions.assertThat((Object)strucSchema.field("k3").schema()).isEqualTo((Object)Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)strucSchema.field("k4").schema().type()).isEqualTo((Object)Schema.Type.ARRAY);
        Assertions.assertThat((Object)strucSchema.field("k4").schema().valueSchema()).isEqualTo((Object)Schema.OPTIONAL_INT32_SCHEMA);
        Field k3 = strucSchema.field("k3");
        Field k4 = strucSchema.field("k4");
        // Expected structs are named subStruct<column><row> relative to the inserted matrix.
        Struct subStruct11 = new Struct(strucSchema.schema());
        subStruct11.put(k3, (Object)"v111");
        subStruct11.put(k4, Arrays.asList(1, 2, 3));
        Struct subStruct12 = new Struct(strucSchema.schema());
        subStruct12.put(k3, (Object)"v112");
        subStruct12.put(k4, Arrays.asList(7, 8));
        Struct subStruct21 = new Struct(strucSchema.schema());
        subStruct21.put(k3, (Object)"v211");
        subStruct21.put(k4, Arrays.asList(4, 5, 6));
        Struct subStruct22 = new Struct(strucSchema.schema());
        subStruct22.put(k3, (Object)"v212");
        subStruct22.put(k4, Arrays.asList(8));
        Assertions.assertThat((Object)transformedInsertValue.get("matrix_complex")).isEqualTo(Arrays.asList(Arrays.asList(subStruct11, subStruct21), Arrays.asList(subStruct12, subStruct22)));
    }

    /**
     * Checks schema and value conversion of a nested, mixed-type array when
     * {@code ARRAY_ENCODING=document}.
     * <p>
     * The inserted {@code matrix} field holds three inner arrays with differing element
     * types. The test expects the transformation to produce a STRUCT whose fields
     * {@code _0}, {@code _1}, {@code _2} are themselves STRUCTs with index-named fields
     * ({@code _0}, {@code _1}, ...) typed per element, and then compares the transformed
     * value with an expected struct built from those schemas.
     */
    @Test
    @FixFor(value={"DBZ-2569"})
    public void testMatrixArrayAsDocumentType() throws InterruptedException, IOException {
        HashMap<String, String> transformationConfig = new HashMap<>();
        transformationConfig.put(ARRAY_ENCODING, "document");
        // Use the class constant instead of repeating the raw option key.
        transformationConfig.put(CONFIG_DROP_TOMBSTONES, "false");
        transformationConfig.put(HANDLE_DELETES, "none");
        this.transformation.configure(transformationConfig);

        this.primary().execute("insert", client -> client.getDatabase("transform_operations").getCollection(this.getCollectionName()).insertOne(Document.parse("{  'matrix': [    [1,'aa',3],    [4,5,'6'],    [7.0,8],  ]}")));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        SourceRecord insertRecord = (SourceRecord)records.recordsForTopic(this.topicName()).get(0);
        SourceRecord transformedInsert = (SourceRecord)this.transformation.apply((ConnectRecord)insertRecord);

        // Outer array becomes a struct with one field per inner array.
        Schema matrixSchema = transformedInsert.valueSchema().field("matrix").schema();
        Assertions.assertThat((Object)matrixSchema.type()).isEqualTo((Object)Schema.Type.STRUCT);
        Assertions.assertThat((int)matrixSchema.fields().size()).isEqualTo(3);

        // First inner array: [1, 'aa', 3].
        Schema firstSubSchema = matrixSchema.field("_0").schema();
        Assertions.assertThat((Object)firstSubSchema.type()).isEqualTo((Object)Schema.Type.STRUCT);
        Assertions.assertThat((int)firstSubSchema.fields().size()).isEqualTo(3);
        Assertions.assertThat((Object)firstSubSchema.field("_0").schema()).isEqualTo((Object)Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat((Object)firstSubSchema.field("_1").schema()).isEqualTo((Object)Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)firstSubSchema.field("_2").schema()).isEqualTo((Object)Schema.OPTIONAL_INT32_SCHEMA);

        // Second inner array: [4, 5, '6'].
        Schema secondSubSchema = matrixSchema.field("_1").schema();
        Assertions.assertThat((Object)secondSubSchema.type()).isEqualTo((Object)Schema.Type.STRUCT);
        Assertions.assertThat((int)secondSubSchema.fields().size()).isEqualTo(3);
        Assertions.assertThat((Object)secondSubSchema.field("_0").schema()).isEqualTo((Object)Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat((Object)secondSubSchema.field("_1").schema()).isEqualTo((Object)Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat((Object)secondSubSchema.field("_2").schema()).isEqualTo((Object)Schema.OPTIONAL_STRING_SCHEMA);

        // Third inner array: [7.0, 8].
        Schema thirdSubSchema = matrixSchema.field("_2").schema();
        Assertions.assertThat((Object)thirdSubSchema.type()).isEqualTo((Object)Schema.Type.STRUCT);
        Assertions.assertThat((int)thirdSubSchema.fields().size()).isEqualTo(2);
        Assertions.assertThat((Object)thirdSubSchema.field("_0").schema()).isEqualTo((Object)Schema.OPTIONAL_FLOAT64_SCHEMA);
        Assertions.assertThat((Object)thirdSubSchema.field("_1").schema()).isEqualTo((Object)Schema.OPTIONAL_INT32_SCHEMA);

        // Build the expected value from the derived schemas and compare.
        Struct transformedInsertValue = (Struct)transformedInsert.value();
        Struct firstSubStruct = new Struct(firstSubSchema);
        firstSubStruct.put(firstSubSchema.field("_0"), (Object)1);
        firstSubStruct.put(firstSubSchema.field("_1"), (Object)"aa");
        firstSubStruct.put(firstSubSchema.field("_2"), (Object)3);
        Struct secondSubStruct = new Struct(secondSubSchema);
        secondSubStruct.put(secondSubSchema.field("_0"), (Object)4);
        secondSubStruct.put(secondSubSchema.field("_1"), (Object)5);
        secondSubStruct.put(secondSubSchema.field("_2"), (Object)"6");
        Struct thirdSubStruct = new Struct(thirdSubSchema);
        thirdSubStruct.put(thirdSubSchema.field("_0"), (Object)7.0);
        thirdSubStruct.put(thirdSubSchema.field("_1"), (Object)8);
        Struct struct = new Struct(matrixSchema);
        struct.put(matrixSchema.field("_0"), (Object)firstSubStruct);
        struct.put(matrixSchema.field("_1"), (Object)secondSubStruct);
        struct.put(matrixSchema.field("_2"), (Object)thirdSubStruct);
        Assertions.assertThat((Object)transformedInsertValue.get("matrix")).isEqualTo((Object)struct);
    }

    /**
     * Checks that documents with arrays nested inside sub-documents can be transformed,
     * including when the shape of a nested field changes between documents.
     * <p>
     * Steps:
     * <ol>
     * <li>Insert a document where {@code f1.f2} is an array of sub-documents whose {@code f3}
     * is itself a sub-document; assert that {@code f5} has the INT32 schema type and that
     * {@code f2} has two elements.</li>
     * <li>Delete that document and consume the two records emitted for the delete.</li>
     * <li>Insert two documents where {@code f3} is an array instead; transform both and
     * assert the size of {@code f2}, and that {@code f3} of the first {@code f2} element in
     * the second document is empty.</li>
     * </ol>
     */
    @Test
    @FixFor(value={"DBZ-5434"})
    public void shouldSupportNestedArrays() throws InterruptedException {
        ExtractNewDocumentStateTestIT.waitForStreamingRunning();

        // Step 1: f3 as a sub-document.
        this.primary().execute("insert", client -> client.getDatabase("transform_operations").getCollection(this.getCollectionName()).insertOne(Document.parse("{\"_id\":ObjectId(\"6182b1a25711ed59dd6a1d6c\"),\"f1\":{\"f2\":[{\"f3\":{}},{\"f3\":{\"f5\":5}}]}}")));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
        SourceRecord insertRecord = (SourceRecord)records.recordsForTopic(this.topicName()).get(0);
        SourceRecord transformedInsert = (SourceRecord)this.transformation.apply((ConnectRecord)insertRecord);
        Struct transformedInsertValue = (Struct)transformedInsert.value();
        Schema transformedInsertSchema = transformedInsert.valueSchema();
        Assertions.assertThat((Object)transformedInsertSchema.field("f1").schema().field("f2").schema().valueSchema().field("f3").schema().field("f5").schema().type()).isEqualTo((Object)Schema.INT32_SCHEMA.type());
        Assertions.assertThat((int)transformedInsertValue.getStruct("f1").getArray("f2").size()).isEqualTo(2);

        // Step 2: remove the document so the same _id can be re-inserted with a different shape.
        this.primary().execute("delete", client -> client.getDatabase("transform_operations").getCollection(this.getCollectionName()).deleteOne((Bson)RawBsonDocument.parse("{'_id' : ObjectId('6182b1a25711ed59dd6a1d6c')}")));
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((int)records.recordsForTopic(this.topicName()).size()).isEqualTo(2);

        // Step 3: f3 as an array.
        this.primary().execute("insert", client -> client.getDatabase("transform_operations").getCollection(this.getCollectionName()).insertMany(Collect.arrayListOf("{\"_id\":ObjectId(\"6182b1a25711ed59dd6a1d6c\"),\"f1\":{\"f2\":[{\"f3\":[]},{\"f3\":[{\"f5\":5}]}]}}", "{\"_id\":ObjectId(\"6182b1a25711ed59dd6a1d6d\"),\"f1\":{\"f2\":[{\"f3\":[]},{\"f3\":[]}]}}").stream().map(Document::parse).collect(Collectors.toList())));
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((int)records.recordsForTopic(this.topicName()).size()).isEqualTo(2);
        List<SourceRecord> transformedInserts = records.allRecordsInOrder().stream().map(m -> (SourceRecord)this.transformation.apply((ConnectRecord)m)).collect(Collectors.toList());
        transformedInsertValue = (Struct)transformedInserts.get(0).value();
        Assertions.assertThat((int)transformedInsertValue.getStruct("f1").getArray("f2").size()).isEqualTo(2);
        transformedInsertValue = (Struct)transformedInserts.get(1).value();
        List<Struct> f2 = transformedInsertValue.getStruct("f1").getArray("f2");
        Assertions.assertThat((int)f2.size()).isEqualTo(2);
        Assertions.assertThat((int)f2.get(0).getArray("f3").size()).isEqualTo(0);
    }

    /**
     * Inserts every document loaded from the given classpath resource and returns the
     * records consumed for those inserts.
     * <p>
     * One record per document is consumed. The method asserts that the test topic received
     * exactly that many records and that nothing else is left to consume.
     *
     * @param pathOnClasspath resource passed to {@code loadTestDocuments}
     * @return the consumed records
     * @throws Exception if loading, inserting or consuming fails
     */
    private AbstractConnectorTest.SourceRecords createCreateRecordFromJson(String pathOnClasspath) throws Exception {
        final List<Document> toInsert = loadTestDocuments(pathOnClasspath);
        final int expectedCount = toInsert.size();

        primary().execute("Load JSON", client -> {
            toInsert.forEach(doc -> client.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(doc));
        });

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(expectedCount);
        Assertions.assertThat(consumed.recordsForTopic(topicName()).size()).isEqualTo(expectedCount);
        assertNoRecordsToConsume();
        return consumed;
    }

    /**
     * Inserts a sample document (name plus a nested address) and returns the single
     * record consumed for that insert.
     * <p>
     * Asserts that exactly one record arrived on the test topic and that no further
     * records are pending.
     *
     * @return the first consumed record
     * @throws Exception if inserting or consuming fails
     */
    private SourceRecord createCreateRecord() throws Exception {
        final ObjectId documentId = new ObjectId();
        final Document address = new Document()
                .append("struct", "Morris Park Ave")
                .append("zipcode", "10462");
        final Document person = new Document()
                .append("_id", documentId)
                .append("name", "Sally")
                .append("address", address);

        primary().execute("insert", client -> {
            client.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(person);
        });

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        return (SourceRecord) consumed.allRecordsInOrder().get(0);
    }

    /**
     * Inserts a sample document, deletes it again and returns the records consumed for
     * the delete.
     * <p>
     * The insert is expected to yield one record, which is consumed and discarded. The
     * delete is expected to yield two records on the test topic, and these are returned.
     * After each step the method asserts that no further records are pending.
     *
     * @return the two records consumed after the delete
     * @throws Exception if inserting, deleting or consuming fails
     */
    private AbstractConnectorTest.SourceRecords createDeleteRecordWithTombstone() throws Exception {
        final ObjectId documentId = new ObjectId();
        final Document address = new Document()
                .append("struct", "Morris Park Ave")
                .append("zipcode", "10462");
        final Document person = new Document()
                .append("_id", documentId)
                .append("name", "Sally")
                .append("address", address);

        // Create the document and drain its create event.
        primary().execute("insert", client -> {
            client.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(person);
        });
        final AbstractConnectorTest.SourceRecords fromInsert = consumeRecordsByTopic(1);
        Assertions.assertThat(fromInsert.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();

        // Delete it by id and collect the resulting records.
        primary().execute("delete", client -> {
            final Document byId = Document.parse("{\"_id\": {\"$oid\": \"" + documentId + "\"}}");
            client.getDatabase("transform_operations").getCollection(getCollectionName()).deleteOne((Bson) byId);
        });
        final AbstractConnectorTest.SourceRecords fromDelete = consumeRecordsByTopic(2);
        Assertions.assertThat(fromDelete.recordsForTopic(topicName()).size()).isEqualTo(2);
        assertNoRecordsToConsume();
        return fromDelete;
    }

    /**
     * Convenience overload that delegates to the two-argument
     * {@code waitForStreamingRunning} with the fixed arguments {@code "mongodb"} and
     * {@code "mongo"}.
     *
     * @throws InterruptedException propagated from the delegate
     */
    private static void waitForStreamingRunning() throws InterruptedException {
        waitForStreamingRunning("mongodb", "mongo");
    }

    /**
     * Returns the string form of the first header with the given key.
     *
     * @param record the record whose headers are inspected
     * @param headerKey the header name to look up
     * @return {@code toString()} of the first matching header's value, or {@code null} when
     *         no header has that key or the header value is {@code null}
     */
    private String getSourceRecordHeaderByKey(SourceRecord record, String headerKey) {
        // Typed iterator avoids the raw Iterator and the downcast on next().
        final Iterator<Header> matches = record.headers().allWithName(headerKey);
        if (!matches.hasNext()) {
            return null;
        }
        final Object value = matches.next().value();
        return value == null ? null : value.toString();
    }
}

