/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.term.Term;
import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.AbstractCommitLogProcessorTest;
import io.debezium.connector.cassandra.Cassandra4CommitLogSegmentReader;
import io.debezium.connector.cassandra.CassandraConnectorConfig;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.CassandraSchemaFactory;
import io.debezium.connector.cassandra.CommitLogIdxProcessor;
import io.debezium.connector.cassandra.CommitLogSegmentReader;
import io.debezium.connector.cassandra.CommitLogUtil;
import io.debezium.connector.cassandra.Record;
import io.debezium.connector.cassandra.TestUtils;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitLogRealTimeParserTest
extends AbstractCommitLogProcessorTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(CommitLogRealTimeParserTest.class);
    private CommitLogIdxProcessor commitLogProcessor;

    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.commitLogProcessor = new CommitLogIdxProcessor(this.context, this.metrics, (CommitLogSegmentReader)new Cassandra4CommitLogSegmentReader(this.context, this.metrics), new File(DatabaseDescriptor.getCDCLogLocation()));
        this.readLogs();
    }

    public CassandraConnectorContext generateTaskContext() throws Exception {
        Properties properties = TestUtils.generateDefaultConfigMap();
        properties.put(CassandraConnectorConfig.COMMIT_LOG_REAL_TIME_PROCESSING_ENABLED.name(), "true");
        properties.put(CassandraConnectorConfig.COMMIT_LOG_MARKED_COMPLETE_POLL_INTERVAL_IN_MS.name(), "1000");
        return this.generateTaskContext(Configuration.from((Properties)properties));
    }

    @Override
    public void initialiseData() throws Exception {
        this.createTable("CREATE TABLE IF NOT EXISTS %s.%s (a int, b int, PRIMARY KEY(a)) WITH cdc = true;");
        this.insertRows(3, 10);
    }

    private void insertRows(int count, int keyInc) {
        for (int i = 0; i < count; ++i) {
            TestUtils.runCql((SimpleStatement)QueryBuilder.insertInto((String)"test_keyspace", (String)TestUtils.TEST_TABLE_NAME).value("a", (Term)QueryBuilder.literal((Object)(i + keyInc))).value("b", (Term)QueryBuilder.literal((Object)i)).build());
        }
        LOGGER.info("Inserted rows: {}", (Object)count);
    }

    @Override
    public void verifyEvents() {
        this.verify(3, 10);
        this.insertRows(2, 20);
        this.verify(2, 20);
    }

    private void verify(int expectedEventsCount, int keyInc) {
        ArrayList events = new ArrayList();
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            events.addAll(((ChangeEventQueue)this.context.getQueues().get(0)).poll());
            return events.size() == expectedEventsCount;
        });
        LOGGER.info("Total events received: {}", (Object)events.size());
        Assert.assertEquals((String)("Total number of events received must be " + expectedEventsCount), (long)expectedEventsCount, (long)events.size());
        for (int i = 0; i < expectedEventsCount; ++i) {
            Record record = (Record)events.get(i);
            Record.Operation op = record.getOp();
            Assert.assertEquals((String)("Operation type must be insert but it was " + op), (Object)Record.Operation.INSERT, (Object)op);
            Assert.assertEquals((String)("Inserted key should be " + i + keyInc), (Object)((CassandraSchemaFactory.CellData)record.getRowData().getPrimary().get((int)0)).value, (Object)(i + keyInc));
        }
    }

    private void readLogs() {
        ChangeEventQueue queue = (ChangeEventQueue)this.context.getQueues().get(0);
        Assert.assertEquals((long)queue.totalCapacity(), (long)queue.remainingCapacity());
        String cdcLoc = DatabaseDescriptor.getCDCLogLocation();
        LOGGER.info("CDC Location: {}", (Object)cdcLoc);
        Awaitility.await().timeout(Duration.ofSeconds(3L)).until(() -> CommitLogUtil.getIndexes((File)new File(cdcLoc)).length >= 1);
        File[] commitLogIndexes = CommitLogUtil.getIndexes((File)new File(cdcLoc));
        Arrays.sort(commitLogIndexes, (file1, file2) -> CommitLogUtil.compareCommitLogsIndexes((File)file1, (File)file2));
        Assert.assertTrue((String)"At least one idx file must be generated", (commitLogIndexes.length >= 1 ? 1 : 0) != 0);
        this.commitLogProcessor.submit(commitLogIndexes[commitLogIndexes.length - 1].toPath());
    }
}

