/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb;

import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.mongodb.AbstractMongoConnectorIT;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbConnectorTask;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.SkipForOplogTestRule;
import io.debezium.connector.mongodb.TestHelper;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.bson.Document;
import org.fest.assertions.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

public class KeepCaptureModeAfterRestartIT
extends AbstractMongoConnectorIT {
    @Rule
    public final TestRule rule = new SkipForOplogTestRule();

    @Test
    public void changeStreamsToOplog() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor(MongoDbConnectorTask.class);
        this.testSwitch(MongoDbConnectorConfig.CaptureMode.CHANGE_STREAMS_UPDATE_FULL, MongoDbConnectorConfig.CaptureMode.OPLOG);
        this.stopConnector(value -> Assertions.assertThat((logInterceptor.containsWarnMessage("Stored offsets were created using change streams capturing.") && logInterceptor.containsWarnMessage("Switching configuration to 'CHANGE_STREAMS_UPDATE_FULL'") && logInterceptor.containsWarnMessage("Either reconfigure the connector or remove the old offsets") ? 1 : 0) != 0).isTrue());
    }

    @Test
    public void oplogToChangeStreams() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor(MongoDbConnectorTask.class);
        this.testSwitch(MongoDbConnectorConfig.CaptureMode.OPLOG, MongoDbConnectorConfig.CaptureMode.CHANGE_STREAMS);
        this.stopConnector(value -> Assertions.assertThat((boolean)logInterceptor.containsMessage("Stored offsets were created using oplog capturing, trying to switch to change streams.")).isTrue());
    }

    public void testSwitch(MongoDbConnectorConfig.CaptureMode from, MongoDbConnectorConfig.CaptureMode to) throws InterruptedException {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbA.c1")).with(MongoDbConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MongoDbConnectorConfig.SnapshotMode.INITIAL)).with(MongoDbConnectorConfig.CAPTURE_MODE, (EnumeratedValue)from)).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbA");
        this.start(MongoDbConnector.class, this.config);
        this.assertConnectorIsRunning();
        KeepCaptureModeAfterRestartIT.waitForSnapshotToBeCompleted((String)"mongodb", (String)"mongo1");
        List<Document> documentsToInsert = this.loadTestDocuments("restaurants1.json");
        this.insertDocumentsInTx("dbA", "c1", documentsToInsert.toArray(new Document[0]));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(6);
        List c1s = records.recordsForTopic("mongo1.dbA.c1");
        Assertions.assertThat((List)c1s).hasSize(6);
        this.stopConnector();
        List<Document> documentsToInsert2 = this.loadTestDocuments("restaurants6.json");
        this.insertDocuments("dbA", "c1", documentsToInsert2.toArray(new Document[0]));
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbA.c1")).with(MongoDbConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MongoDbConnectorConfig.SnapshotMode.INITIAL)).with(MongoDbConnectorConfig.CAPTURE_MODE, (EnumeratedValue)to)).build();
        this.start(MongoDbConnector.class, this.config);
        this.assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords records2 = this.consumeRecordsByTopic(1);
        List c1s2 = records2.recordsForTopic("mongo1.dbA.c1");
        Assertions.assertThat((List)c1s2).hasSize(1);
        BsonDocument first = BsonDocument.parse((String)((Struct)((SourceRecord)c1s2.get(0)).value()).getString("after"));
        Assertions.assertThat((String)first.getString((Object)"restaurant_id").getValue()).isEqualTo((Object)"80364347");
    }
}

