package io.vertx.rxjava3.test;

import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.exceptions.ProtocolViolationException;
import io.reactivex.rxjava3.exceptions.UndeliverableException;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.vertx.lang.rx.test.FakeWriteStream;
import io.vertx.rxjava3.RxHelper;
import io.vertx.rxjava3.WriteStreamSubscriber;
import io.vertx.test.core.Repeat;
import io.vertx.test.core.VertxTestBase;
import java.util.concurrent.Executors;
import org.hamcrest.CoreMatchers;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:io/vertx/rxjava3/test/WriteStreamSubscriberTest.class */
public class WriteStreamSubscriberTest extends VertxTestBase {
    protected void tearDown() throws Exception {
        RxJavaPlugins.reset();
        super.tearDown();
    }

    @Test
    public void testFlowableErrorReported() throws Exception {
        Exception exc = new Exception();
        FakeWriteStream fakeWriteStream = new FakeWriteStream(this.vertx);
        Flowable.error(exc).observeOn(RxHelper.scheduler(this.vertx)).subscribeOn(RxHelper.scheduler(this.vertx)).subscribe(RxHelper.toSubscriber(fakeWriteStream).onError(th -> {
            assertThat(th, CoreMatchers.is(CoreMatchers.sameInstance(exc)));
            complete();
        }));
        await();
        assertFalse("Did not expect writeStream end method to be invoked", fakeWriteStream.endInvoked());
    }

    @Test
    @Repeat(times = 10)
    public void testFlowableToWriteStreamVertxThread() throws Exception {
        testFlowableToWriteStream(RxHelper.scheduler(this.vertx.getOrCreateContext()));
    }

    @Test
    @Repeat(times = 10)
    public void testFlowableToWriteStreamNonVertxThread() throws Exception {
        testFlowableToWriteStream(Schedulers.from(Executors.newFixedThreadPool(5)));
    }

    private void testFlowableToWriteStream(Scheduler scheduler) throws Exception {
        FakeWriteStream fakeWriteStream = new FakeWriteStream(this.vertx);
        Flowable.range(0, 10000).observeOn(RxHelper.scheduler(this.vertx)).subscribeOn(RxHelper.scheduler(this.vertx)).subscribe(RxHelper.toSubscriber(fakeWriteStream).onWriteStreamEnd(this::complete));
        await();
        assertTrue("Expected drainHandler to be invoked", fakeWriteStream.drainHandlerInvoked());
        assertEquals(10000, fakeWriteStream.getCount());
        assertTrue("Expected writeStream end method to be invoked", fakeWriteStream.endInvoked());
    }

    @Test
    public void testCannotSubscribeTwice() throws Exception {
        waitFor(2);
        RxJavaPlugins.setErrorHandler(th -> {
            assertThat(th, CoreMatchers.is(CoreMatchers.instanceOf(ProtocolViolationException.class)));
            complete();
        });
        WriteStreamSubscriber onWriteStreamEnd = RxHelper.toSubscriber(new FakeWriteStream(this.vertx)).onWriteStreamEnd(this::complete);
        Flowable.range(0, 100).observeOn(RxHelper.scheduler(this.vertx)).subscribeOn(RxHelper.scheduler(this.vertx)).subscribe(onWriteStreamEnd);
        Flowable.range(0, 100).observeOn(RxHelper.scheduler(this.vertx)).subscribeOn(RxHelper.scheduler(this.vertx)).subscribe(onWriteStreamEnd);
        await();
    }

    @Test
    public void testOnNextThrowsFatal() throws Exception {
        UnknownError unknownError = new UnknownError();
        this.vertx.exceptionHandler(th -> {
            assertThat(th, CoreMatchers.is(CoreMatchers.sameInstance(unknownError)));
            complete();
        });
        FakeWriteStream onWrite = new FakeWriteStream(this.vertx).setOnWrite(() -> {
            throw unknownError;
        });
        Flowable.just(0).observeOn(RxHelper.scheduler(this.vertx)).subscribeOn(RxHelper.scheduler(this.vertx)).subscribe(RxHelper.toSubscriber(onWrite));
        await();
        assertFalse("Did not expect writeStream end method to be invoked", onWrite.endInvoked());
    }

    @Test
    public void testWriteThrowsException() throws Exception {
        waitFor(2);
        RuntimeException runtimeException = new RuntimeException();
        FakeWriteStream onWrite = new FakeWriteStream(this.vertx).setOnWrite(() -> {
            throw runtimeException;
        });
        Flowable.create(flowableEmitter -> {
            flowableEmitter.setCancellable(this::complete);
            flowableEmitter.onNext(0);
        }, BackpressureStrategy.MISSING).observeOn(RxHelper.scheduler(this.vertx)).subscribeOn(RxHelper.scheduler(this.vertx)).subscribe(RxHelper.toSubscriber(onWrite).onError(th -> {
            assertThat(th, CoreMatchers.is(CoreMatchers.sameInstance(runtimeException)));
            complete();
        }));
        await();
        assertFalse("Did not expect writeStream end method to be invoked", onWrite.endInvoked());
    }

    @Test
    public void testOnErrorThrowsException() throws Exception {
        RuntimeException runtimeException = new RuntimeException();
        RxJavaPlugins.setErrorHandler(th -> {
            assertThat(th, CoreMatchers.is(CoreMatchers.instanceOf(UndeliverableException.class)));
            assertThat(th.getCause(), CoreMatchers.is(CoreMatchers.sameInstance(runtimeException)));
            complete();
        });
        FakeWriteStream fakeWriteStream = new FakeWriteStream(this.vertx);
        Flowable.error(new RuntimeException()).observeOn(RxHelper.scheduler(this.vertx)).subscribeOn(RxHelper.scheduler(this.vertx)).subscribe(RxHelper.toSubscriber(fakeWriteStream).onError(th2 -> {
            throw runtimeException;
        }));
        await();
        assertFalse("Did not expect writeStream end method to be invoked", fakeWriteStream.endInvoked());
    }

    @Test
    public void testOnWriteStreamEndThrowsException() throws Exception {
        RuntimeException runtimeException = new RuntimeException();
        RxJavaPlugins.setErrorHandler(th -> {
            assertThat(th, CoreMatchers.is(CoreMatchers.instanceOf(UndeliverableException.class)));
            assertThat(th.getCause(), CoreMatchers.is(CoreMatchers.sameInstance(runtimeException)));
            complete();
        });
        FakeWriteStream fakeWriteStream = new FakeWriteStream(this.vertx);
        Flowable.just(0).observeOn(RxHelper.scheduler(this.vertx)).subscribeOn(RxHelper.scheduler(this.vertx)).subscribe(RxHelper.toSubscriber(fakeWriteStream).onWriteStreamEnd(() -> {
            throw runtimeException;
        }));
        await();
        assertTrue("Expected writeStream end method to be invoked", fakeWriteStream.endInvoked());
    }

    @Test
    @Ignore
    public void testWriteStreamError() throws Exception {
        testWriteStreamError(false);
    }

    @Test
    @Ignore
    public void testWriteStreamErrorAfterComplete() throws Exception {
        testWriteStreamError(true);
    }

    private void testWriteStreamError(boolean z) {
        waitFor(2);
        RuntimeException runtimeException = new RuntimeException();
        FakeWriteStream failAfterWrite = new FakeWriteStream(this.vertx).failAfterWrite(runtimeException);
        Observable.create(observableEmitter -> {
            observableEmitter.setCancellable(this::complete);
            observableEmitter.onNext(0);
            if (z) {
                observableEmitter.onComplete();
            }
        }).observeOn(RxHelper.scheduler(this.vertx)).subscribeOn(RxHelper.scheduler(this.vertx)).subscribe(RxHelper.toObserver(failAfterWrite).onWriteStreamError(th -> {
            assertThat(th, CoreMatchers.is(CoreMatchers.sameInstance(runtimeException)));
            complete();
        }));
        await();
        assertEquals(Boolean.valueOf(z), Boolean.valueOf(failAfterWrite.endInvoked()));
    }
}
