/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import com.datastax.oss.driver.api.core.type.DataTypes;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.CellData;
import io.debezium.connector.cassandra.ChangeRecord;
import io.debezium.connector.cassandra.EOFEvent;
import io.debezium.connector.cassandra.Emitter;
import io.debezium.connector.cassandra.KeyValueSchema;
import io.debezium.connector.cassandra.KeyspaceTable;
import io.debezium.connector.cassandra.OffsetPosition;
import io.debezium.connector.cassandra.QueueProcessor;
import io.debezium.connector.cassandra.Record;
import io.debezium.connector.cassandra.RowData;
import io.debezium.connector.cassandra.SourceInfo;
import io.debezium.connector.cassandra.TestUtils;
import io.debezium.connector.cassandra.TestingKafkaRecordEmitter;
import io.debezium.connector.cassandra.TombstoneRecord;
import io.debezium.pipeline.Sizeable;
import io.debezium.time.Conversions;
import java.io.File;
import java.util.Arrays;
import java.util.Properties;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public abstract class AbstractQueueProcessorTest {
    private CassandraConnectorContext context;
    private QueueProcessor queueProcessor;
    private TestingKafkaRecordEmitter emitter;
    private KeyValueSchema keyValueSchema;
    private RowData rowData;
    private SourceInfo sourceInfo;

    public abstract CassandraConnectorContext generateTaskContext(Configuration var1);

    @Before
    public void setUp() throws Exception {
        this.context = this.generateTaskContext(Configuration.from((Properties)TestUtils.generateDefaultConfigMap()));
        this.emitter = new TestingKafkaRecordEmitter(this.context.getCassandraConnectorConfig(), null, this.context.getOffsetWriter(), this.context.getCassandraConnectorConfig().offsetFlushIntervalMs(), this.context.getCassandraConnectorConfig().maxOffsetFlushSize(), this.context.getCassandraConnectorConfig().getKeyConverter(), this.context.getCassandraConnectorConfig().getValueConverter(), this.context.getErroneousCommitLogs(), this.context.getCassandraConnectorConfig().getCommitLogTransfer());
        this.queueProcessor = new QueueProcessor(this.context, 0, (Emitter)this.emitter);
        this.keyValueSchema = new KeyValueSchema.KeyValueSchemaBuilder().withKeyspace("test_keyspace").withTable("cdc_table").withKafkaTopicPrefix(this.context.getCassandraConnectorConfig().getLogicalName()).withSourceInfoStructMarker(this.context.getCassandraConnectorConfig().getSourceInfoStructMaker()).withRowSchema(RowData.rowSchema(Arrays.asList("col1", "col2"), Arrays.asList(DataTypes.TEXT, DataTypes.INT))).withPrimaryKeyNames(Arrays.asList("p1", "c1")).withPrimaryKeySchemas(KeyValueSchema.getPrimaryKeySchemas(Arrays.asList(DataTypes.INT, DataTypes.INT))).build();
        this.rowData = new RowData();
        this.rowData.addCell(new CellData("p1", (Object)1, null, CellData.ColumnType.PARTITION));
        this.rowData.addCell(new CellData("c1", (Object)2, null, CellData.ColumnType.CLUSTERING));
        this.rowData.addCell(new CellData("col1", (Object)"col1value", null, CellData.ColumnType.REGULAR));
        this.rowData.addCell(new CellData("col2", (Object)3, null, CellData.ColumnType.REGULAR));
        this.sourceInfo = new SourceInfo((CommonConnectorConfig)this.context.getCassandraConnectorConfig(), "cluster1", new OffsetPosition("CommitLog-6-123.log", 0), new KeyspaceTable("test_keyspace", "cdc_table"), false, Conversions.toInstantFromMicros((long)(System.currentTimeMillis() * 1000L)));
    }

    @After
    public void tearDown() {
        this.context.cleanUp();
    }

    @Test
    public void testInsertChangeRecordProcessing() throws Exception {
        ChangeEventQueue queue = (ChangeEventQueue)this.context.getQueues().get(0);
        ChangeRecord record = new ChangeRecord(this.sourceInfo, this.rowData, this.keyValueSchema.keySchema(), this.keyValueSchema.valueSchema(), Record.Operation.INSERT, false);
        queue.enqueue((Sizeable)record);
        Assert.assertEquals((long)1L, (long)(queue.totalCapacity() - queue.remainingCapacity()));
        this.queueProcessor.process();
        Assert.assertEquals((long)1L, (long)this.emitter.records.size());
        Assert.assertEquals((long)queue.totalCapacity(), (long)queue.remainingCapacity());
    }

    @Test
    public void testRangeTombstoneChangeRecordProcessing() throws Exception {
        ChangeEventQueue queue = (ChangeEventQueue)this.context.getQueues().get(0);
        this.rowData.addStart((Object)"1");
        this.rowData.addEnd((Object)"2");
        ChangeRecord record = new ChangeRecord(this.sourceInfo, this.rowData, this.keyValueSchema.keySchema(), this.keyValueSchema.valueSchema(), Record.Operation.RANGE_TOMBSTONE, false);
        queue.enqueue((Sizeable)record);
        Assert.assertEquals((long)1L, (long)(queue.totalCapacity() - queue.remainingCapacity()));
        this.queueProcessor.process();
        Assert.assertEquals((long)1L, (long)this.emitter.records.size());
        Assert.assertEquals((long)queue.totalCapacity(), (long)queue.remainingCapacity());
    }

    @Test
    public void testProcessTombstoneRecords() throws Exception {
        ChangeEventQueue queue = (ChangeEventQueue)this.context.getQueues().get(0);
        TombstoneRecord record = new TombstoneRecord(this.sourceInfo, this.rowData, this.keyValueSchema.keySchema());
        queue.enqueue((Sizeable)record);
        Assert.assertEquals((long)1L, (long)(queue.totalCapacity() - queue.remainingCapacity()));
        this.queueProcessor.process();
        Assert.assertEquals((long)1L, (long)this.emitter.records.size());
        Assert.assertEquals((long)queue.totalCapacity(), (long)queue.remainingCapacity());
    }

    @Test
    public void testProcessEofEvent() throws Exception {
        ChangeEventQueue queue = (ChangeEventQueue)this.context.getQueues().get(0);
        File commitLogFile = new File("non-existing-log-file-path");
        queue.enqueue((Sizeable)new EOFEvent(commitLogFile));
        Assert.assertEquals((long)1L, (long)(queue.totalCapacity() - queue.remainingCapacity()));
        this.queueProcessor.process();
        Assert.assertEquals((long)0L, (long)this.emitter.records.size());
        Assert.assertEquals((long)queue.totalCapacity(), (long)queue.remainingCapacity());
    }
}

