/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import io.debezium.connector.cassandra.BlockingEventQueue;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.CommitLogProcessor;
import io.debezium.connector.cassandra.CommitLogUtil;
import io.debezium.connector.cassandra.EOFEvent;
import io.debezium.connector.cassandra.EmbeddedCassandraConnectorTestBase;
import io.debezium.connector.cassandra.Event;
import io.debezium.connector.cassandra.Record;
import java.io.File;
import java.util.List;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.SimpleBuilders;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.Row;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CommitLogProcessorTest
extends EmbeddedCassandraConnectorTestBase {
    private CassandraConnectorContext context;
    private CommitLogProcessor commitLogProcessor;

    @Before
    public void setUp() throws Exception {
        this.context = CommitLogProcessorTest.generateTaskContext();
        this.commitLogProcessor = new CommitLogProcessor(this.context);
        this.commitLogProcessor.initialize();
    }

    @After
    public void tearDown() throws Exception {
        CommitLogProcessorTest.deleteTestOffsets(this.context);
        this.commitLogProcessor.destroy();
        this.context.cleanUp();
    }

    @Test
    public void testProcessCommitLogs() throws Exception {
        PartitionUpdate commitLogs;
        int commitLogRowSize = 10;
        this.context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + CommitLogProcessorTest.keyspaceTable("cdc_table") + " (a int, b int, PRIMARY KEY(a)) WITH cdc = true;");
        this.context.getSchemaHolder().refreshSchemas();
        CFMetaData cfMetaData = Schema.instance.getCFMetaData("test_keyspace", "cdc_table");
        for (int i = 0; i < commitLogRowSize; ++i) {
            SimpleBuilders.PartitionUpdateBuilder puBuilder = new SimpleBuilders.PartitionUpdateBuilder(cfMetaData, new Object[]{i});
            Row row = puBuilder.row(new Object[0]).add("b", (Object)i).build();
            PartitionUpdate pu = PartitionUpdate.singleRowUpdate((CFMetaData)cfMetaData, (DecoratedKey)puBuilder.build().partitionKey(), (Row)row);
            Mutation m = new Mutation(pu);
            CommitLog.instance.add(m);
        }
        CommitLog.instance.sync(true);
        BlockingEventQueue queue = this.context.getQueue();
        Assert.assertTrue((boolean)queue.isEmpty());
        File cdcLoc = new File(DatabaseDescriptor.getCommitLogLocation());
        for (PartitionUpdate commitLog : commitLogs = CommitLogUtil.getCommitLogs((File)cdcLoc)) {
            this.commitLogProcessor.processCommitLog((File)commitLog);
        }
        List events = queue.poll();
        int eofEventSize = ((PartitionUpdate)commitLogs).length;
        Assert.assertEquals((long)(commitLogRowSize + eofEventSize), (long)events.size());
        for (int i = 0; i < events.size(); ++i) {
            Event event = (Event)events.get(i);
            if (event instanceof Record) {
                Record record = (Record)events.get(i);
                Assert.assertEquals((Object)record.getEventType(), (Object)Event.EventType.CHANGE_EVENT);
                Assert.assertEquals((Object)record.getSource().cluster, (Object)DatabaseDescriptor.getClusterName());
                Assert.assertFalse((boolean)record.getSource().snapshot);
                Assert.assertEquals((Object)record.getSource().keyspaceTable.name(), (Object)CommitLogProcessorTest.keyspaceTable("cdc_table"));
                Assert.assertTrue((boolean)record.getSource().offsetPosition.fileName.contains(String.valueOf(CommitLog.instance.getCurrentPosition().segmentId)));
                continue;
            }
            if (event instanceof EOFEvent) {
                EOFEvent eofEvent = (EOFEvent)event;
                Assert.assertTrue((boolean)eofEvent.success);
                continue;
            }
            throw new Exception("unexpected event type");
        }
        CommitLogProcessorTest.deleteTestKeyspaceTables();
    }
}

