/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb;

import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.mongodb.AbstractMongoConnectorIT;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.TestHelper;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.engine.DebeziumEngine;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import java.lang.management.ManagementFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ReflectionException;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.bson.Document;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class BlockingSnapshotIT
extends AbstractMongoConnectorIT {
    protected static final int ROW_COUNT = 1000;
    private static final String DATABASE_NAME = "dbA";
    private static final String COLLECTION_NAME = "c1";
    private static final String COLLECTION2_NAME = "c2";
    private static final String SIGNAL_COLLECTION_NAME = "dbA.signals";
    private static final String FULL_COLLECTION_NAME = "dbA.c1";
    private static final String FULL_COLLECTION2_NAME = "dbA.c2";
    private static final String DOCUMENT_ID = "_id";

    @Before
    public void before() {
        this.context = new MongoDbTaskContext(this.config().build());
        TestHelper.cleanDatabase(mongo, DATABASE_NAME);
    }

    @After
    public void after() {
        TestHelper.cleanDatabase(mongo, DATABASE_NAME);
    }

    @Test
    public void executeBlockingSnapshot() throws Exception {
        this.populateDataCollection();
        this.startConnector(Function.identity());
        BlockingSnapshotIT.waitForSnapshotToBeCompleted((String)"mongodb", (String)"mongo1", (String)"0", null);
        this.insertRecords(1000, 1000);
        this.assertRecordsFromSnapshotAndStreamingArePresent(2000);
        this.sendAdHocBlockingSnapshotSignal("[A-z].*" + this.fullDataCollectionName());
        BlockingSnapshotIT.waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
        this.assertRecordsFromSnapshotAndStreamingArePresent(2001);
        this.insertRecords(1000, 2000);
        this.assertStreamingRecordsArePresent(1000);
    }

    @Test
    public void executeBlockingSnapshotWhileStreaming() throws Exception {
        this.populateDataCollection();
        this.startConnector(Function.identity());
        BlockingSnapshotIT.waitForSnapshotToBeCompleted((String)"mongodb", (String)"mongo1", (String)"0", null);
        Future<?> batchInserts = this.executeAsync(this.insertTask());
        Thread.sleep(2000L);
        this.sendAdHocBlockingSnapshotSignal("[A-z].*" + this.fullDataCollectionName());
        BlockingSnapshotIT.waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
        BlockingSnapshotIT.waitForStreamingRunning((String)"mongodb", (String)"mongo1", (String)BlockingSnapshotIT.getStreamingNamespace(), (String)"0");
        Long totalSnapshotRecords = this.getTotalSnapshotRecords(this.replicaSetFullDataCollectionName(), "mongodb", "mongo1", "0", null);
        batchInserts.get(120L, TimeUnit.SECONDS);
        this.insertRecords(1000, 2000);
        int signalingRecords = 1;
        this.assertRecordsWithValuesPresent((int)(3000L + totalSnapshotRecords + (long)signalingRecords), BlockingSnapshotIT.getExpectedValues(totalSnapshotRecords), this.topicName());
    }

    @Test
    public void executeBlockingSnapshotWithAdditionalCondition() throws Exception {
        this.populateDataCollection(this.dataCollectionNames().get(1).toString());
        this.startConnector(Function.identity());
        BlockingSnapshotIT.waitForStreamingRunning((String)"mongodb", (String)"mongo1", (String)BlockingSnapshotIT.getStreamingNamespace(), (String)"0");
        this.sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map.of(this.fullDataCollectionNames().get(1), "{ aa: { $lt: 500 } }"), "[A-z].*" + this.fullDataCollectionNames().get(1));
        BlockingSnapshotIT.waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
        int signalingRecords = 1;
        this.assertRecordsWithValuesPresent(500 + signalingRecords, IntStream.rangeClosed(0, 499).boxed().collect(Collectors.toList()), this.topicNames().get(1));
    }

    protected Class<MongoDbConnector> connectorClass() {
        return MongoDbConnector.class;
    }

    public static int waitTimeForRecords() {
        return 3;
    }

    protected Configuration.Builder config() {
        return (Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration(mongo).edit().with(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST, DATABASE_NAME)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, String.join((CharSequence)",", this.fullDataCollectionNames()))).with(MongoDbConnectorConfig.SIGNAL_DATA_COLLECTION, SIGNAL_COLLECTION_NAME)).with(MongoDbConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5)).with(MongoDbConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)).with(MongoDbConnectorConfig.SNAPSHOT_MODE_TABLES, "[A-z].*dbA.c1")).with(MongoDbConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MongoDbConnectorConfig.SnapshotMode.INITIAL);
    }

    protected String dataCollectionName() {
        return COLLECTION_NAME;
    }

    protected List<String> dataCollectionNames() {
        return List.of(COLLECTION_NAME, COLLECTION2_NAME);
    }

    protected String fullDataCollectionName() {
        return FULL_COLLECTION_NAME;
    }

    protected List<String> fullDataCollectionNames() {
        return List.of(FULL_COLLECTION_NAME, FULL_COLLECTION2_NAME);
    }

    protected String topicName() {
        return "mongo1." + this.fullDataCollectionName();
    }

    protected List<String> topicNames() {
        return this.fullDataCollectionNames().stream().map(x -> "mongo1." + x).collect(Collectors.toList());
    }

    protected void populateDataCollection(String dataCollectionName) {
        Document[] documents = new Document[1000];
        for (int i = 0; i < 1000; ++i) {
            Document doc = new Document();
            doc.append(DOCUMENT_ID, (Object)(i + 1)).append("aa", (Object)i);
            documents[i] = doc;
        }
        this.insertDocumentsInTx(DATABASE_NAME, dataCollectionName, documents);
    }

    protected void populateDataCollection() {
        this.populateDataCollection(this.dataCollectionName());
    }

    protected int insertMaxSleep() {
        return 100;
    }

    private Runnable insertTask() {
        return () -> {
            try {
                this.insertRecordsWithRandomSleep(1000, 1000, this.insertMaxSleep());
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    private Long getTotalSnapshotRecords(String table, String connector, String server, String task, String database) throws MalformedObjectNameException, ReflectionException, AttributeNotFoundException, InstanceNotFoundException, MBeanException {
        MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
        Map rowsScanned = (Map)mbeanServer.getAttribute(BlockingSnapshotIT.getSnapshotMetricsObjectName((String)connector, (String)server, (String)task, (String)database), "RowsScanned");
        String unquotedTableName = table.replace("`", "");
        return (Long)rowsScanned.get(unquotedTableName);
    }

    private static List<Integer> getExpectedValues(Long totalSnapshotRecords) {
        List initialSnapShotValues = IntStream.rangeClosed(0, 999).boxed().collect(Collectors.toList());
        List firstStreamingBatchValues = IntStream.rangeClosed(1000, 1999).boxed().collect(Collectors.toList());
        List blockingSnapshotValues = Stream.of(initialSnapShotValues, IntStream.rangeClosed(1000, Math.toIntExact(totalSnapshotRecords)).boxed().collect(Collectors.toList())).flatMap(Collection::stream).collect(Collectors.toList());
        List secondStreamingBatchValues = IntStream.rangeClosed(2000, 2999).boxed().collect(Collectors.toList());
        return Stream.of(initialSnapShotValues, firstStreamingBatchValues, blockingSnapshotValues, secondStreamingBatchValues).flatMap(Collection::stream).collect(Collectors.toList());
    }

    private static void waitForLogMessage(String message, Class<?> logEmitterClass) {
        LogInterceptor interceptor = new LogInterceptor(logEmitterClass);
        Awaitility.await().alias("Snapshot not completed on time").pollInterval(100L, TimeUnit.MILLISECONDS).atMost((long)BlockingSnapshotIT.waitTimeForRecords() * 30L, TimeUnit.SECONDS).until(() -> interceptor.containsMessage(message));
    }

    private Future<?> executeAsync(Runnable operation) {
        return Executors.newSingleThreadExecutor().submit(operation);
    }

    private void assertStreamingRecordsArePresent(int expectedRecords) throws InterruptedException {
        this.assertRecordsWithValuesPresent(expectedRecords, IntStream.range(2000, 2999).boxed().collect(Collectors.toList()), this.topicName());
    }

    private void assertRecordsFromSnapshotAndStreamingArePresent(int expectedRecords) throws InterruptedException {
        this.assertRecordsWithValuesPresent(expectedRecords, IntStream.range(0, expectedRecords - 2).boxed().collect(Collectors.toList()), this.topicName());
    }

    private void assertRecordsWithValuesPresent(int expectedRecords, List<Integer> expectedValues, String topicName) throws InterruptedException {
        AbstractConnectorTest.SourceRecords snapshotAndStreamingRecords = this.consumeRecordsByTopic(expectedRecords, 10);
        Assertions.assertThat((int)snapshotAndStreamingRecords.allRecordsInOrder().size()).isEqualTo(expectedRecords);
        List actual = snapshotAndStreamingRecords.recordsForTopic(topicName).stream().map(record -> this.extractFieldValue((SourceRecord)record, "aa")).collect(Collectors.toList());
        Assertions.assertThat(actual).containsAll(expectedValues);
    }

    protected Integer extractFieldValue(SourceRecord record, String fieldName) {
        String after = ((Struct)record.value()).getString("after");
        Pattern p = Pattern.compile("\"" + fieldName + "\": (\\d+)");
        Matcher m = p.matcher(after);
        m.find();
        return Integer.parseInt(m.group(1));
    }

    private void insertRecords(int rowCount, int startingPkId) {
        Document[] documents = new Document[1000];
        for (int i = 0; i < rowCount; ++i) {
            Document doc = new Document();
            doc.append(DOCUMENT_ID, (Object)(i + startingPkId + 1)).append("aa", (Object)(i + startingPkId));
            documents[i] = doc;
        }
        this.insertDocumentsInTx(DATABASE_NAME, COLLECTION_NAME, documents);
    }

    private void insertRecordsWithRandomSleep(int rowCount, int startingPkId, int maxSleep, Runnable actionOnInsert) {
        try {
            for (int i = 0; i < rowCount; ++i) {
                Document doc = new Document();
                doc.append(DOCUMENT_ID, (Object)(i + startingPkId + 1)).append("aa", (Object)(i + startingPkId));
                this.insertDocuments(DATABASE_NAME, COLLECTION_NAME, doc);
                actionOnInsert.run();
                int sleepTime = ThreadLocalRandom.current().nextInt(1, maxSleep);
                Thread.sleep(sleepTime);
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void insertRecordsWithRandomSleep(int rowCount, int startingPkId, int maxSleep) {
        this.insertRecordsWithRandomSleep(rowCount, startingPkId, maxSleep, () -> {});
    }

    protected void sendAdHocBlockingSnapshotSignal(String ... dataCollectionIds) {
        String dataCollectionIdsList = Arrays.stream(dataCollectionIds).map(x -> "\"" + x + "\"").collect(Collectors.joining(", "));
        this.insertDocuments(DATABASE_NAME, "signals", Document.parse((String)("{\"type\": \"execute-snapshot\", \"payload\": {\"type\": \"BLOCKING\",\"data-collections\": [" + dataCollectionIdsList + "]}}")));
    }

    protected void sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map<String, String> additionalConditions, String ... dataCollectionIds) {
        String conditions = additionalConditions.entrySet().stream().map(e -> String.format("{\"data-collection\": \"%s\", \"filter\": \"%s\"}", e.getKey(), e.getValue())).collect(Collectors.joining(","));
        String dataCollectionIdsList = Arrays.stream(dataCollectionIds).map(x -> "\"" + x + "\"").collect(Collectors.joining(", "));
        this.insertDocuments(DATABASE_NAME, "signals", Document.parse((String)("{\"type\": \"execute-snapshot\", \"payload\": {\"type\": \"BLOCKING\",\"data-collections\": [" + dataCollectionIdsList + "], \"additional-conditions\": [" + conditions + "]}}")));
    }

    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig) {
        this.startConnector(custConfig, (DebeziumEngine.CompletionCallback)this.loggingCompletion());
    }

    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig, DebeziumEngine.CompletionCallback callback) {
        Configuration config = custConfig.apply(this.config()).build();
        this.start(this.connectorClass(), config, callback);
        this.waitForAvailableRecords(5L, TimeUnit.SECONDS);
    }

    @NotNull
    private String replicaSetFullDataCollectionName() {
        return "rs0." + this.fullDataCollectionName();
    }
}

