/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import io.debezium.connector.cassandra.BlockingEventQueue;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.ChangeRecord;
import io.debezium.connector.cassandra.EmbeddedCassandraConnectorTestBase;
import io.debezium.connector.cassandra.Event;
import io.debezium.connector.cassandra.OffsetPosition;
import io.debezium.connector.cassandra.Record;
import io.debezium.connector.cassandra.SnapshotProcessor;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Tests for {@link SnapshotProcessor} that run against the embedded Cassandra
 * instance provided by {@link EmbeddedCassandraConnectorTestBase}.
 *
 * <p>Every test releases its {@link CassandraConnectorContext} (and, where it
 * created tables, drops them and the stored offsets) in a {@code finally}
 * block so that a failing assertion cannot leak state into later tests.
 */
public class SnapshotProcessorTest extends EmbeddedCassandraConnectorTestBase {
    /** Name of the table created with {@code cdc = true}. */
    private static final String CDC_TABLE = "cdc_table";

    /** Name of the table created with {@code cdc = false}. */
    private static final String NON_CDC_TABLE = "non_cdc_table";

    /** Number of rows inserted into a test table. */
    private static final int TABLE_SIZE = 5;

    /** Number of times {@code process()} is called in the snapshot-mode tests. */
    private static final int PROCESS_ITERATIONS = 5;

    /**
     * Snapshotting a CDC-enabled table with {@link #TABLE_SIZE} rows must enqueue
     * exactly that many change records, each flagged as a snapshot INSERT.
     */
    @Test
    public void testSnapshotTable() throws Exception {
        CassandraConnectorContext context = generateTaskContext();
        try {
            SnapshotProcessor snapshotProcessor = newRunningProcessor(context);
            createTable(context, CDC_TABLE, true);
            insertRows(context, CDC_TABLE, TABLE_SIZE);

            BlockingEventQueue queue = context.getQueue();
            Assert.assertTrue(queue.isEmpty());
            snapshotProcessor.process();
            Assert.assertEquals(TABLE_SIZE, queue.size());

            for (Event event : queue.poll()) {
                ChangeRecord record = (ChangeRecord) event;
                Assert.assertEquals(Event.EventType.CHANGE_EVENT, record.getEventType());
                Assert.assertEquals(Record.Operation.INSERT, record.getOp());
                Assert.assertEquals(DatabaseDescriptor.getClusterName(), record.getSource().cluster);
                Assert.assertTrue(record.getSource().snapshot);
                Assert.assertEquals(keyspaceTable(CDC_TABLE), record.getSource().keyspaceTable.name());
                Assert.assertEquals(OffsetPosition.defaultOffsetPosition(), record.getSource().offsetPosition);
            }
        } finally {
            dropTestData(context);
        }
    }

    /**
     * Rows in a table created with {@code cdc = false} must not produce any
     * events when the snapshot processor runs.
     */
    @Test
    public void testSnapshotSkipsNonCdcEnabledTable() throws Exception {
        CassandraConnectorContext context = generateTaskContext();
        try {
            SnapshotProcessor snapshotProcessor = newRunningProcessor(context);
            createTable(context, NON_CDC_TABLE, false);
            insertRows(context, NON_CDC_TABLE, TABLE_SIZE);

            BlockingEventQueue queue = context.getQueue();
            Assert.assertTrue(queue.isEmpty());
            snapshotProcessor.process();
            Assert.assertTrue(queue.isEmpty());
        } finally {
            dropTestData(context);
        }
    }

    /**
     * Snapshotting an empty CDC-enabled table must enqueue nothing, and a
     * second {@code process()} call after rows have been inserted is also
     * expected to leave the queue empty.
     */
    @Test
    public void testSnapshotEmptyTable() throws Exception {
        CassandraConnectorContext context = generateTaskContext();
        try {
            SnapshotProcessor snapshotProcessor = newRunningProcessor(context);
            createTable(context, CDC_TABLE, true);

            BlockingEventQueue queue = context.getQueue();
            Assert.assertTrue(queue.isEmpty());
            snapshotProcessor.process();
            Assert.assertTrue(queue.isEmpty());

            // Rows added after the first pass are expected NOT to be picked up by
            // the second pass. NOTE(review): presumably because the default
            // snapshot mode snapshots a table only once - confirm against
            // SnapshotProcessor.
            insertRows(context, CDC_TABLE, TABLE_SIZE);
            snapshotProcessor.process();
            Assert.assertTrue(queue.isEmpty());
        } finally {
            dropTestData(context);
        }
    }

    /** With {@code snapshot.mode=always}, every {@code process()} call triggers a snapshot. */
    @Test
    public void testSnapshotModeAlways() throws Exception {
        assertSnapshotInvocations("always", Mockito.times(PROCESS_ITERATIONS));
    }

    /** With {@code snapshot.mode=initial}, only one of the {@code process()} calls triggers a snapshot. */
    @Test
    public void testSnapshotModeInitial() throws Exception {
        assertSnapshotInvocations("initial", Mockito.times(1));
    }

    /** With {@code snapshot.mode=never}, no {@code process()} call triggers a snapshot. */
    @Test
    public void testSnapshotModeNever() throws Exception {
        assertSnapshotInvocations("never", Mockito.never());
    }

    /**
     * Creates a spy of a real {@link SnapshotProcessor} whose {@code isRunning()}
     * always reports {@code true}.
     *
     * @param context the connector context the processor is built from
     * @return the stubbed spy
     */
    private static SnapshotProcessor newRunningProcessor(CassandraConnectorContext context) {
        SnapshotProcessor processor = Mockito.spy(new SnapshotProcessor(context));
        // doReturn(..).when(spy) stubs without invoking the real isRunning(),
        // which when(spy.isRunning()) would do as a side effect of stubbing.
        Mockito.doReturn(true).when(processor).isRunning();
        return processor;
    }

    /**
     * Creates the table {@code (a int, b text, PRIMARY KEY(a))} in the test
     * keyspace if it does not exist and refreshes the context's schema holder.
     *
     * @param context    the connector context supplying the Cassandra client
     * @param table      unqualified table name
     * @param cdcEnabled value used for the table's {@code cdc} option
     */
    private static void createTable(CassandraConnectorContext context, String table, boolean cdcEnabled) throws Exception {
        context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable(table)
                + " (a int, b text, PRIMARY KEY(a)) WITH cdc = " + cdcEnabled + ";");
        context.getSchemaHolder().refreshSchemas();
    }

    /**
     * Inserts rows {@code (i, String.valueOf(i))} for {@code i} in {@code [0, count)}.
     *
     * @param context the connector context supplying the Cassandra client
     * @param table   unqualified table name
     * @param count   number of rows to insert
     */
    private static void insertRows(CassandraConnectorContext context, String table, int count) throws Exception {
        String insert = "INSERT INTO " + keyspaceTable(table) + "(a, b) VALUES (?, ?)";
        for (int i = 0; i < count; ++i) {
            context.getCassandraClient().execute(insert, new Object[]{i, String.valueOf(i)});
        }
    }

    /**
     * Drops the test keyspace tables and stored offsets, then cleans up the
     * context. The context is cleaned up even if dropping the data fails.
     *
     * @param context the connector context to release
     */
    private static void dropTestData(CassandraConnectorContext context) throws Exception {
        try {
            deleteTestKeyspaceTables();
            deleteTestOffsets(context);
        } finally {
            context.cleanUp();
        }
    }

    /**
     * Builds a context with the given {@code snapshot.mode} and a
     * {@code snapshot.scan.interval.ms} of {@code "0"}, calls {@code process()}
     * {@link #PROCESS_ITERATIONS} times on a spy whose {@code snapshot()} is
     * stubbed to do nothing, and verifies how often {@code snapshot()} was invoked.
     *
     * @param snapshotMode        value for the {@code snapshot.mode} setting
     * @param expectedInvocations Mockito verification mode for {@code snapshot()}
     */
    private static void assertSnapshotInvocations(String snapshotMode, VerificationMode expectedInvocations) throws Exception {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("snapshot.mode", snapshotMode);
        configs.put("snapshot.scan.interval.ms", "0");
        CassandraConnectorContext context = generateTaskContext(configs);
        try {
            SnapshotProcessor snapshotProcessorSpy = Mockito.spy(new SnapshotProcessor(context));
            Mockito.doNothing().when(snapshotProcessorSpy).snapshot();
            for (int i = 0; i < PROCESS_ITERATIONS; ++i) {
                snapshotProcessorSpy.process();
            }
            Mockito.verify(snapshotProcessorSpy, expectedInvocations).snapshot();
        } finally {
            context.cleanUp();
        }
    }
}

