/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.Cassandra3CommitLogProcessor;
import io.debezium.connector.cassandra.Cassandra3CommitLogReadHandlerImpl;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.CommitLogProcessorMetrics;
import io.debezium.connector.cassandra.CommitLogUtil;
import io.debezium.connector.cassandra.EmbeddedCassandra3ConnectorTestBase;
import io.debezium.connector.cassandra.Event;
import io.debezium.connector.cassandra.Filters;
import io.debezium.connector.cassandra.KeyspaceTable;
import io.debezium.connector.cassandra.RecordMaker;
import io.debezium.connector.cassandra.TestUtils;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler;
import org.apache.cassandra.db.commitlog.CommitLogReader;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Template for tests that write data to the embedded Cassandra instance, replay the
 * resulting commit log segments through a {@link Cassandra3CommitLogReadHandlerImpl}
 * and then verify the events that ended up in the connector's first queue.
 *
 * <p>Subclasses provide the data in {@link #initialiseData()} and the assertions in
 * {@link #verifyEvents()}; the fixture life cycle is driven by {@link #setUp()} and
 * {@link #tearDown()}.
 */
public abstract class AbstractCommitLogProcessorTest
extends EmbeddedCassandra3ConnectorTestBase {
    /** Keyspace used by every table helper in this class. */
    private static final String KEYSPACE_NAME = "test_keyspace";

    /** Upper bound for every Awaitility wait performed by this class. */
    private static final Duration AWAIT_TIMEOUT = Duration.ofSeconds(60L);

    /** Pause applied after issuing a table-creation statement, in milliseconds. */
    private static final long TABLE_CREATION_PAUSE_MS = 2000L;

    /** First queue of {@link #context}; assigned in {@link #setUp()}. */
    public ChangeEventQueue<Event> queue;

    /** Connector context created in {@link #setUp()} and released in {@link #tearDown()}. */
    public CassandraConnectorContext context;

    /** Processor created and initialised in {@link #setUp()}, destroyed in {@link #tearDown()}. */
    public Cassandra3CommitLogProcessor commitLogProcessor;

    /**
     * Builds the fixture: lets the subclass write its data, creates the connector context,
     * waits (up to {@link #AWAIT_TIMEOUT}) until the schema holder knows the key/value schema
     * of the primary test table, initialises the commit log processor and finally replays
     * the commit logs into the queue.
     *
     * @throws Exception if any step fails or the schema does not appear in time
     */
    @Before
    public void setUp() throws Exception {
        initialiseData();
        // generateTaskContext() is not defined in this class; presumably inherited from the base class.
        context = generateTaskContext();
        Awaitility.await()
                .atMost(AWAIT_TIMEOUT)
                .until(() -> context.getSchemaHolder()
                        .getKeyValueSchema(new KeyspaceTable(KEYSPACE_NAME, TestUtils.TEST_TABLE_NAME)) != null);
        commitLogProcessor = new Cassandra3CommitLogProcessor(context);
        commitLogProcessor.initialize();
        // Parameterized cast instead of a raw type; redundant if getQueues() is already typed.
        @SuppressWarnings("unchecked")
        ChangeEventQueue<Event> firstQueue = (ChangeEventQueue<Event>) context.getQueues().get(0);
        queue = firstQueue;
        readLogs();
    }

    /**
     * Releases the fixture. The steps run in the same order as before (offsets, processor,
     * keyspace tables, context) but each one is attempted even when an earlier one throws,
     * and steps whose target was never created (because {@link #setUp()} failed early) are
     * skipped instead of raising a {@link NullPointerException}.
     *
     * <p>If more than one step throws, the exception of the last failing step propagates.
     *
     * @throws Exception if a cleanup step fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (context != null) {
                TestUtils.deleteTestOffsets(context);
            }
        }
        finally {
            try {
                if (commitLogProcessor != null) {
                    commitLogProcessor.destroy();
                }
            }
            finally {
                try {
                    TestUtils.deleteTestKeyspaceTables();
                }
                finally {
                    if (context != null) {
                        context.cleanUp();
                    }
                }
            }
        }
    }

    /**
     * The single test case of the template: delegates to {@link #verifyEvents()}.
     *
     * @throws Exception if verification fails
     */
    @Test
    public void test() throws Exception {
        verifyEvents();
    }

    /**
     * Hook called first in {@link #setUp()}, before the connector context exists.
     * Implementations are expected to create tables and write the data under test.
     *
     * @throws Exception if the data cannot be prepared
     */
    public abstract void initialiseData() throws Exception;

    /**
     * Hook called by {@link #test()} after the commit logs have been replayed.
     *
     * @throws Exception if the produced events are not as expected
     */
    public abstract void verifyEvents() throws Exception;

    /**
     * Creates the primary test table ({@link TestUtils#TEST_TABLE_NAME}) in the test keyspace.
     *
     * @param query format string taking the keyspace and the table name, in that order
     * @throws Exception if the statement fails or the thread is interrupted while pausing
     */
    public void createTable(String query) throws Exception {
        createTable(query, KEYSPACE_NAME, TestUtils.TEST_TABLE_NAME);
    }

    /**
     * Creates the secondary test table ({@link TestUtils#TEST_TABLE_NAME_2}) in the test keyspace.
     *
     * @param query format string taking the keyspace and the table name, in that order
     * @throws Exception if the statement fails or the thread is interrupted while pausing
     */
    public void createTable2(String query) throws Exception {
        createTable(query, KEYSPACE_NAME, TestUtils.TEST_TABLE_NAME_2);
    }

    /**
     * Formats {@code query} with the given keyspace and table name, runs it through
     * {@link TestUtils#runCql}, and then pauses for {@link #TABLE_CREATION_PAUSE_MS} ms.
     *
     * @param query     format string taking the keyspace and the table name, in that order
     * @param keyspace  first format argument
     * @param tableName second format argument
     * @throws Exception if the statement fails or the thread is interrupted while pausing
     */
    public void createTable(String query, String keyspace, String tableName) throws Exception {
        TestUtils.runCql(String.format(query, keyspace, tableName));
        // NOTE(review): presumably gives the new table time to become visible - confirm.
        Thread.sleep(TABLE_CREATION_PAUSE_MS);
    }

    /**
     * Polls {@link #queue} until at least {@code expectedSize} events have been collected
     * or {@link #AWAIT_TIMEOUT} elapses, then asserts that exactly {@code expectedSize}
     * events were collected.
     *
     * @param expectedSize exact number of events the caller expects
     * @return the collected events, in the order they were polled
     * @throws Exception if the wait times out or polling fails
     */
    public List<Event> getEvents(int expectedSize) throws Exception {
        List<Event> events = new ArrayList<>();
        Awaitility.await().atMost(AWAIT_TIMEOUT).until(() -> {
            // Accumulate across polls: a single poll may return only part of the events.
            events.addAll(queue.poll());
            return events.size() >= expectedSize;
        });
        // The wait stops at ">="; this rejects the case where too many events arrived.
        Assert.assertEquals(expectedSize, events.size());
        return events;
    }

    /**
     * Asserts that the queue is still empty, then feeds every commit log file found in the
     * configured commit log location through a freshly built read handler.
     *
     * @throws Exception if the queue is not empty or a segment cannot be read
     */
    public void readLogs() throws Exception {
        // Nothing may have been enqueued before the replay starts.
        Assert.assertEquals(queue.totalCapacity(), queue.remainingCapacity());

        File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
        File[] commitLogs = CommitLogUtil.getCommitLogs(commitLogDir);

        CassandraConnectorConfig config = context.getCassandraConnectorConfig();
        RecordMaker recordMaker = new RecordMaker(
                config.tombstonesOnDelete(),
                new Filters(config.fieldExcludeList()),
                config);
        CommitLogReadHandler handler = new Cassandra3CommitLogReadHandlerImpl(
                context.getSchemaHolder(),
                context.getQueues(),
                context.getOffsetWriter(),
                recordMaker,
                new CommitLogProcessorMetrics());

        CommitLogReader reader = new CommitLogReader();
        for (File commitLog : commitLogs) {
            // Third argument is passed as true exactly as before (tolerate-truncation flag - confirm).
            reader.readCommitLogSegment(handler, commitLog, true);
        }
    }
}

