/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle.logminer;

import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OracleTaskContext;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.logminer.Scn;
import io.debezium.connector.oracle.logminer.TransactionalBuffer;
import io.debezium.connector.oracle.logminer.TransactionalBufferMetrics;
import io.debezium.connector.oracle.xstream.LcrPosition;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.txmetadata.TransactionContext;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.mockito.Mockito;

@SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER)
public class TransactionalBufferTest {
    private static final String SERVER_NAME = "serverX";
    private static final String TRANSACTION_ID = "transaction";
    private static final String OTHER_TRANSACTION_ID = "other_transaction";
    private static final String SQL_ONE = "update table";
    private static final String SQL_TWO = "insert into table";
    private static final String MESSAGE = "OK";
    private static final Scn SCN_ONE;
    private static final Scn SCN;
    private static final Scn OTHER_SCN;
    private static final Scn LARGEST_SCN;
    private static final Timestamp TIMESTAMP;
    private static final Configuration config;
    private static final OracleConnectorConfig connectorConfig;
    private static OracleOffsetContext offsetContext;
    private OracleTaskContext taskContext;
    private ErrorHandler errorHandler;
    private TransactionalBuffer transactionalBuffer;
    private TransactionalBufferMetrics metrics;
    private EventDispatcher<?> dispatcher;
    @Rule
    public TestRule skipRule = new SkipTestDependingOnAdapterNameRule();

    @Before
    public void before() {
        ChangeEventQueue queue = new ChangeEventQueue.Builder().pollInterval(Duration.of(8192L, ChronoUnit.MILLIS)).maxBatchSize(2048).maxQueueSize(8192).build();
        this.errorHandler = new ErrorHandler(OracleConnector.class, SERVER_NAME, queue);
        this.taskContext = (OracleTaskContext)Mockito.mock(OracleTaskContext.class);
        Mockito.when((Object)this.taskContext.getConnectorName()).thenReturn((Object)"connector name");
        Mockito.when((Object)this.taskContext.getConnectorType()).thenReturn((Object)"connector type");
        this.dispatcher = (EventDispatcher)Mockito.mock(EventDispatcher.class);
        this.transactionalBuffer = new TransactionalBuffer(this.taskContext, this.errorHandler);
        this.metrics = this.transactionalBuffer.getMetrics();
    }

    @After
    public void after() throws InterruptedException {
        this.transactionalBuffer.close();
    }

    @Test
    public void testIsEmpty() {
        Assertions.assertThat((boolean)this.transactionalBuffer.isEmpty()).isEqualTo(true);
    }

    @Test
    public void testIsNotEmptyWhenTransactionIsRegistered() {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {});
        Assertions.assertThat((boolean)this.transactionalBuffer.isEmpty()).isEqualTo(false);
    }

    @Test
    public void testIsEmptyWhenTransactionIsCommitted() throws InterruptedException {
        CountDownLatch commitLatch = new CountDownLatch(1);
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> commitLatch.countDown());
        offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), Long.valueOf(SCN.longValue()), (LcrPosition)null, false, true, new TransactionContext());
        this.transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, this.dispatcher);
        commitLatch.await();
        Thread.sleep(1000L);
        Assertions.assertThat((boolean)this.transactionalBuffer.isEmpty()).isEqualTo(true);
    }

    @Test
    public void testIsEmptyWhenTransactionIsRolledBack() {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.rollback(TRANSACTION_ID, "");
        Assertions.assertThat((boolean)this.transactionalBuffer.isEmpty()).isEqualTo(true);
    }

    @Test
    public void testNonEmptyFirstTransactionIsRolledBack() {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.rollback(TRANSACTION_ID, "");
        Assertions.assertThat((boolean)this.transactionalBuffer.isEmpty()).isEqualTo(false);
        Assertions.assertThat((boolean)this.transactionalBuffer.getRolledBackTransactionIds().contains(TRANSACTION_ID)).isTrue();
        Assertions.assertThat((boolean)this.transactionalBuffer.getRolledBackTransactionIds().contains(OTHER_TRANSACTION_ID)).isFalse();
    }

    @Test
    public void testNonEmptySecondTransactionIsRolledBack() {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.rollback(OTHER_TRANSACTION_ID, "");
        Assertions.assertThat((boolean)this.transactionalBuffer.isEmpty()).isEqualTo(false);
        Assertions.assertThat((boolean)this.transactionalBuffer.getRolledBackTransactionIds().contains(TRANSACTION_ID)).isFalse();
        Assertions.assertThat((boolean)this.transactionalBuffer.getRolledBackTransactionIds().contains(OTHER_TRANSACTION_ID)).isTrue();
    }

    @Test
    public void testCalculateScnWhenTransactionIsCommitted() throws InterruptedException {
        CountDownLatch commitLatch = new CountDownLatch(1);
        AtomicReference smallestScnContainer = new AtomicReference();
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {
            smallestScnContainer.set(smallestScn);
            commitLatch.countDown();
        });
        offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), Long.valueOf(SCN.longValue()), null, false, true, new TransactionContext());
        this.transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, this.dispatcher);
        commitLatch.await();
        Assertions.assertThat(smallestScnContainer.get()).isNull();
        Assertions.assertThat((boolean)this.transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue();
    }

    @Test
    public void testCalculateScnWhenFirstTransactionIsCommitted() throws InterruptedException {
        CountDownLatch commitLatch = new CountDownLatch(1);
        AtomicReference smallestScnContainer = new AtomicReference();
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {
            smallestScnContainer.set(smallestScn);
            commitLatch.countDown();
        });
        this.transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {});
        offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), Long.valueOf(SCN.longValue()), null, false, true, new TransactionContext());
        this.transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, this.dispatcher);
        commitLatch.await();
        Assertions.assertThat(smallestScnContainer.get()).isEqualTo((Object)OTHER_SCN);
        Assertions.assertThat((boolean)this.transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue();
    }

    @Test
    public void testCalculateScnWhenSecondTransactionIsCommitted() throws InterruptedException {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {});
        CountDownLatch commitLatch = new CountDownLatch(1);
        AtomicReference smallestScnContainer = new AtomicReference();
        this.transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {
            smallestScnContainer.set(smallestScn);
            commitLatch.countDown();
        });
        offsetContext = new OracleOffsetContext(connectorConfig, OTHER_SCN.longValue(), Long.valueOf(OTHER_SCN.longValue()), null, false, true, new TransactionContext());
        this.transactionalBuffer.commit(OTHER_TRANSACTION_ID, OTHER_SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, this.dispatcher);
        commitLatch.await();
        Assertions.assertThat(smallestScnContainer.get()).isEqualTo((Object)SCN);
        Assertions.assertThat((boolean)this.transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue();
    }

    @Test
    public void testAbandoningOneTransaction() {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {});
        offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), Long.valueOf(SCN.longValue()), (LcrPosition)null, false, true, new TransactionContext());
        this.transactionalBuffer.abandonLongTransactions(Long.valueOf(SCN.longValue()), offsetContext);
        Assertions.assertThat((boolean)this.transactionalBuffer.isEmpty()).isEqualTo(true);
    }

    @Test
    public void testAbandoningTransactionHavingAnotherOne() {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.abandonLongTransactions(Long.valueOf(SCN.longValue()), offsetContext);
        Assertions.assertThat((boolean)this.transactionalBuffer.isEmpty()).isEqualTo(false);
    }

    @Test
    public void testTransactionDump() {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {});
        this.transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {});
        Assertions.assertThat((String)this.transactionalBuffer.toString()).contains(String.valueOf(SCN));
        Assertions.assertThat((String)this.transactionalBuffer.toString()).contains(String.valueOf(OTHER_SCN));
    }

    private void commitTransaction(TransactionalBuffer.CommitCallback commitCallback) {
        this.transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), commitCallback);
        offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), Long.valueOf(SCN.longValue()), null, false, true, new TransactionContext());
        this.transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, this.dispatcher);
    }

    static {
        SCN = SCN_ONE = new Scn(BigDecimal.ONE);
        OTHER_SCN = Scn.valueOf((long)10L);
        LARGEST_SCN = Scn.valueOf((long)100L);
        TIMESTAMP = new Timestamp(System.currentTimeMillis());
        config = new Configuration(){

            public Set<String> keys() {
                return Collections.emptySet();
            }

            public String getString(String key) {
                return null;
            }
        };
        connectorConfig = new OracleConnectorConfig(config);
    }
}

