/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle.logminer;

import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.logminer.TransactionalBuffer;
import io.debezium.connector.oracle.logminer.TransactionalBufferMetrics;
import io.debezium.connector.oracle.xstream.LcrPosition;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.txmetadata.TransactionContext;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.mockito.Mockito;

@SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER)
public class TransactionalBufferTest {
    private static final String SERVER_NAME = "serverX";
    private static final String TRANSACTION_ID = "transaction";
    private static final String OTHER_TRANSACTION_ID = "other_transaction";
    private static final String SQL_ONE = "update table";
    private static final String SQL_TWO = "insert into table";
    private static final String MESSAGE = "OK";
    private static final BigDecimal SCN = BigDecimal.ONE;
    private static final BigDecimal OTHER_SCN = BigDecimal.TEN;
    private static final BigDecimal LARGEST_SCN = BigDecimal.valueOf(100L);
    private static final Timestamp TIMESTAMP = new Timestamp(System.currentTimeMillis());
    private static final Configuration config = new Configuration(){

        public Set<String> keys() {
            return null;
        }

        public String getString(String key) {
            return null;
        }
    };
    private static final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);
    private static OracleOffsetContext offsetContext;
    private ErrorHandler errorHandler;
    private TransactionalBuffer transactionalBuffer;
    private TransactionalBufferMetrics metrics;
    @Rule
    public TestRule skipRule = new SkipTestDependingOnAdapterNameRule();

    @Before
    public void before() {
        ChangeEventQueue queue = new ChangeEventQueue.Builder().pollInterval(Duration.of(8192L, ChronoUnit.MILLIS)).maxBatchSize(2048).maxQueueSize(8192).build();
        this.errorHandler = new ErrorHandler(OracleConnector.class, SERVER_NAME, queue);
        this.metrics = (TransactionalBufferMetrics)Mockito.mock(TransactionalBufferMetrics.class);
        this.transactionalBuffer = new TransactionalBuffer(SERVER_NAME, this.errorHandler, this.metrics, 8192);
    }

    @After
    public void after() throws InterruptedException {
        this.transactionalBuffer.close();
    }

    @Test
    public void testIsEmpty() {
        Assertions.assertThat((boolean)this.transactionalBuffer.isEmpty()).isEqualTo(true);
    }

    @Test
    public void testIsNotEmptyWhenTransactionIsRegistered() {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {});
        Assertions.assertThat((boolean)this.transactionalBuffer.isEmpty()).isEqualTo(false);
    }

    @Test
    public void testIsNotEmptyWhenTransactionIsCommitting() {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> Thread.sleep(1000L));
        offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), Long.valueOf(SCN.longValue()), (LcrPosition)null, false, true, new TransactionContext());
        this.transactionalBuffer.commit(TRANSACTION_ID, SCN.add(BigDecimal.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE);
        Assertions.assertThat((boolean)this.transactionalBuffer.isEmpty()).isEqualTo(false);
    }

    @Test
    public void testIsEmptyWhenTransactionIsCommitted() throws InterruptedException {
        CountDownLatch commitLatch = new CountDownLatch(1);
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> commitLatch.countDown());
        offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), Long.valueOf(SCN.longValue()), (LcrPosition)null, false, true, new TransactionContext());
        this.transactionalBuffer.commit(TRANSACTION_ID, SCN.add(BigDecimal.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE);
        commitLatch.await();
        Thread.sleep(1000L);
        Assertions.assertThat((boolean)this.transactionalBuffer.isEmpty()).isEqualTo(true);
    }

    @Test
    public void testIsEmptyWhenTransactionIsRolledBack() {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.rollback(TRANSACTION_ID, "");
        Assertions.assertThat((boolean)this.transactionalBuffer.isEmpty()).isEqualTo(true);
        Assertions.assertThat((BigDecimal)this.transactionalBuffer.getLargestScn()).isEqualTo((Object)SCN);
    }

    @Test
    public void testNonEmptyFirstTransactionIsRolledBack() {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "insert", (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.rollback(TRANSACTION_ID, "");
        Assertions.assertThat((boolean)this.transactionalBuffer.isEmpty()).isEqualTo(false);
        Assertions.assertThat((BigDecimal)this.transactionalBuffer.getLargestScn()).isEqualTo((Object)OTHER_SCN);
        Assertions.assertThat((boolean)this.transactionalBuffer.getRolledBackTransactionIds().contains(TRANSACTION_ID)).isTrue();
        Assertions.assertThat((boolean)this.transactionalBuffer.getRolledBackTransactionIds().contains(OTHER_TRANSACTION_ID)).isFalse();
    }

    @Test
    public void testNonEmptySecondTransactionIsRolledBack() {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.rollback(OTHER_TRANSACTION_ID, "");
        Assertions.assertThat((boolean)this.transactionalBuffer.isEmpty()).isEqualTo(false);
        Assertions.assertThat((BigDecimal)this.transactionalBuffer.getLargestScn()).isEqualTo((Object)OTHER_SCN);
        Assertions.assertThat((boolean)this.transactionalBuffer.getRolledBackTransactionIds().contains(TRANSACTION_ID)).isFalse();
        Assertions.assertThat((boolean)this.transactionalBuffer.getRolledBackTransactionIds().contains(OTHER_TRANSACTION_ID)).isTrue();
    }

    @Test
    public void testCalculateScnWhenTransactionIsCommitted() throws InterruptedException {
        CountDownLatch commitLatch = new CountDownLatch(1);
        AtomicReference smallestScnContainer = new AtomicReference();
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {
            smallestScnContainer.set(smallestScn);
            commitLatch.countDown();
        });
        Assertions.assertThat((BigDecimal)this.transactionalBuffer.getLargestScn()).isEqualTo((Object)SCN);
        offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), Long.valueOf(SCN.longValue()), null, false, true, new TransactionContext());
        this.transactionalBuffer.commit(TRANSACTION_ID, SCN.add(BigDecimal.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE);
        commitLatch.await();
        Assertions.assertThat((BigDecimal)this.transactionalBuffer.getLargestScn()).isEqualTo((Object)SCN);
        Assertions.assertThat((BigDecimal)((BigDecimal)smallestScnContainer.get())).isNull();
        Assertions.assertThat((boolean)this.transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue();
    }

    @Test
    public void testCalculateScnWhenFirstTransactionIsCommitted() throws InterruptedException {
        CountDownLatch commitLatch = new CountDownLatch(1);
        AtomicReference smallestScnContainer = new AtomicReference();
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {
            smallestScnContainer.set(smallestScn);
            commitLatch.countDown();
        });
        this.transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {});
        Assertions.assertThat((BigDecimal)this.transactionalBuffer.getLargestScn()).isEqualTo((Object)OTHER_SCN);
        offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), Long.valueOf(SCN.longValue()), null, false, true, new TransactionContext());
        this.transactionalBuffer.commit(TRANSACTION_ID, SCN.add(BigDecimal.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE);
        commitLatch.await();
        Assertions.assertThat((BigDecimal)this.transactionalBuffer.getLargestScn()).isEqualTo((Object)OTHER_SCN);
        Assertions.assertThat((BigDecimal)((BigDecimal)smallestScnContainer.get())).isEqualTo((Object)OTHER_SCN);
        Assertions.assertThat((boolean)this.transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue();
    }

    @Test
    public void testCalculateScnWhenSecondTransactionIsCommitted() throws InterruptedException {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {});
        CountDownLatch commitLatch = new CountDownLatch(1);
        AtomicReference smallestScnContainer = new AtomicReference();
        this.transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {
            smallestScnContainer.set(smallestScn);
            commitLatch.countDown();
        });
        Assertions.assertThat((BigDecimal)this.transactionalBuffer.getLargestScn()).isEqualTo((Object)OTHER_SCN);
        offsetContext = new OracleOffsetContext(connectorConfig, OTHER_SCN.longValue(), Long.valueOf(OTHER_SCN.longValue()), null, false, true, new TransactionContext());
        this.transactionalBuffer.commit(OTHER_TRANSACTION_ID, OTHER_SCN.add(BigDecimal.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE);
        commitLatch.await();
        Assertions.assertThat((BigDecimal)((BigDecimal)smallestScnContainer.get())).isEqualTo((Object)SCN);
        Assertions.assertThat((BigDecimal)this.transactionalBuffer.getLargestScn()).isEqualTo((Object)OTHER_SCN);
        Assertions.assertThat((boolean)this.transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue();
    }

    @Test
    public void testResetLargestScn() {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {});
        Assertions.assertThat((BigDecimal)this.transactionalBuffer.getLargestScn()).isEqualTo((Object)OTHER_SCN);
        offsetContext = new OracleOffsetContext(connectorConfig, OTHER_SCN.longValue(), Long.valueOf(OTHER_SCN.longValue()), null, false, true, new TransactionContext());
        this.transactionalBuffer.commit(OTHER_TRANSACTION_ID, OTHER_SCN, offsetContext, TIMESTAMP, () -> true, MESSAGE);
        Assertions.assertThat((BigDecimal)this.transactionalBuffer.getLargestScn()).isEqualTo((Object)OTHER_SCN);
        this.transactionalBuffer.resetLargestScn(null);
        Assertions.assertThat((BigDecimal)this.transactionalBuffer.getLargestScn()).isEqualTo((Object)BigDecimal.ZERO);
        this.transactionalBuffer.resetLargestScn(Long.valueOf(OTHER_SCN.longValue()));
        Assertions.assertThat((BigDecimal)this.transactionalBuffer.getLargestScn()).isEqualTo((Object)OTHER_SCN);
    }

    @Test
    public void testAbandoningOneTransaction() {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.abandonLongTransactions(Long.valueOf(SCN.longValue()));
        Assertions.assertThat((boolean)this.transactionalBuffer.isEmpty()).isEqualTo(true);
        Assertions.assertThat((BigDecimal)this.transactionalBuffer.getLargestScn()).isEqualTo((Object)BigDecimal.ZERO);
    }

    @Test
    public void testAbandoningTransactionHavingAnotherOne() {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), "", (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.abandonLongTransactions(Long.valueOf(SCN.longValue()));
        Assertions.assertThat((boolean)this.transactionalBuffer.isEmpty()).isEqualTo(false);
        Assertions.assertThat((BigDecimal)this.transactionalBuffer.getLargestScn()).isEqualTo((Object)OTHER_SCN);
    }

    @Test
    public void testTransactionDump() {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), SQL_ONE, (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), SQL_ONE, (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), SQL_TWO, (timestamp, smallestScn, commitScn, counter) -> {});
        Assertions.assertThat((String)this.transactionalBuffer.toString()).contains(SQL_ONE);
        Assertions.assertThat((String)this.transactionalBuffer.toString()).contains(SQL_TWO);
    }

    @Test
    public void testDuplicatedRedoSql() {
        Assertions.assertThat((boolean)this.transactionalBuffer.getLargestScn().equals(BigDecimal.ZERO));
        String insertIntoATable = "insert into a table";
        String anotherInsertIntoATable = "another insert into a table";
        String duplicatedInsertIntoATable = "duplicated insert into a table";
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "insert into a table", (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, OTHER_SCN, Instant.now(), "another insert into a table", (timestamp, smallestScn, commitScn, counter) -> {});
        Assertions.assertThat((boolean)this.transactionalBuffer.getLargestScn().equals(OTHER_SCN));
        Assertions.assertThat((boolean)this.transactionalBuffer.toString().contains("insert into a table"));
        Assertions.assertThat((boolean)this.transactionalBuffer.toString().contains("another insert into a table"));
        this.transactionalBuffer.rollback(TRANSACTION_ID, "");
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "duplicated insert into a table", (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, SCN, Instant.now(), "duplicated insert into a table", (timestamp, smallestScn, commitScn, counter) -> {});
        Assertions.assertThat((this.transactionalBuffer.toString().indexOf("duplicated insert into a table") != this.transactionalBuffer.toString().lastIndexOf("duplicated insert into a table") ? 1 : 0) != 0);
        this.transactionalBuffer.rollback(TRANSACTION_ID, "");
        this.transactionalBuffer.rollback(OTHER_TRANSACTION_ID, "");
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "duplicated insert into a table", (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, OTHER_SCN, Instant.now(), "duplicated insert into a table", (timestamp, smallestScn, commitScn, counter) -> {});
        Assertions.assertThat((this.transactionalBuffer.toString().indexOf("duplicated insert into a table") == this.transactionalBuffer.toString().lastIndexOf("duplicated insert into a table") ? 1 : 0) != 0);
        this.transactionalBuffer.rollback(TRANSACTION_ID, "");
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "duplicated insert into a table", (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, OTHER_SCN, Instant.now(), "insert into a table", (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, LARGEST_SCN, Instant.now(), "duplicated insert into a table", (timestamp, smallestScn, commitScn, counter) -> {});
        Assertions.assertThat((this.transactionalBuffer.toString().indexOf("duplicated insert into a table") != this.transactionalBuffer.toString().lastIndexOf("duplicated insert into a table") ? 1 : 0) != 0);
        this.transactionalBuffer.rollback(TRANSACTION_ID, "");
    }

    @Test
    public void testCommitQueueOverflowProcessedOnCaller() throws InterruptedException {
        Thread mainThread = Thread.currentThread();
        int commitQueueCapacity = 10;
        this.transactionalBuffer = new TransactionalBuffer(SERVER_NAME, this.errorHandler, this.metrics, commitQueueCapacity);
        int transactionToCommitCount = commitQueueCapacity + 1;
        CountDownLatch countDownLatch = new CountDownLatch(transactionToCommitCount + 1);
        for (int i = 0; i <= commitQueueCapacity; ++i) {
            this.commitTransaction((timestamp, smallestScn, commitScn, counter) -> {
                TestCase.assertNotSame((Object)mainThread, (Object)Thread.currentThread());
                TimeUnit.MILLISECONDS.sleep(100L);
                countDownLatch.countDown();
            });
        }
        this.commitTransaction((timestamp, smallestScn, commitScn, counter) -> {
            TestCase.assertSame((Object)mainThread, (Object)Thread.currentThread());
            countDownLatch.countDown();
        });
        TimeUnit.SECONDS.sleep(2L);
        this.commitTransaction((timestamp, smallestScn, commitScn, counter) -> {
            TestCase.assertNotSame((Object)mainThread, (Object)Thread.currentThread());
            countDownLatch.countDown();
        });
        TestCase.assertTrue((boolean)countDownLatch.await(10L, TimeUnit.SECONDS));
    }

    private void commitTransaction(TransactionalBuffer.CommitCallback commitCallback) {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), "", commitCallback);
        offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), Long.valueOf(SCN.longValue()), null, false, true, new TransactionContext());
        this.transactionalBuffer.commit(TRANSACTION_ID, SCN.add(BigDecimal.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE);
    }
}

