/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner.db.stream;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerException;
import io.debezium.connector.spanner.db.model.Partition;
import io.debezium.connector.spanner.db.model.StreamEventMetadata;
import io.debezium.connector.spanner.db.model.event.ChangeStreamEvent;
import io.debezium.connector.spanner.db.stream.ChangeStreamEventConsumer;
import io.debezium.connector.spanner.db.stream.PartitionEventListener;
import io.debezium.connector.spanner.db.stream.SpannerChangeStream;
import io.debezium.connector.spanner.db.stream.SpannerChangeStreamService;
import io.debezium.connector.spanner.db.stream.exception.ChangeStreamException;
import io.debezium.connector.spanner.db.stream.exception.FailureChangeStreamException;
import io.debezium.connector.spanner.db.stream.exception.OutOfRangeChangeStreamException;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.metrics.event.MetricEvent;
import java.time.Duration;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

class SpannerChangeStreamTest {
    SpannerChangeStreamTest() {
    }

    @Test
    void testRun() throws ChangeStreamException, InterruptedException {
        SpannerChangeStreamService streamService = (SpannerChangeStreamService)Mockito.mock(SpannerChangeStreamService.class);
        MetricsEventPublisher metricsEventPublisher = (MetricsEventPublisher)Mockito.mock(MetricsEventPublisher.class);
        SpannerChangeStream spannerChangeStream = new SpannerChangeStream(streamService, metricsEventPublisher, Duration.ofSeconds(60L), 3);
        spannerChangeStream.run(() -> false, null, null);
        ((MetricsEventPublisher)Mockito.verify((Object)metricsEventPublisher, (VerificationMode)Mockito.times((int)0))).publishMetricEvent((MetricEvent)ArgumentMatchers.any());
    }

    @Test
    void testOnStreamEvent() throws ChangeStreamException, InterruptedException {
        SpannerChangeStreamService streamService = (SpannerChangeStreamService)Mockito.mock(SpannerChangeStreamService.class);
        MetricsEventPublisher metricsEventPublisher = (MetricsEventPublisher)Mockito.mock(MetricsEventPublisher.class);
        ChangeStreamEventConsumer changeStreamEventConsumer = (ChangeStreamEventConsumer)Mockito.mock(ChangeStreamEventConsumer.class);
        ChangeStreamEvent changeStreamEvent = (ChangeStreamEvent)Mockito.mock(ChangeStreamEvent.class);
        StreamEventMetadata streamEventMetadata = (StreamEventMetadata)Mockito.mock(StreamEventMetadata.class);
        Mockito.when((Object)changeStreamEvent.getMetadata()).thenReturn((Object)streamEventMetadata);
        Mockito.when((Object)streamEventMetadata.getPartitionToken()).thenReturn((Object)"");
        SpannerChangeStream spannerChangeStream = new SpannerChangeStream(streamService, metricsEventPublisher, Duration.ofSeconds(60L), 3);
        spannerChangeStream.run(() -> false, changeStreamEventConsumer, null);
        spannerChangeStream.onStreamEvent(changeStreamEvent);
        ((ChangeStreamEventConsumer)Mockito.verify((Object)changeStreamEventConsumer)).acceptChangeStreamEvent((ChangeStreamEvent)ArgumentMatchers.any());
        ((ChangeStreamEvent)Mockito.verify((Object)changeStreamEvent)).getMetadata();
        ((StreamEventMetadata)Mockito.verify((Object)streamEventMetadata)).getPartitionToken();
    }

    @Test
    void testOnStuckPartition() throws ChangeStreamException, InterruptedException {
        SpannerChangeStreamService streamService = (SpannerChangeStreamService)Mockito.mock(SpannerChangeStreamService.class);
        MetricsEventPublisher metricsEventPublisher = (MetricsEventPublisher)Mockito.mock(MetricsEventPublisher.class);
        PartitionEventListener partitionEventListener = (PartitionEventListener)Mockito.mock(PartitionEventListener.class);
        Mockito.when((Object)partitionEventListener.onStuckPartition(ArgumentMatchers.anyString())).thenReturn((Object)true);
        SpannerChangeStream spannerChangeStream = new SpannerChangeStream(streamService, metricsEventPublisher, Duration.ofSeconds(60L), 3);
        spannerChangeStream.run(() -> false, null, partitionEventListener);
        spannerChangeStream.onStuckPartition("");
        ((PartitionEventListener)Mockito.verify((Object)partitionEventListener)).onStuckPartition((String)ArgumentMatchers.any());
    }

    @Test
    void testOnError() {
        SpannerChangeStreamService streamService = (SpannerChangeStreamService)Mockito.mock(SpannerChangeStreamService.class);
        MetricsEventPublisher metricsEventPublisher = (MetricsEventPublisher)Mockito.mock(MetricsEventPublisher.class);
        FailureChangeStreamException exception = (FailureChangeStreamException)Mockito.mock(FailureChangeStreamException.class);
        SpannerChangeStream spannerChangeStream = new SpannerChangeStream(streamService, metricsEventPublisher, Duration.ofSeconds(60L), 3);
        Assertions.assertTrue((boolean)spannerChangeStream.onError(null, null));
        Assertions.assertTrue((boolean)spannerChangeStream.onError((ChangeStreamException)exception));
    }

    @Test
    void testStop() {
        SpannerChangeStreamService streamService = (SpannerChangeStreamService)Mockito.mock(SpannerChangeStreamService.class);
        MetricsEventPublisher metricsEventPublisher = (MetricsEventPublisher)Mockito.mock(MetricsEventPublisher.class);
        SpannerChangeStream spannerChangeStream = new SpannerChangeStream(streamService, metricsEventPublisher, Duration.ofSeconds(60L), 3);
        spannerChangeStream.stop();
        spannerChangeStream.stop("test");
        ((MetricsEventPublisher)Mockito.verify((Object)metricsEventPublisher)).publishMetricEvent((MetricEvent)ArgumentMatchers.any());
    }

    @Test
    void testIsCanceled() {
        SpannerChangeStreamService streamService = (SpannerChangeStreamService)Mockito.mock(SpannerChangeStreamService.class);
        MetricsEventPublisher metricsEventPublisher = (MetricsEventPublisher)Mockito.mock(MetricsEventPublisher.class);
        SpannerException exception = (SpannerException)Mockito.mock(SpannerException.class);
        Mockito.when((Object)exception.getErrorCode()).thenReturn((Object)ErrorCode.CANCELLED);
        SpannerChangeStream spannerChangeStream = new SpannerChangeStream(streamService, metricsEventPublisher, Duration.ofSeconds(60L), 3);
        Assertions.assertTrue((boolean)spannerChangeStream.isCanceled((Exception)exception));
        Assertions.assertFalse((boolean)spannerChangeStream.isCanceled(null));
    }

    @Test
    void testGetStreamException() {
        SpannerChangeStreamService streamService = (SpannerChangeStreamService)Mockito.mock(SpannerChangeStreamService.class);
        MetricsEventPublisher metricsEventPublisher = (MetricsEventPublisher)Mockito.mock(MetricsEventPublisher.class);
        SpannerException exception = (SpannerException)Mockito.mock(SpannerException.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        Mockito.when((Object)partition.toString()).thenReturn((Object)"");
        SpannerChangeStream spannerChangeStream = new SpannerChangeStream(streamService, metricsEventPublisher, Duration.ofSeconds(60L), 3);
        Mockito.when((Object)exception.getErrorCode()).thenReturn((Object)ErrorCode.OUT_OF_RANGE);
        Assertions.assertTrue((boolean)(spannerChangeStream.getStreamException(partition, (Exception)exception) instanceof OutOfRangeChangeStreamException));
        Mockito.when((Object)exception.getErrorCode()).thenReturn((Object)ErrorCode.INVALID_ARGUMENT);
        Assertions.assertTrue((boolean)(spannerChangeStream.getStreamException(null, (Exception)exception) instanceof ChangeStreamException));
        Assertions.assertTrue((boolean)(spannerChangeStream.getStreamException(null, (Exception)new NullPointerException()) instanceof ChangeStreamException));
    }

    @Test
    void testSubmitPartition() {
        SpannerChangeStreamService streamService = (SpannerChangeStreamService)Mockito.mock(SpannerChangeStreamService.class);
        MetricsEventPublisher metricsEventPublisher = (MetricsEventPublisher)Mockito.mock(MetricsEventPublisher.class);
        HashSet parentTokens = new HashSet();
        Timestamp startTimestamp = Timestamp.ofTimeMicroseconds((long)1L);
        Partition partition = new Partition("partitionToken", parentTokens, startTimestamp, Timestamp.ofTimeMicroseconds((long)1L));
        SpannerChangeStream spannerChangeStream = new SpannerChangeStream(streamService, metricsEventPublisher, Duration.ofSeconds(60L), 3);
        new Thread(() -> {
            try {
                spannerChangeStream.run(() -> true, changeStreamEvent -> {}, new PartitionEventListener(){

                    public void onRun(Partition partition) {
                    }

                    public void onFinish(Partition partition) {
                    }

                    public void onException(Partition partition, Exception ex) {
                    }

                    public boolean onStuckPartition(String token) {
                        return false;
                    }
                });
            }
            catch (ChangeStreamException e) {
                throw new RuntimeException(e);
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).start();
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> spannerChangeStream.submitPartition(partition));
        ((MetricsEventPublisher)Mockito.verify((Object)metricsEventPublisher, (VerificationMode)Mockito.times((int)2))).publishMetricEvent((MetricEvent)ArgumentMatchers.any());
    }
}

