/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.Cassandra3TypeProvider;
import io.debezium.connector.cassandra.CassandraConnectorConfig;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.ChangeRecord;
import io.debezium.connector.cassandra.EmbeddedCassandra3ConnectorTestBase;
import io.debezium.connector.cassandra.Event;
import io.debezium.connector.cassandra.OffsetPosition;
import io.debezium.connector.cassandra.Record;
import io.debezium.connector.cassandra.SnapshotProcessor;
import io.debezium.connector.cassandra.TestUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Integration tests for {@link SnapshotProcessor}.
 *
 * <p>Each test builds a {@link CassandraConnectorContext} through the base class, wraps a
 * {@link SnapshotProcessor} in a Mockito spy, drives {@code process()} and then inspects either
 * the first {@link ChangeEventQueue} of the context or the number of {@code snapshot()} calls.
 *
 * <p>All per-test resources are released in {@code finally} blocks so that a failing assertion
 * does not leave tables, offsets or an open context behind for the next test.
 */
public class SnapshotProcessorTest extends EmbeddedCassandra3ConnectorTestBase {

    /** Number of rows inserted into each populated test table. */
    private static final int TABLE_SIZE = 5;

    /** Number of times {@code process()} is invoked by the snapshot-mode tests. */
    private static final int PROCESS_CALLS = 5;

    /**
     * Snapshotting two CDC-enabled tables must enqueue one INSERT change record per row, each
     * flagged as a snapshot record with the default offset position.
     */
    @Test
    public void testSnapshotTable() throws Exception {
        CassandraConnectorContext context = this.generateTaskContext();
        try {
            SnapshotProcessor snapshotProcessor = runningProcessorSpy(context);
            createTable(context, "cdc_table", true);
            createTable(context, "cdc_table2", true);
            for (int i = 0; i < TABLE_SIZE; i++) {
                insertRow(context, "cdc_table", i);
                insertRow(context, "cdc_table2", i + 10);
            }

            ChangeEventQueue<Event> queue = firstQueue(context);
            assertQueueEmpty(queue);
            snapshotProcessor.process();
            Assert.assertEquals(2 * TABLE_SIZE, queue.totalCapacity() - queue.remainingCapacity());

            ArrayList<ChangeRecord> table1 = new ArrayList<ChangeRecord>();
            ArrayList<ChangeRecord> table2 = new ArrayList<ChangeRecord>();
            String firstTableName = TestUtils.keyspaceTable("cdc_table");
            for (Event event : queue.poll()) {
                ChangeRecord record = (ChangeRecord) event;
                // JUnit convention: expected value first, actual value second.
                Assert.assertEquals(Event.EventType.CHANGE_EVENT, record.getEventType());
                Assert.assertEquals(Record.Operation.INSERT, record.getOp());
                Assert.assertEquals(DatabaseDescriptor.getClusterName(), record.getSource().cluster);
                Assert.assertTrue(record.getSource().snapshot);
                // Anything that is not from the first table is counted against the second one;
                // the total-size assertion above already rules out records from other tables.
                if (record.getSource().keyspaceTable.name().equals(firstTableName)) {
                    table1.add(record);
                } else {
                    table2.add(record);
                }
                Assert.assertEquals(OffsetPosition.defaultOffsetPosition(), record.getSource().offsetPosition);
            }
            Assert.assertEquals(TABLE_SIZE, table1.size());
            Assert.assertEquals(TABLE_SIZE, table2.size());
        } finally {
            tearDown(context);
        }
    }

    /** A populated table created with {@code cdc = false} must not produce any queued record. */
    @Test
    public void testSnapshotSkipsNonCdcEnabledTable() throws Exception {
        CassandraConnectorContext context = this.generateTaskContext();
        try {
            SnapshotProcessor snapshotProcessor = runningProcessorSpy(context);
            createTable(context, "non_cdc_table", false);
            for (int i = 0; i < TABLE_SIZE; i++) {
                insertRow(context, "non_cdc_table", i);
            }

            ChangeEventQueue<Event> queue = firstQueue(context);
            assertQueueEmpty(queue);
            snapshotProcessor.process();
            assertQueueEmpty(queue);
        } finally {
            tearDown(context);
        }
    }

    /**
     * Processing an empty CDC-enabled table enqueues nothing, and a second {@code process()}
     * call after rows were inserted still enqueues nothing.
     */
    @Test
    public void testSnapshotEmptyTable() throws Exception {
        CassandraConnectorContext context = this.generateTaskContext();
        try {
            SnapshotProcessor snapshotProcessor = runningProcessorSpy(context);
            createTable(context, "cdc_table", true);

            ChangeEventQueue<Event> queue = firstQueue(context);
            assertQueueEmpty(queue);
            snapshotProcessor.process();
            assertQueueEmpty(queue);

            for (int i = 0; i < TABLE_SIZE; i++) {
                insertRow(context, "cdc_table", i);
            }
            // NOTE(review): presumably the table is already recorded as snapshotted after the
            // first pass, so the new rows are not picked up here - confirm in SnapshotProcessor.
            snapshotProcessor.process();
            assertQueueEmpty(queue);
        } finally {
            tearDown(context);
        }
    }

    /** With snapshot mode "always", every {@code process()} call must trigger {@code snapshot()}. */
    @Test
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public void testSnapshotModeAlways() throws Exception {
        HashMap configs = snapshotModeConfigs("always");
        configs.put("kafka.producer.bootstrap.servers", "localhost:9092");
        verifySnapshotCalls(configs, Mockito.times(PROCESS_CALLS));
    }

    /** With snapshot mode "initial", repeated {@code process()} calls trigger {@code snapshot()} once. */
    @Test
    public void testSnapshotModeInitial() throws Exception {
        verifySnapshotCalls(snapshotModeConfigs("initial"), Mockito.times(1));
    }

    /** With snapshot mode "never", {@code process()} must never trigger {@code snapshot()}. */
    @Test
    public void testSnapshotModeNever() throws Exception {
        verifySnapshotCalls(snapshotModeConfigs("never"), Mockito.never());
    }

    /** Creates a spy around a fresh {@link SnapshotProcessor} bound to the given context. */
    private static SnapshotProcessor processorSpy(CassandraConnectorContext context) {
        return Mockito.spy(new SnapshotProcessor(context, new Cassandra3TypeProvider().getClusterName()));
    }

    /** Creates a processor spy whose {@code isRunning()} is stubbed to return {@code true}. */
    private static SnapshotProcessor runningProcessorSpy(CassandraConnectorContext context) {
        SnapshotProcessor spy = processorSpy(context);
        Mockito.when(spy.isRunning()).thenReturn(true);
        return spy;
    }

    /** Creates the two-column test table in the test keyspace with the requested CDC flag. */
    private static void createTable(CassandraConnectorContext context, String table, boolean cdcEnabled) throws Exception {
        context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + TestUtils.keyspaceTable(table)
                + " (a int, b text, PRIMARY KEY(a)) WITH cdc = " + cdcEnabled + ";");
    }

    /** Inserts the row {@code (key, String.valueOf(key))} into the given test table. */
    private static void insertRow(CassandraConnectorContext context, String table, int key) throws Exception {
        // Explicit array kept: it is valid whether execute() declares Object... or Object[].
        context.getCassandraClient().execute("INSERT INTO " + TestUtils.keyspaceTable(table) + "(a, b) VALUES (?, ?)",
                new Object[]{ key, String.valueOf(key) });
    }

    /**
     * Returns the first queue of the context, typed for {@link Event} elements.
     *
     * <p>NOTE(review): the element type is inferred from how this test consumes the queue; the
     * generic signature of {@code getQueues()} is not visible from this file, hence the cast.
     */
    @SuppressWarnings("unchecked")
    private static ChangeEventQueue<Event> firstQueue(CassandraConnectorContext context) {
        return (ChangeEventQueue<Event>) context.getQueues().get(0);
    }

    /** Asserts that the queue holds no element, i.e. its remaining capacity equals its total capacity. */
    private static void assertQueueEmpty(ChangeEventQueue<Event> queue) {
        Assert.assertEquals(queue.totalCapacity(), queue.remainingCapacity());
    }

    /**
     * Drops the test tables and offsets, then cleans up the context. The context is cleaned up
     * even when removing tables or offsets fails.
     */
    private static void tearDown(CassandraConnectorContext context) throws Exception {
        try {
            TestUtils.deleteTestKeyspaceTables();
            TestUtils.deleteTestOffsets(context);
        } finally {
            context.cleanUp();
        }
    }

    /**
     * Builds the default context properties with the given snapshot mode and a snapshot poll
     * interval of {@code "0"}.
     *
     * <p>The raw type is deliberate: the generic signature of
     * {@code TestUtils.propertiesForContext()} is not visible from this file.
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private static HashMap snapshotModeConfigs(String mode) throws Exception {
        HashMap configs = TestUtils.propertiesForContext();
        configs.put(CassandraConnectorConfig.SNAPSHOT_MODE.name(), mode);
        configs.put(CassandraConnectorConfig.SNAPSHOT_POLL_INTERVAL_MS.name(), "0");
        return configs;
    }

    /**
     * Calls {@code process()} {@link #PROCESS_CALLS} times on a spy whose {@code snapshot()} is
     * stubbed out, then verifies how often {@code snapshot()} was invoked.
     *
     * @param configs       context properties, including the snapshot mode under test
     * @param expectedCalls Mockito verification mode describing the expected invocation count
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private void verifySnapshotCalls(HashMap configs, VerificationMode expectedCalls) throws Exception {
        CassandraConnectorContext context = this.generateTaskContext(configs);
        try {
            SnapshotProcessor snapshotProcessorSpy = processorSpy(context);
            // Stub the real snapshot so only the mode-dependent dispatch in process() is exercised.
            Mockito.doNothing().when(snapshotProcessorSpy).snapshot();
            for (int i = 0; i < PROCESS_CALLS; i++) {
                snapshotProcessorSpy.process();
            }
            Mockito.verify(snapshotProcessorSpy, expectedCalls).snapshot();
        } finally {
            context.cleanUp();
        }
    }
}

