/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb.transforms;

import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.ConnectionContext;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.ReplicaSet;
import io.debezium.connector.mongodb.TestHelper;
import io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.Document;
import org.bson.RawBsonDocument;
import org.bson.conversions.Bson;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class UnwrapFromMongoDbEnvelopeTestIT
extends AbstractConnectorTest {
    private static final String DB_NAME = "transform";
    private static final String COLLECTION_NAME = "source";
    private static final String TOPIC_NAME = "mongo.transform.source";
    private Configuration config;
    private MongoDbTaskContext context;
    private UnwrapFromMongoDbEnvelope<SourceRecord> transformation;

    @Before
    public void beforeEach() {
        Testing.Debug.disable();
        Testing.Print.disable();
        this.stopConnector();
        this.initializeConnectorTestFramework();
        this.transformation = new UnwrapFromMongoDbEnvelope();
        this.transformation.configure(Collections.emptyMap());
    }

    @After
    public void afterEach() {
        try {
            this.stopConnector();
        }
        finally {
            if (this.context != null) {
                this.context.getConnectionContext().shutdown();
            }
        }
        this.transformation.close();
    }

    @Test
    public void shouldTransformEvents() throws InterruptedException, IOException {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "transform.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), DB_NAME);
        this.start(MongoDbConnector.class, this.config);
        this.primary().execute("insert", client -> client.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).insertOne((Object)Document.parse((String)"{'_id': 1, 'dataStr': 'hello', 'dataInt': 123, 'dataLong': 80000000000}")));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.recordsForTopic(TOPIC_NAME).size()).isEqualTo(1);
        SourceRecord insertRecord = (SourceRecord)records.recordsForTopic(TOPIC_NAME).get(0);
        SourceRecord transformedInsert = (SourceRecord)this.transformation.apply((ConnectRecord)insertRecord);
        Struct transformedInsertValue = (Struct)transformedInsert.value();
        Assertions.assertThat((Object)transformedInsert.valueSchema().field("id").schema()).isEqualTo((Object)Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat((Object)transformedInsert.valueSchema().field("dataStr").schema()).isEqualTo((Object)Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)transformedInsert.valueSchema().field("dataInt").schema()).isEqualTo((Object)Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat((Object)transformedInsert.valueSchema().field("dataLong").schema()).isEqualTo((Object)Schema.OPTIONAL_INT64_SCHEMA);
        Assertions.assertThat((Object)transformedInsertValue.get("id")).isEqualTo((Object)1);
        Assertions.assertThat((Object)transformedInsertValue.get("dataStr")).isEqualTo((Object)"hello");
        Assertions.assertThat((Object)transformedInsertValue.get("dataInt")).isEqualTo((Object)123);
        Assertions.assertThat((Object)transformedInsertValue.get("dataLong")).isEqualTo((Object)80000000000L);
        this.primary().execute("update", client -> client.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).updateOne((Bson)RawBsonDocument.parse((String)"{'_id' : 1}"), (Bson)RawBsonDocument.parse((String)"{'$set': {'dataStr': 'bye'}}")));
        records = this.consumeRecordsByTopic(1);
        SourceRecord candidateRecord = (SourceRecord)records.recordsForTopic(TOPIC_NAME).get(0);
        if (((Struct)candidateRecord.value()).get("op").equals("c")) {
            records = this.consumeRecordsByTopic(1);
        }
        Assertions.assertThat((int)records.recordsForTopic(TOPIC_NAME).size()).isEqualTo(1);
        SourceRecord updateRecord = (SourceRecord)records.recordsForTopic(TOPIC_NAME).get(0);
        SourceRecord transformedUpdate = (SourceRecord)this.transformation.apply((ConnectRecord)updateRecord);
        Struct transformedUpdateValue = (Struct)transformedUpdate.value();
        Assertions.assertThat((Object)transformedUpdate.valueSchema().field("id").schema()).isEqualTo((Object)Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat((Object)transformedUpdate.valueSchema().field("dataStr").schema()).isEqualTo((Object)Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat((Object)transformedUpdateValue.get("id")).isEqualTo((Object)1);
        Assertions.assertThat((Object)transformedUpdateValue.get("dataStr")).isEqualTo((Object)"bye");
        this.primary().execute("delete", client -> client.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).deleteOne((Bson)RawBsonDocument.parse((String)"{'_id' : 1}")));
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((int)records.recordsForTopic(TOPIC_NAME).size()).isEqualTo(2);
        SourceRecord deleteRecord = (SourceRecord)records.recordsForTopic(TOPIC_NAME).get(0);
        SourceRecord transformedDelete = (SourceRecord)this.transformation.apply((ConnectRecord)deleteRecord);
        Struct transformedDeleteValue = (Struct)transformedDelete.value();
        Assertions.assertThat((Object)transformedDeleteValue).isNull();
        Assertions.assertThat((Object)((SourceRecord)records.recordsForTopic(TOPIC_NAME).get(1)).value()).isNull();
    }

    private ConnectionContext.MongoPrimary primary() {
        ReplicaSet replicaSet = ReplicaSet.parse((String)this.context.getConnectionContext().hosts());
        return this.context.getConnectionContext().primaryFor(replicaSet, this.context.filters(), this.connectionErrorHandler(3));
    }

    private BiConsumer<String, Throwable> connectionErrorHandler(int numErrorsBeforeFailing) {
        AtomicInteger attempts = new AtomicInteger();
        return (desc, error) -> {
            if (attempts.incrementAndGet() > numErrorsBeforeFailing) {
                Assert.fail((String)("Unable to connect to primary after " + numErrorsBeforeFailing + " errors trying to " + desc + ": " + error));
            }
            this.logger.error("Error while attempting to {}: {}", new Object[]{desc, error.getMessage(), error});
        };
    }
}

