/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.config.ConfigurationDefaults;
import io.debezium.connector.mysql.ChainedReader;
import io.debezium.connector.mysql.Reader;
import io.debezium.util.Clock;
import io.debezium.util.Collect;
import io.debezium.util.Threads;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ChainedReaderTest {
    // Five distinct single-record batches. The tests compare poll() results against these
    // lists by identity (isSameAs), so each one must be its own instance.
    // The decompiler-inserted (Object)/(Object[]) casts were removed: they pinned the generic
    // parameter of Collect.arrayListOf to Object, which cannot be assigned to List<SourceRecord>.
    private static final List<SourceRecord> RL1 = Collect.arrayListOf(record());
    private static final List<SourceRecord> RL2 = Collect.arrayListOf(record());
    private static final List<SourceRecord> RL3 = Collect.arrayListOf(record());
    private static final List<SourceRecord> RL4 = Collect.arrayListOf(record());
    private static final List<SourceRecord> RL5 = Collect.arrayListOf(record());
    // The batches in the order a supplier created by records() hands them out.
    private static final List<List<SourceRecord>> SOURCE_RECORDS = Collect.arrayListOf(RL1, RL2, RL3, RL4, RL5);
    // The reader under test; assigned a new instance in beforeEach().
    private ChainedReader reader;

    /**
     * Creates a supplier that walks {@code SOURCE_RECORDS} from the beginning.
     *
     * <p>Each call to the supplier returns the next batch; once all batches have been handed
     * out it returns {@code null} on every subsequent call. Every invocation of this method
     * creates a fresh iterator, so separate suppliers do not share their position.
     *
     * @return a supplier of record batches, yielding {@code null} when exhausted
     */
    protected static Supplier<List<SourceRecord>> records() {
        Iterator<List<SourceRecord>> batches = SOURCE_RECORDS.iterator();
        // The iterator is already typed, so no raw-type cast (and no unchecked conversion) is needed.
        return () -> batches.hasNext() ? batches.next() : null;
    }

    /**
     * Builds a placeholder record whose constructor arguments are all {@code null}.
     *
     * @return a new {@link SourceRecord} instance
     */
    private static SourceRecord record() {
        final SourceRecord placeholder = new SourceRecord(null, null, null, null, null, null);
        return placeholder;
    }

    /** Replaces the reader under test with a newly constructed {@link ChainedReader}. */
    @Before
    public void beforeEach() {
        reader = new ChainedReader();
    }

    /**
     * A chain with no readers added reports {@code STOPPED} both before and after
     * {@code start()}, and polling it yields nothing.
     */
    @Test
    public void shouldNotStartWithoutReaders() throws InterruptedException {
        // Before start(): nothing was added, and the chain reports STOPPED.
        Assertions.assertThat(reader.state()).isEqualTo(Reader.State.STOPPED);

        // After start(): still STOPPED.
        reader.start();
        Assertions.assertThat(reader.state()).isEqualTo(Reader.State.STOPPED);

        assertPollReturnsNoMoreRecords();
    }

    /**
     * Stops a single-reader chain after four of the five batches have been polled and checks
     * that it goes through {@code STOPPING} to {@code STOPPED} without returning the fifth batch.
     */
    @Test
    public void shouldStartAndStopSingleReaderBeforeReaderStopsItself() throws InterruptedException {
        final MockReader r1 = new MockReader("r1", records());
        reader.add(r1);
        reader.uponCompletion("Stopped the r1 reader");

        reader.start();
        Assertions.assertThat(reader.state()).isEqualTo(Reader.State.RUNNING);

        // Consume the first four batches; RL5 is deliberately left unread.
        Assertions.assertThat(reader.poll()).isSameAs(RL1);
        Assertions.assertThat(reader.poll()).isSameAs(RL2);
        Assertions.assertThat(reader.poll()).isSameAs(RL3);
        Assertions.assertThat(reader.poll()).isSameAs(RL4);

        // Stop early: the chain reports STOPPING until the next poll comes back empty.
        reader.stop();
        Assertions.assertThat(reader.state()).isEqualTo(Reader.State.STOPPING);
        Assertions.assertThat(reader.poll()).isNull();
        Assertions.assertThat(reader.state()).isEqualTo(Reader.State.STOPPED);

        assertPollReturnsNoMoreRecords();
    }

    /**
     * Polls a single-reader chain until its supplier is exhausted and checks that the chain
     * ends up {@code STOPPED} without {@code stop()} ever being called.
     */
    @Test
    public void shouldStartSingleReaderThatStopsAutomatically() throws InterruptedException {
        final MockReader r2 = new MockReader("r2", records());
        reader.add(r2);
        reader.uponCompletion("Stopped the r2 reader");

        reader.start();
        Assertions.assertThat(reader.state()).isEqualTo(Reader.State.RUNNING);

        // All five batches come back, in order.
        Assertions.assertThat(reader.poll()).isSameAs(RL1);
        Assertions.assertThat(reader.poll()).isSameAs(RL2);
        Assertions.assertThat(reader.poll()).isSameAs(RL3);
        Assertions.assertThat(reader.poll()).isSameAs(RL4);
        Assertions.assertThat(reader.poll()).isSameAs(RL5);

        // The next poll is empty and the chain reports STOPPED afterwards.
        Assertions.assertThat(reader.poll()).isNull();
        Assertions.assertThat(reader.state()).isEqualTo(Reader.State.STOPPED);

        assertPollReturnsNoMoreRecords();
    }

    /**
     * Chains two readers, each with its own supplier over the same five batches, and checks
     * that the batches are returned twice in order before the chain reports {@code STOPPED}.
     *
     * <p>After the first reader's batches are consumed, polls may return {@code null} for a
     * while; the test retries every 100 ms and fails if no batch arrives before
     * {@code ConfigurationDefaults.RETURN_CONTROL_INTERVAL} has elapsed.
     */
    @Test
    public void shouldStartAndStopMultipleReaders() throws InterruptedException {
        reader.add(new MockReader("r3", records()));
        reader.add(new MockReader("r4", records()));
        reader.uponCompletion("Stopped the r3+r4 reader");

        reader.start();
        Assertions.assertThat(reader.state()).isEqualTo(Reader.State.RUNNING);

        // First pass over the five batches.
        Assertions.assertThat(reader.poll()).isSameAs(RL1);
        Assertions.assertThat(reader.poll()).isSameAs(RL2);
        Assertions.assertThat(reader.poll()).isSameAs(RL3);
        Assertions.assertThat(reader.poll()).isSameAs(RL4);
        Assertions.assertThat(reader.poll()).isSameAs(RL5);

        // Keep polling until a batch shows up again, bounded by the timer so the test cannot hang.
        // Typed as List<SourceRecord> rather than a raw List.
        List<SourceRecord> records = reader.poll();
        Threads.Timer timeout = Threads.timer(Clock.SYSTEM, ConfigurationDefaults.RETURN_CONTROL_INTERVAL);
        while (records == null) {
            if (timeout.expired()) {
                Assert.fail("Subsequent reader has not started");
            }
            Thread.sleep(100L);
            records = reader.poll();
        }

        // Second pass over the same five batches.
        Assertions.assertThat(records).isSameAs(RL1);
        Assertions.assertThat(reader.poll()).isSameAs(RL2);
        Assertions.assertThat(reader.poll()).isSameAs(RL3);
        Assertions.assertThat(reader.poll()).isSameAs(RL4);
        Assertions.assertThat(reader.poll()).isSameAs(RL5);

        // Nothing is left: the next poll is empty and the chain reports STOPPED.
        Assertions.assertThat(reader.poll()).isNull();
        Assertions.assertThat(reader.state()).isEqualTo(Reader.State.STOPPED);

        assertPollReturnsNoMoreRecords();
    }

    /**
     * Stops a chain whose only reader is a {@link CompletingMockReader} after two batches and
     * checks that the remaining three batches are still returned while the chain reports
     * {@code STOPPING}, after which it becomes {@code STOPPED}.
     */
    @Test
    public void shouldStartAndStopReaderThatContinuesProducingItsRecordsAfterBeingStopped() throws InterruptedException {
        final CompletingMockReader r5 = new CompletingMockReader("r5", records());
        reader.add(r5);
        reader.uponCompletion("Stopped the r5 reader");

        reader.start();
        Assertions.assertThat(reader.state()).isEqualTo(Reader.State.RUNNING);

        // Two batches are read before the stop request.
        Assertions.assertThat(reader.poll()).isSameAs(RL1);
        Assertions.assertThat(reader.poll()).isSameAs(RL2);

        reader.stop();
        Assertions.assertThat(reader.state()).isEqualTo(Reader.State.STOPPING);

        // The remaining batches are still delivered after stop().
        Assertions.assertThat(reader.poll()).isSameAs(RL3);
        Assertions.assertThat(reader.poll()).isSameAs(RL4);
        Assertions.assertThat(reader.poll()).isSameAs(RL5);

        // Once the supplier is exhausted the poll is empty and the chain reports STOPPED.
        Assertions.assertThat(reader.poll()).isNull();
        Assertions.assertThat(reader.state()).isEqualTo(Reader.State.STOPPED);

        assertPollReturnsNoMoreRecords();
    }

    /**
     * Polls the reader under test ten times in a row and asserts that every poll returns
     * {@code null}.
     *
     * @throws InterruptedException if a poll is interrupted
     */
    protected void assertPollReturnsNoMoreRecords() throws InterruptedException {
        int attempt = 0;
        while (attempt < 10) {
            Assertions.assertThat(reader.poll()).isNull();
            attempt++;
        }
    }

    /**
     * A {@link MockReader} variant that keeps returning its supplier's batches from
     * {@code poll()} whether or not it is currently marked as running.
     */
    public static class CompletingMockReader extends MockReader {

        /**
         * @param name the reader's name
         * @param pollResultsSupplier source of the batches returned by {@code poll()}
         */
        public CompletingMockReader(String name, Supplier<List<SourceRecord>> pollResultsSupplier) {
            super(name, pollResultsSupplier);
        }

        /**
         * Always answers {@code true}, so {@code poll()} consults the supplier on every call
         * instead of only while the running flag is set.
         */
        @Override
        protected boolean continueReturningRecordsFromPolling() {
            return true;
        }
    }

    /**
     * Minimal {@link Reader} implementation for tests. Batches come from a caller-provided
     * supplier; lifecycle state is tracked with two flags, {@code running} and {@code completed}.
     */
    public static class MockReader
    implements Reader {
        private final String name;
        private final Supplier<List<SourceRecord>> pollResultsSupplier;
        // Diamond instead of the raw AtomicReference the decompiler produced.
        private final AtomicReference<Runnable> completionHandler = new AtomicReference<>();
        private final AtomicBoolean running = new AtomicBoolean();
        private final AtomicBoolean completed = new AtomicBoolean();

        /**
         * @param name the name reported by {@link #name()}
         * @param pollResultsSupplier source of the batches returned by {@link #poll()}; a
         *        {@code null} result from it is treated as "no more records"
         */
        public MockReader(String name, Supplier<List<SourceRecord>> pollResultsSupplier) {
            this.name = name;
            this.pollResultsSupplier = pollResultsSupplier;
        }

        /**
         * Derives the state from the two flags: {@code RUNNING} while the running flag is set,
         * otherwise {@code STOPPED} once a poll has come back empty, otherwise {@code STOPPING}.
         * Because both flags start out {@code false}, a reader that was never started also
         * reports {@code STOPPING}.
         */
        public Reader.State state() {
            if (this.running.get()) {
                return Reader.State.RUNNING;
            }
            return this.completed.get() ? Reader.State.STOPPED : Reader.State.STOPPING;
        }

        /** @return the name given at construction */
        public String name() {
            return this.name;
        }

        /**
         * Returns the supplier's next batch if {@link #continueReturningRecordsFromPolling()}
         * allows it. When there is no batch to return, the completion handler (if one is set)
         * is run, and the reader is marked completed and no longer running. The handler runs
         * on every such empty poll, not just the first.
         *
         * @return the next batch, or {@code null} when nothing is returned
         */
        public List<SourceRecord> poll() throws InterruptedException {
            List<SourceRecord> batch = null;
            if (this.continueReturningRecordsFromPolling()) {
                batch = this.pollResultsSupplier.get();
            }
            if (batch != null) {
                return batch;
            }
            // Nothing to return: notify the handler first, then update the flags.
            Runnable handler = this.completionHandler.get();
            if (handler != null) {
                handler.run();
            }
            this.completed.set(true);
            this.running.set(false);
            return null;
        }

        /**
         * Decides whether {@link #poll()} should consult the supplier. This implementation
         * does so only while the running flag is set; subclasses may override it.
         */
        protected boolean continueReturningRecordsFromPolling() {
            return this.running.get();
        }

        /** Asserts that the reader is not already running, then sets the running flag. */
        public void start() {
            Assertions.assertThat(this.running.get()).isFalse();
            this.running.set(true);
        }

        /** Clears the running flag; the completed flag is left untouched. */
        public void stop() {
            this.running.set(false);
        }

        /**
         * Registers the handler that {@link #poll()} runs when it has no batch to return,
         * replacing any previously registered handler.
         */
        public void uponCompletion(Runnable handler) {
            this.completionHandler.set(handler);
        }
    }
}

