/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import io.debezium.connector.cassandra.BlockingEventQueue;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.ChangeRecord;
import io.debezium.connector.cassandra.EOFEvent;
import io.debezium.connector.cassandra.EmbeddedCassandraConnectorTestBase;
import io.debezium.connector.cassandra.KafkaRecordEmitter;
import io.debezium.connector.cassandra.KeyspaceTable;
import io.debezium.connector.cassandra.OffsetPosition;
import io.debezium.connector.cassandra.QueueProcessor;
import io.debezium.connector.cassandra.Record;
import io.debezium.connector.cassandra.RowData;
import io.debezium.connector.cassandra.SourceInfo;
import io.debezium.connector.cassandra.TombstoneRecord;
import java.io.File;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.kafka.connect.data.Schema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class QueueProcessorTest
extends EmbeddedCassandraConnectorTestBase {
    private CassandraConnectorContext context;
    private QueueProcessor queueProcessor;
    private KafkaRecordEmitter emitter;

    @Before
    public void setUp() throws Exception {
        this.context = QueueProcessorTest.generateTaskContext();
        this.emitter = (KafkaRecordEmitter)Mockito.mock(KafkaRecordEmitter.class);
        this.queueProcessor = new QueueProcessor(this.context, this.emitter);
    }

    @After
    public void tearDown() {
        this.context.cleanUp();
    }

    @Test
    public void testProcessChangeRecords() throws Exception {
        ((KafkaRecordEmitter)Mockito.doNothing().when((Object)this.emitter)).emit((Record)ArgumentMatchers.any());
        int recordSize = 5;
        BlockingEventQueue queue = this.context.getQueue();
        for (int i = 0; i < recordSize; ++i) {
            SourceInfo sourceInfo = new SourceInfo(DatabaseDescriptor.getClusterName(), new OffsetPosition("CommitLog-6-123.log", i), new KeyspaceTable("test_keyspace", "cdc_table"), false, System.currentTimeMillis() * 1000L);
            ChangeRecord record = new ChangeRecord(sourceInfo, new RowData(), Schema.INT32_SCHEMA, Schema.INT32_SCHEMA, Record.Operation.INSERT, false);
            queue.enqueue((Object)record);
        }
        Assert.assertEquals((long)recordSize, (long)queue.size());
        this.queueProcessor.process();
        ((KafkaRecordEmitter)Mockito.verify((Object)this.emitter, (VerificationMode)Mockito.times((int)recordSize))).emit((Record)ArgumentMatchers.any());
        Assert.assertTrue((boolean)queue.isEmpty());
    }

    @Test
    public void testProcessTombstoneRecords() throws Exception {
        ((KafkaRecordEmitter)Mockito.doNothing().when((Object)this.emitter)).emit((Record)ArgumentMatchers.any());
        int recordSize = 5;
        BlockingEventQueue queue = this.context.getQueue();
        for (int i = 0; i < recordSize; ++i) {
            SourceInfo sourceInfo = new SourceInfo(DatabaseDescriptor.getClusterName(), new OffsetPosition("CommitLog-6-123.log", i), new KeyspaceTable("test_keyspace", "cdc_table"), false, System.currentTimeMillis() * 1000L);
            TombstoneRecord record = new TombstoneRecord(sourceInfo, new RowData(), Schema.INT32_SCHEMA);
            queue.enqueue((Object)record);
        }
        Assert.assertEquals((long)recordSize, (long)queue.size());
        this.queueProcessor.process();
        ((KafkaRecordEmitter)Mockito.verify((Object)this.emitter, (VerificationMode)Mockito.times((int)recordSize))).emit((Record)ArgumentMatchers.any());
        Assert.assertTrue((boolean)queue.isEmpty());
    }

    @Test
    public void testProcessEofEvent() throws Exception {
        ((KafkaRecordEmitter)Mockito.doNothing().when((Object)this.emitter)).emit((Record)ArgumentMatchers.any());
        BlockingEventQueue queue = this.context.getQueue();
        File commitLogFile = QueueProcessorTest.generateCommitLogFile();
        queue.enqueue((Object)new EOFEvent(commitLogFile, true));
        Assert.assertEquals((long)1L, (long)queue.size());
        this.queueProcessor.process();
        ((KafkaRecordEmitter)Mockito.verify((Object)this.emitter, (VerificationMode)Mockito.times((int)0))).emit((Record)ArgumentMatchers.any());
        Assert.assertTrue((boolean)queue.isEmpty());
    }
}

