/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.Cassandra4CommitLogReadHandlerImpl;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.CommitLogProcessorMetrics;
import io.debezium.connector.cassandra.CommitLogUtil;
import io.debezium.connector.cassandra.EmbeddedCassandra4ConnectorTestBase;
import io.debezium.connector.cassandra.Event;
import io.debezium.connector.cassandra.KeyspaceTable;
import io.debezium.connector.cassandra.TestUtils;
import io.debezium.util.Testing;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler;
import org.apache.cassandra.db.commitlog.CommitLogReader;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public abstract class AbstractCommitLogProcessorTest
extends EmbeddedCassandra4ConnectorTestBase {
    public CassandraConnectorContext context;
    protected CommitLogProcessorMetrics metrics = new CommitLogProcessorMetrics();
    private CommitLogReadHandler commitLogReadHandler;

    @Before
    public void setUp() throws Exception {
        this.initialiseData();
        this.context = this.generateTaskContext();
        Awaitility.await().atMost(Duration.ofSeconds(60L)).until(() -> this.context.getSchemaHolder().getKeyValueSchema(new KeyspaceTable("test_keyspace", TestUtils.TEST_TABLE_NAME)) != null);
        this.commitLogReadHandler = new Cassandra4CommitLogReadHandlerImpl(this.context, this.metrics);
        this.metrics.registerMetrics();
    }

    @After
    public void tearDown() throws Exception {
        TestUtils.deleteTestOffsets((CassandraConnectorContext)this.context);
        this.metrics.unregisterMetrics();
        TestUtils.deleteTestKeyspaceTables();
        this.context.cleanUp();
        Testing.Files.delete((String)DatabaseDescriptor.getCDCLogLocation());
    }

    @Test
    public void test() throws Exception {
        this.verifyEvents();
    }

    public abstract void initialiseData() throws Exception;

    public abstract void verifyEvents() throws Exception;

    public void createTable(String query) throws Exception {
        this.createTable(query, "test_keyspace", TestUtils.TEST_TABLE_NAME);
    }

    public void createTable2(String query) throws Exception {
        this.createTable(query, "test_keyspace", TestUtils.TEST_TABLE_NAME_2);
    }

    public void createTable(String query, String keyspace, String tableName) throws Exception {
        TestUtils.runCql((String)String.format(query, keyspace, tableName));
    }

    public List<Event> getEvents(int expectedSize) throws Exception {
        ChangeEventQueue queue = (ChangeEventQueue)this.context.getQueues().get(0);
        ArrayList<Event> events = new ArrayList<Event>();
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
            this.readLogs((ChangeEventQueue<Event>)queue);
            events.clear();
            events.addAll(queue.poll());
            return events.size() == expectedSize;
        });
        Assert.assertEquals((long)expectedSize, (long)events.size());
        return events;
    }

    private void readLogs(ChangeEventQueue<Event> queue) throws Exception {
        Assert.assertEquals((long)queue.totalCapacity(), (long)queue.remainingCapacity());
        File cdcLoc = new File(DatabaseDescriptor.getCommitLogLocation());
        File[] commitLogs = CommitLogUtil.getCommitLogs((File)cdcLoc);
        CommitLogReader reader = new CommitLogReader();
        for (File commitLog : commitLogs) {
            reader.readCommitLogSegment(this.commitLogReadHandler, commitLog, true);
        }
    }
}

