/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.CassandraConnectorConfig;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.ChangeRecord;
import io.debezium.connector.cassandra.EOFEvent;
import io.debezium.connector.cassandra.EmbeddedCassandra3ConnectorTestBase;
import io.debezium.connector.cassandra.KafkaRecordEmitter;
import io.debezium.connector.cassandra.KeyspaceTable;
import io.debezium.connector.cassandra.OffsetPosition;
import io.debezium.connector.cassandra.QueueProcessor;
import io.debezium.connector.cassandra.Record;
import io.debezium.connector.cassandra.RowData;
import io.debezium.connector.cassandra.SourceInfo;
import io.debezium.connector.cassandra.TestUtils;
import io.debezium.connector.cassandra.TombstoneRecord;
import io.debezium.time.Conversions;
import java.io.File;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.kafka.connect.data.Schema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class QueueProcessorTest
extends EmbeddedCassandra3ConnectorTestBase {
    private CassandraConnectorContext context;
    private QueueProcessor queueProcessor;
    private KafkaRecordEmitter emitter;

    @Before
    public void setUp() throws Exception {
        this.context = this.generateTaskContext();
        this.emitter = (KafkaRecordEmitter)Mockito.mock(KafkaRecordEmitter.class);
        this.queueProcessor = new QueueProcessor(this.context, 0, this.emitter);
    }

    @After
    public void tearDown() {
        this.context.cleanUp();
    }

    @Test
    public void testProcessChangeRecords() throws Exception {
        ((KafkaRecordEmitter)Mockito.doNothing().when((Object)this.emitter)).emit((Record)ArgumentMatchers.any());
        int recordSize = 5;
        ChangeEventQueue queue = (ChangeEventQueue)this.context.getQueues().get(0);
        for (int i = 0; i < recordSize; ++i) {
            Configuration configuration = ((Configuration.Builder)Configuration.empty().edit().with(CassandraConnectorConfig.CONNECTOR_NAME, "someconnector")).build();
            CassandraConnectorConfig config = new CassandraConnectorConfig(configuration);
            SourceInfo sourceInfo = new SourceInfo((CommonConnectorConfig)config, DatabaseDescriptor.getClusterName(), new OffsetPosition("CommitLog-6-123.log", i), new KeyspaceTable("test_keyspace", "cdc_table"), false, Conversions.toInstantFromMicros((long)(System.currentTimeMillis() * 1000L)));
            ChangeRecord record = new ChangeRecord(sourceInfo, new RowData(), Schema.INT32_SCHEMA, Schema.INT32_SCHEMA, Record.Operation.INSERT, false);
            queue.enqueue((Object)record);
        }
        Assert.assertEquals((long)recordSize, (long)(queue.totalCapacity() - queue.remainingCapacity()));
        this.queueProcessor.process();
        ((KafkaRecordEmitter)Mockito.verify((Object)this.emitter, (VerificationMode)Mockito.times((int)recordSize))).emit((Record)ArgumentMatchers.any());
        Assert.assertEquals((long)queue.totalCapacity(), (long)queue.remainingCapacity());
    }

    @Test
    public void testProcessTombstoneRecords() throws Exception {
        ((KafkaRecordEmitter)Mockito.doNothing().when((Object)this.emitter)).emit((Record)ArgumentMatchers.any());
        int recordSize = 5;
        ChangeEventQueue queue = (ChangeEventQueue)this.context.getQueues().get(0);
        for (int i = 0; i < recordSize; ++i) {
            Configuration configuration = ((Configuration.Builder)Configuration.empty().edit().with(CassandraConnectorConfig.CONNECTOR_NAME, "someconnector")).build();
            CassandraConnectorConfig config = new CassandraConnectorConfig(configuration);
            SourceInfo sourceInfo = new SourceInfo((CommonConnectorConfig)config, DatabaseDescriptor.getClusterName(), new OffsetPosition("CommitLog-6-123.log", i), new KeyspaceTable("test_keyspace", "cdc_table"), false, Conversions.toInstantFromMicros((long)(System.currentTimeMillis() * 1000L)));
            TombstoneRecord record = new TombstoneRecord(sourceInfo, new RowData(), Schema.INT32_SCHEMA);
            queue.enqueue((Object)record);
        }
        Assert.assertEquals((long)recordSize, (long)(queue.totalCapacity() - queue.remainingCapacity()));
        this.queueProcessor.process();
        ((KafkaRecordEmitter)Mockito.verify((Object)this.emitter, (VerificationMode)Mockito.times((int)recordSize))).emit((Record)ArgumentMatchers.any());
        Assert.assertEquals((long)queue.totalCapacity(), (long)queue.remainingCapacity());
    }

    @Test
    public void testProcessEofEvent() throws Exception {
        ((KafkaRecordEmitter)Mockito.doNothing().when((Object)this.emitter)).emit((Record)ArgumentMatchers.any());
        ChangeEventQueue queue = (ChangeEventQueue)this.context.getQueues().get(0);
        File commitLogFile = TestUtils.generateCommitLogFile();
        queue.enqueue((Object)new EOFEvent(commitLogFile));
        Assert.assertEquals((long)1L, (long)(queue.totalCapacity() - queue.remainingCapacity()));
        this.queueProcessor.process();
        ((KafkaRecordEmitter)Mockito.verify((Object)this.emitter, (VerificationMode)Mockito.times((int)0))).emit((Record)ArgumentMatchers.any());
        Assert.assertEquals((long)queue.totalCapacity(), (long)queue.remainingCapacity());
    }
}

