/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb;

import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.mongodb.AbstractShardedMongoConnectorIT;
import io.debezium.connector.mongodb.JsonSerialization;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.TestHelper;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.engine.DebeziumEngine;
import io.debezium.testing.testcontainers.MongoDbDeployment;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.bson.Document;
import org.junit.Test;

public class ShardedIncrementalSnapshotIT
extends AbstractShardedMongoConnectorIT {
    protected static final int ROW_COUNT = 1000;
    private static final int MAXIMUM_NO_RECORDS_CONSUMES = 3;
    private static final String DATABASE_NAME = "dbA";
    private static final String COLLECTION_NAME = "c1";
    private static final String SIGNAL_COLLECTION_NAME = "dbA.signals";
    private static final String FULL_COLLECTION_NAME = "dbA.c1";
    private static final String DOCUMENT_ID = "_id";
    private static final int CHUNK_SIZE = 100;

    @Override
    protected String shardedDatabase() {
        return DATABASE_NAME;
    }

    @Override
    protected Map<String, String> shardedCollections() {
        return Map.of(COLLECTION_NAME, DOCUMENT_ID);
    }

    protected String fullDataCollectionName() {
        return FULL_COLLECTION_NAME;
    }

    protected String topicName() {
        return "mongo1." + this.fullDataCollectionName();
    }

    protected Configuration.Builder config() {
        return (Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration((MongoDbDeployment)mongo).edit().with(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST, this.shardedDatabase())).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, this.fullDataCollectionName() + ",dbA.c1,dbA.c2")).with(MongoDbConnectorConfig.SIGNAL_DATA_COLLECTION, SIGNAL_COLLECTION_NAME)).with(MongoDbConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 100)).with(MongoDbConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MongoDbConnectorConfig.SnapshotMode.NEVER)).with(MongoDbConnectorConfig.CONNECTION_MODE, (EnumeratedValue)MongoDbConnectorConfig.ConnectionMode.SHARDED);
    }

    @Test
    public void snapshotOnlyWithInt64() throws Exception {
        long firstKey = 0x80000000L;
        this.snapshotOnly(firstKey, k -> k + 1L);
    }

    private <K> void snapshotOnly(K initialId, Function<K, K> idGenerator) throws Exception {
        LinkedHashMap<K, Document> documents = new LinkedHashMap<K, Document>();
        K key = initialId;
        for (int i = 0; i < 1000; ++i) {
            Document doc = new Document();
            doc.append(DOCUMENT_ID, key).append(this.valueFieldName(), (Object)i);
            documents.put(key, doc);
            key = idGenerator.apply(key);
        }
        this.insertDocumentsInTx(DATABASE_NAME, COLLECTION_NAME, (Document[])documents.values().toArray(Document[]::new));
        this.startConnector();
        this.sendAdHocSnapshotSignal();
        Map<String, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000, x -> true, k -> k.getString(this.pkFieldName()), this::extractFieldValue, this.topicName(), null);
        JsonSerialization serialization = new JsonSerialization();
        Map<String, Integer> expected = documents.values().stream().map(d -> d.toBsonDocument()).collect(Collectors.toMap(d -> serialization.getDocumentIdSnapshot(d), d -> d.getInt32((Object)this.valueFieldName()).getValue()));
        Assertions.assertThat(dbChanges).containsAllEntriesOf(expected);
    }

    protected String valueFieldName() {
        return "aa";
    }

    protected String pkFieldName() {
        return "id";
    }

    protected Class<MongoDbConnector> connectorClass() {
        return MongoDbConnector.class;
    }

    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig, DebeziumEngine.CompletionCallback callback) {
        Configuration config = custConfig.apply(this.config()).build();
        this.start(this.connectorClass(), config, callback);
        this.waitForConnectorToStart();
        this.waitForAvailableRecords(5L, TimeUnit.SECONDS);
        this.assertNoRecordsToConsume();
    }

    protected void startConnector() {
        this.startConnector(Function.identity(), (DebeziumEngine.CompletionCallback)this.loggingCompletion());
    }

    protected void waitForConnectorToStart() {
        this.assertConnectorIsRunning();
    }

    protected void sendAdHocSnapshotSignal() throws SQLException {
        this.sendAdHocSnapshotSignal(this.fullDataCollectionName());
    }

    protected void sendAdHocSnapshotSignal(String ... dataCollectionIds) throws SQLException {
        String dataCollectionIdsList = Arrays.stream(dataCollectionIds).map(x -> "\\\"" + x + "\\\"").collect(Collectors.joining(", "));
        this.insertDocuments(DATABASE_NAME, "signals", Document.parse((String)("{\"type\": \"execute-snapshot\", \"payload\": \"{\\\"data-collections\\\": [" + dataCollectionIdsList + "]}\"}")));
    }

    protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount, String topicName) throws InterruptedException {
        return this.consumeMixedWithIncrementalSnapshot(recordCount, this::extractFieldValue, x -> true, null, topicName);
    }

    protected Integer extractFieldValue(SourceRecord record) {
        String after = ((Struct)record.value()).getString("after");
        Pattern p = Pattern.compile("\"" + this.valueFieldName() + "\": (\\d+)");
        Matcher m = p.matcher(after);
        m.find();
        return Integer.parseInt(m.group(1));
    }

    protected <V> Map<Integer, V> consumeMixedWithIncrementalSnapshot(int recordCount, Function<SourceRecord, V> valueConverter, Predicate<Map.Entry<Integer, V>> dataCompleted, Consumer<List<SourceRecord>> recordConsumer, String topicName) throws InterruptedException {
        return this.consumeMixedWithIncrementalSnapshot(recordCount, dataCompleted, k -> Integer.parseInt(k.getString(this.pkFieldName())), valueConverter, topicName, recordConsumer);
    }

    protected <V, K> Map<K, V> consumeMixedWithIncrementalSnapshot(int recordCount, Predicate<Map.Entry<K, V>> dataCompleted, Function<Struct, K> idCalculator, Function<SourceRecord, V> valueConverter, String topicName, Consumer<List<SourceRecord>> recordConsumer) throws InterruptedException {
        HashMap dbChanges = new HashMap();
        int noRecords = 0;
        while (true) {
            AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
            List dataRecords = records.recordsForTopic(topicName);
            if (records.allRecordsInOrder().isEmpty()) {
                ((AbstractIntegerAssert)Assertions.assertThat((int)(++noRecords)).describedAs(String.format("Too many no data record results, %d < %d", dbChanges.size(), recordCount), new Object[0])).isLessThanOrEqualTo(3);
                continue;
            }
            noRecords = 0;
            if (dataRecords == null || dataRecords.isEmpty()) continue;
            dataRecords.forEach(record -> {
                Object id = idCalculator.apply((Struct)record.key());
                Object value = valueConverter.apply((SourceRecord)record);
                dbChanges.put(id, value);
            });
            if (recordConsumer != null) {
                recordConsumer.accept(dataRecords);
            }
            if (dbChanges.size() >= recordCount && !dbChanges.entrySet().stream().anyMatch(dataCompleted.negate())) break;
        }
        Assertions.assertThat(dbChanges).hasSize(recordCount);
        return dbChanges;
    }
}

