/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.Cassandra3CommitLogProcessor;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.CommitLogUtil;
import io.debezium.connector.cassandra.EOFEvent;
import io.debezium.connector.cassandra.EmbeddedCassandra3ConnectorTestBase;
import io.debezium.connector.cassandra.Event;
import io.debezium.connector.cassandra.KeyspaceTable;
import io.debezium.connector.cassandra.Record;
import io.debezium.connector.cassandra.TestUtils;
import java.io.File;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.SimpleBuilders;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.Row;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CommitLogProcessorTest
extends EmbeddedCassandra3ConnectorTestBase {
    private CassandraConnectorContext context;
    private Cassandra3CommitLogProcessor commitLogProcessor;

    @Before
    public void setUp() throws Exception {
        this.context = this.generateTaskContext();
        this.commitLogProcessor = new Cassandra3CommitLogProcessor(this.context);
        this.commitLogProcessor.initialize();
    }

    @After
    public void tearDown() throws Exception {
        TestUtils.deleteTestOffsets((CassandraConnectorContext)this.context);
        this.commitLogProcessor.destroy();
        TestUtils.deleteTestKeyspaceTables();
        this.context.cleanUp();
    }

    @Test
    public void testProcessCommitLogs() throws Exception {
        PartitionUpdate commitLogs;
        int commitLogRowSize = 10;
        Thread.sleep(10000L);
        TestUtils.runCql((String)("CREATE TABLE IF NOT EXISTS " + TestUtils.keyspaceTable((String)"cdc_table") + " (a int, b int, PRIMARY KEY(a)) WITH cdc = true;"));
        Awaitility.await().forever().until((Callable)new Callable<Boolean>(){

            @Override
            public Boolean call() throws Exception {
                return CommitLogProcessorTest.this.context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable("test_keyspace", "cdc_table")) != null;
            }
        });
        CFMetaData cfMetaData = Schema.instance.getCFMetaData("test_keyspace", "cdc_table");
        for (int i = 0; i < commitLogRowSize; ++i) {
            SimpleBuilders.PartitionUpdateBuilder puBuilder = new SimpleBuilders.PartitionUpdateBuilder(cfMetaData, new Object[]{i});
            Row row = puBuilder.row(new Object[0]).add("b", (Object)i).build();
            PartitionUpdate pu = PartitionUpdate.singleRowUpdate((CFMetaData)cfMetaData, (DecoratedKey)puBuilder.build().partitionKey(), (Row)row);
            Mutation m = new Mutation(pu);
            CommitLog.instance.add(m);
        }
        CommitLog.instance.sync(true);
        ChangeEventQueue queue = (ChangeEventQueue)this.context.getQueues().get(0);
        Assert.assertEquals((long)queue.totalCapacity(), (long)queue.remainingCapacity());
        File cdcLoc = new File(DatabaseDescriptor.getCommitLogLocation());
        for (PartitionUpdate commitLog : commitLogs = CommitLogUtil.getCommitLogs((File)cdcLoc)) {
            this.commitLogProcessor.processCommitLog((File)commitLog);
        }
        List events = queue.poll();
        int eofEventSize = ((PartitionUpdate)commitLogs).length;
        Assert.assertEquals((long)(commitLogRowSize + eofEventSize), (long)events.size());
        for (int i = 0; i < events.size(); ++i) {
            Event event = (Event)events.get(i);
            if (event instanceof Record) {
                Record record = (Record)events.get(i);
                Assert.assertEquals((Object)record.getEventType(), (Object)Event.EventType.CHANGE_EVENT);
                Assert.assertEquals((Object)record.getSource().cluster, (Object)DatabaseDescriptor.getClusterName());
                Assert.assertFalse((boolean)record.getSource().snapshot);
                Assert.assertEquals((Object)record.getSource().keyspaceTable.name(), (Object)TestUtils.keyspaceTable((String)"cdc_table"));
                Assert.assertTrue((boolean)record.getSource().offsetPosition.fileName.contains(String.valueOf(CommitLog.instance.getCurrentPosition().segmentId)));
                continue;
            }
            if (event instanceof EOFEvent) {
                EOFEvent eofEvent = (EOFEvent)event;
                Assert.assertFalse((boolean)this.context.getErroneousCommitLogs().contains(eofEvent.file.getName()));
                continue;
            }
            throw new Exception("unexpected event type");
        }
    }
}

