/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql.legacy;

import io.debezium.connector.mysql.legacy.BinlogReader;
import io.debezium.connector.mysql.legacy.ParallelSnapshotReader;
import io.debezium.connector.mysql.legacy.Reader;
import io.debezium.connector.mysql.legacy.SnapshotReader;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class ParallelSnapshotReaderTest {
    @Test
    public void startStartsBothReaders() {
        BinlogReader mockOldBinlogReader = (BinlogReader)Mockito.mock(BinlogReader.class);
        SnapshotReader mockNewSnapshotReader = (SnapshotReader)Mockito.mock(SnapshotReader.class);
        BinlogReader mockNewBinlogReader = (BinlogReader)Mockito.mock(BinlogReader.class);
        ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(mockOldBinlogReader, mockNewSnapshotReader, mockNewBinlogReader);
        parallelSnapshotReader.start();
        Assert.assertSame((Object)parallelSnapshotReader.state(), (Object)Reader.State.RUNNING);
        ((BinlogReader)Mockito.verify((Object)mockOldBinlogReader)).start();
        ((SnapshotReader)Mockito.verify((Object)mockNewSnapshotReader)).start();
    }

    @Test
    public void pollCombinesBothReadersPolls() throws InterruptedException {
        BinlogReader mockOldBinlogReader = (BinlogReader)Mockito.mock(BinlogReader.class);
        SnapshotReader mockNewSnapshotReader = (SnapshotReader)Mockito.mock(SnapshotReader.class);
        BinlogReader mockNewBinlogReader = (BinlogReader)Mockito.mock(BinlogReader.class);
        ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(mockOldBinlogReader, mockNewSnapshotReader, mockNewBinlogReader);
        SourceRecord oldBinlogSourceRecord = (SourceRecord)Mockito.mock(SourceRecord.class);
        ArrayList<SourceRecord> oldBinlogRecords = new ArrayList<SourceRecord>();
        oldBinlogRecords.add(oldBinlogSourceRecord);
        SourceRecord newSnapshotSourceRecord = (SourceRecord)Mockito.mock(SourceRecord.class);
        ArrayList<SourceRecord> newSnapshotRecords = new ArrayList<SourceRecord>();
        newSnapshotRecords.add(newSnapshotSourceRecord);
        Mockito.when((Object)mockOldBinlogReader.isRunning()).thenReturn((Object)true);
        Mockito.when((Object)mockOldBinlogReader.poll()).thenReturn(oldBinlogRecords);
        Mockito.when((Object)mockNewSnapshotReader.poll()).thenReturn(newSnapshotRecords);
        parallelSnapshotReader.start();
        List parallelRecords = parallelSnapshotReader.poll();
        Assert.assertEquals((long)2L, (long)parallelRecords.size());
        Assert.assertTrue((boolean)parallelRecords.contains(oldBinlogSourceRecord));
        Assert.assertTrue((boolean)parallelRecords.contains(newSnapshotSourceRecord));
    }

    @Test
    public void pollReturnsNewIfOldReaderIsStopped() throws InterruptedException {
        BinlogReader mockOldBinlogReader = (BinlogReader)Mockito.mock(BinlogReader.class);
        SnapshotReader mockNewSnapshotReader = (SnapshotReader)Mockito.mock(SnapshotReader.class);
        BinlogReader mockNewBinlogReader = (BinlogReader)Mockito.mock(BinlogReader.class);
        ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(mockOldBinlogReader, mockNewSnapshotReader, mockNewBinlogReader);
        SourceRecord newSnapshotSourceRecord = (SourceRecord)Mockito.mock(SourceRecord.class);
        ArrayList<SourceRecord> newSnapshotRecords = new ArrayList<SourceRecord>();
        newSnapshotRecords.add(newSnapshotSourceRecord);
        Mockito.when((Object)mockOldBinlogReader.isRunning()).thenReturn((Object)false);
        Mockito.when((Object)mockOldBinlogReader.poll()).thenThrow(new Throwable[]{new InterruptedException()});
        Mockito.when((Object)mockNewSnapshotReader.poll()).thenReturn(newSnapshotRecords);
        parallelSnapshotReader.start();
        List parallelRecords = parallelSnapshotReader.poll();
        Assert.assertEquals((long)1L, (long)parallelRecords.size());
        Assert.assertTrue((boolean)parallelRecords.contains(newSnapshotSourceRecord));
    }

    @Test
    public void pollReturnsOldIfNewReaderIsStopped() throws InterruptedException {
        BinlogReader mockOldBinlogReader = (BinlogReader)Mockito.mock(BinlogReader.class);
        SnapshotReader mockNewSnapshotReader = (SnapshotReader)Mockito.mock(SnapshotReader.class);
        BinlogReader mockNewBinlogReader = (BinlogReader)Mockito.mock(BinlogReader.class);
        ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(mockOldBinlogReader, mockNewSnapshotReader, mockNewBinlogReader);
        SourceRecord oldBinlogSourceRecord = (SourceRecord)Mockito.mock(SourceRecord.class);
        ArrayList<SourceRecord> oldBinlogRecords = new ArrayList<SourceRecord>();
        oldBinlogRecords.add(oldBinlogSourceRecord);
        Mockito.when((Object)mockOldBinlogReader.isRunning()).thenReturn((Object)true);
        Mockito.when((Object)mockOldBinlogReader.poll()).thenReturn(oldBinlogRecords);
        List parallelRecords = parallelSnapshotReader.poll();
        Assert.assertEquals((long)1L, (long)parallelRecords.size());
        Assert.assertTrue((boolean)parallelRecords.contains(oldBinlogSourceRecord));
    }

    @Test
    public void pollReturnsNullIfBothReadersAreStopped() throws InterruptedException {
        BinlogReader mockOldBinlogReader = (BinlogReader)Mockito.mock(BinlogReader.class);
        SnapshotReader mockNewSnapshotReader = (SnapshotReader)Mockito.mock(SnapshotReader.class);
        BinlogReader mockNewBinlogReader = (BinlogReader)Mockito.mock(BinlogReader.class);
        ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(mockOldBinlogReader, mockNewSnapshotReader, mockNewBinlogReader);
        Mockito.when((Object)mockOldBinlogReader.isRunning()).thenReturn((Object)false);
        Mockito.when((Object)mockOldBinlogReader.poll()).thenThrow(new Throwable[]{new InterruptedException()});
        Mockito.when((Object)mockNewBinlogReader.poll()).thenReturn(null);
        List parallelRecords = parallelSnapshotReader.poll();
        Assert.assertEquals(null, (Object)parallelRecords);
    }

    @Test
    public void testStopStopsBothReaders() {
        BinlogReader mockOldBinlogReader = (BinlogReader)Mockito.mock(BinlogReader.class);
        SnapshotReader mockNewSnapshotReader = (SnapshotReader)Mockito.mock(SnapshotReader.class);
        BinlogReader mockNewBinlogReader = (BinlogReader)Mockito.mock(BinlogReader.class);
        ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(mockOldBinlogReader, mockNewSnapshotReader, mockNewBinlogReader);
        parallelSnapshotReader.start();
        parallelSnapshotReader.stop();
        Assert.assertTrue((parallelSnapshotReader.state() == Reader.State.STOPPED ? 1 : 0) != 0);
        ((BinlogReader)Mockito.verify((Object)mockOldBinlogReader)).stop();
        ((SnapshotReader)Mockito.verify((Object)mockNewSnapshotReader)).stop();
    }

    @Test
    public void testHaltingPredicateHonorsTimeRange() {
        AtomicBoolean thisReaderNearEnd = new AtomicBoolean(false);
        AtomicBoolean otherReaderNearEnd = new AtomicBoolean(false);
        Duration duration = Duration.ofMinutes(5L);
        ParallelSnapshotReader.ParallelHaltingPredicate parallelHaltingPredicate = new ParallelSnapshotReader.ParallelHaltingPredicate(thisReaderNearEnd, otherReaderNearEnd, duration);
        boolean testResult = parallelHaltingPredicate.accepts(this.createSourceRecordWithTimestamp(Instant.now().minus(duration.multipliedBy(2L))));
        Assert.assertTrue((boolean)testResult);
        Assert.assertFalse((boolean)thisReaderNearEnd.get());
        Assert.assertFalse((boolean)otherReaderNearEnd.get());
    }

    @Test
    public void testHaltingPredicateFlipsthisReaderNearEnd() {
        AtomicBoolean thisReaderNearEnd = new AtomicBoolean(false);
        AtomicBoolean otherReaderNearEnd = new AtomicBoolean(false);
        Duration duration = Duration.ofMinutes(5L);
        ParallelSnapshotReader.ParallelHaltingPredicate parallelHaltingPredicate = new ParallelSnapshotReader.ParallelHaltingPredicate(thisReaderNearEnd, otherReaderNearEnd, duration);
        boolean testResult = parallelHaltingPredicate.accepts(this.createSourceRecordWithTimestamp(Instant.now()));
        Assert.assertTrue((boolean)testResult);
        Assert.assertTrue((boolean)thisReaderNearEnd.get());
        Assert.assertFalse((boolean)otherReaderNearEnd.get());
    }

    @Test
    public void testHaltingPredicateHalts() {
        AtomicBoolean thisReaderNearEnd = new AtomicBoolean(false);
        AtomicBoolean otherReaderNearEnd = new AtomicBoolean(true);
        Duration duration = Duration.ofMinutes(5L);
        ParallelSnapshotReader.ParallelHaltingPredicate parallelHaltingPredicate = new ParallelSnapshotReader.ParallelHaltingPredicate(thisReaderNearEnd, otherReaderNearEnd, duration);
        boolean testResult = parallelHaltingPredicate.accepts(this.createSourceRecordWithTimestamp(Instant.now()));
        Assert.assertFalse((boolean)testResult);
        Assert.assertTrue((boolean)thisReaderNearEnd.get());
        Assert.assertTrue((boolean)otherReaderNearEnd.get());
    }

    private SourceRecord createSourceRecordWithTimestamp(Instant ts) {
        Map<String, Long> offset = Collections.singletonMap("ts_sec", ts.getEpochSecond());
        return new SourceRecord(null, offset, null, null, null);
    }
}

