/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.cassandra.CassandraConnectorConfig;
import io.debezium.connector.cassandra.CassandraSchemaFactory;
import io.debezium.connector.cassandra.ChangeRecord;
import io.debezium.connector.cassandra.FileOffsetWriter;
import io.debezium.connector.cassandra.KeyspaceTable;
import io.debezium.connector.cassandra.OffsetPosition;
import io.debezium.connector.cassandra.OffsetWriter;
import io.debezium.connector.cassandra.Record;
import io.debezium.connector.cassandra.SourceInfo;
import io.debezium.connector.cassandra.TestUtils;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import io.debezium.time.Conversions;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.connect.data.Schema;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link FileOffsetWriter}: marking offsets as processed, flushing them to the
 * snapshot / commit log property files, and rejecting a second writer built from the same config.
 */
public class FileOffsetWriterTest {
    private static final String SNAPSHOT_OFFSET_FILE = "snapshot_offset.properties";
    private static final String COMMITLOG_OFFSET_FILE = "commitlog_offset.properties";
    private static final String KEYSPACE = "test_keyspace";
    private static final String TABLE = "test_table";
    private static final String ANOTHER_TABLE = "test_another_table";
    private static final String COMMIT_LOG = "CommitLog-6-12345.log";
    private static final String OLDER_COMMIT_LOG = "CommitLog-6-12344.log";

    private Path offsetDir;
    private OffsetWriter offsetWriter;
    private Properties snapshotProps;
    private Properties commitLogProps;
    private final CassandraSchemaFactory schemaFactory = CassandraSchemaFactory.get();
    private CassandraConnectorConfig config;

    /**
     * Builds a connector config from the default test properties, creates the writer under test
     * and resets the two {@link Properties} holders used to read the offset files back.
     */
    @Before
    public void setUp() throws IOException {
        this.config = new CassandraConnectorConfig(Configuration.from((Properties) TestUtils.generateDefaultConfigMap()));
        this.offsetDir = Path.of(this.config.offsetBackingStoreDir());
        this.offsetWriter = new FileOffsetWriter(this.config);
        this.snapshotProps = new Properties();
        this.commitLogProps = new Properties();
    }

    /**
     * Walks through a sequence of records and checks, before and after marking each one,
     * whether the writer reports its offset as processed.
     */
    @Test
    public void testMarkOffset() {
        KeyspaceTable table = new KeyspaceTable(KEYSPACE, TABLE);
        KeyspaceTable anotherTable = new KeyspaceTable(KEYSPACE, ANOTHER_TABLE);

        ChangeRecord snapshotRecord = this.generateRecord(true, true, new OffsetPosition("", -1), table);
        ChangeRecord commitLogRecord = this.generateRecord(true, false, new OffsetPosition(COMMIT_LOG, 100), table);
        ChangeRecord commitLogRecordDupe = this.generateRecord(true, false, new OffsetPosition(COMMIT_LOG, 100), table);
        ChangeRecord commitLogRecordOlderLog = this.generateRecord(true, false, new OffsetPosition(OLDER_COMMIT_LOG, 101), table);
        ChangeRecord commitLogRecordDiffTable = this.generateRecord(true, false, new OffsetPosition(COMMIT_LOG, 100), anotherTable);

        // A snapshot record is unprocessed until it is marked.
        Assert.assertFalse(this.isProcessed(snapshotRecord));
        this.process(snapshotRecord);
        Assert.assertTrue(this.isProcessed(snapshotRecord));

        // Marking the snapshot offset does not mark the commit log offset of the same table.
        Assert.assertFalse(this.isProcessed(commitLogRecord));
        this.process(commitLogRecord);
        Assert.assertTrue(this.isProcessed(commitLogRecord));

        // A record at the same table and position is already processed, before and after re-marking.
        Assert.assertTrue(this.isProcessed(commitLogRecordDupe));
        this.process(commitLogRecordDupe);
        Assert.assertTrue(this.isProcessed(commitLogRecordDupe));

        // A record from the older commit log file is expected to count as processed, and marking it
        // must not make the newer offset appear unprocessed.
        Assert.assertTrue(this.isProcessed(commitLogRecordOlderLog));
        this.process(commitLogRecordOlderLog);
        Assert.assertTrue(this.isProcessed(commitLogRecordOlderLog));
        Assert.assertTrue(this.isProcessed(commitLogRecord));

        // The same position in a different table is unprocessed until it is marked itself.
        Assert.assertFalse(this.isProcessed(commitLogRecordDiffTable));
        this.process(commitLogRecordDiffTable);
        Assert.assertTrue(this.isProcessed(commitLogRecordDiffTable));
    }

    /**
     * Checks that flushing with nothing marked leaves both offset files empty, and that flushing
     * after marking three records writes one snapshot entry and two commit log entries.
     */
    @Test
    public void testFlush() throws IOException {
        // Wait for the flush to finish before reading the files back.
        this.flushAndWait();
        this.loadProperties(this.snapshotProps, SNAPSHOT_OFFSET_FILE);
        this.loadProperties(this.commitLogProps, COMMITLOG_OFFSET_FILE);
        Assert.assertEquals(0L, this.snapshotProps.size());
        Assert.assertEquals(0L, this.commitLogProps.size());

        KeyspaceTable table = new KeyspaceTable(KEYSPACE, TABLE);
        KeyspaceTable anotherTable = new KeyspaceTable(KEYSPACE, ANOTHER_TABLE);
        ChangeRecord snapshotRecord = this.generateRecord(true, true, new OffsetPosition("", -1), table);
        ChangeRecord commitLogRecord = this.generateRecord(true, false, new OffsetPosition(COMMIT_LOG, 100), table);
        ChangeRecord commitLogRecordDiffTable = this.generateRecord(true, false, new OffsetPosition(COMMIT_LOG, 100), anotherTable);
        this.process(snapshotRecord);
        this.process(commitLogRecord);
        this.process(commitLogRecordDiffTable);

        this.flushAndWait();
        this.loadProperties(this.snapshotProps, SNAPSHOT_OFFSET_FILE);
        this.loadProperties(this.commitLogProps, COMMITLOG_OFFSET_FILE);
        Assert.assertEquals(1L, this.snapshotProps.size());
        Assert.assertEquals(2L, this.commitLogProps.size());

        String expectedCommitLogOffset = new OffsetPosition(COMMIT_LOG, 100).serialize();
        Assert.assertEquals(OffsetPosition.defaultOffsetPosition().serialize(), this.snapshotProps.getProperty(table.name()));
        Assert.assertEquals(expectedCommitLogOffset, this.commitLogProps.getProperty(table.name()));
        Assert.assertEquals(expectedCommitLogOffset, this.commitLogProps.getProperty(anotherTable.name()));
    }

    /**
     * {@link #setUp()} has already created a writer from {@code config}; constructing a second one
     * from the same config is expected to fail with {@link CassandraConnectorTaskException}.
     */
    @Test(expected=CassandraConnectorTaskException.class)
    public void testTwoFileWriterCannotCoexist() throws IOException {
        new FileOffsetWriter(this.config);
    }

    /**
     * Creates an INSERT {@link ChangeRecord} whose source carries the given offset position,
     * table and snapshot flag. The timestamp is the current wall-clock time in microseconds.
     */
    private ChangeRecord generateRecord(boolean markOffset, boolean isSnapshot, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {
        Configuration.Builder builder = (Configuration.Builder) Configuration.empty().edit().with(CassandraConnectorConfig.TOPIC_PREFIX, "someconnector");
        // Named differently from the 'config' field on purpose: this config is only for the record's source info.
        CassandraConnectorConfig recordConfig = new CassandraConnectorConfig(builder.build());
        SourceInfo sourceInfo = new SourceInfo((CommonConnectorConfig) recordConfig, "test-cluster", offsetPosition, keyspaceTable, isSnapshot,
                Conversions.toInstantFromMicros(System.currentTimeMillis() * 1000L));
        return new ChangeRecord(sourceInfo, this.schemaFactory.rowData(), Schema.INT32_SCHEMA, Schema.INT32_SCHEMA, Record.Operation.INSERT, markOffset);
    }

    /** Asks the writer whether the offset carried by {@code record} is reported as processed. */
    private boolean isProcessed(ChangeRecord record) {
        SourceInfo source = record.getSource();
        return this.offsetWriter.isOffsetProcessed(source.keyspaceTable.name(), source.offsetPosition.serialize(), source.snapshot);
    }

    /** Marks the offset carried by {@code record} and blocks until the writer has completed it. */
    private void process(ChangeRecord record) {
        SourceInfo source = record.getSource();
        try {
            this.offsetWriter.markOffset(source.keyspaceTable.name(), source.offsetPosition.serialize(), source.snapshot).get();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so the test runner can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /** Triggers a flush on the writer and blocks until it has completed. */
    private void flushAndWait() {
        try {
            this.offsetWriter.flush().get();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so the test runner can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Loads the named file from the offset directory into {@code target}, closing the stream
     * even when loading fails. Entries already present in {@code target} are kept.
     */
    private void loadProperties(Properties target, String fileName) throws IOException {
        try (FileInputStream in = new FileInputStream(this.offsetDir.toString() + "/" + fileName)) {
            target.load(in);
        }
    }
}

