package io.vertx.cassandra;

import io.vertx.core.Future;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/cassandra/StreamingTest.class */
public class StreamingTest extends CassandraServiceBase {
    @Test
    public void testReadStream(TestContext testContext) {
        CassandraClient createNonShared = CassandraClient.createNonShared(this.vertx, new CassandraClientOptions().setPort(9142));
        Async async = testContext.async();
        Future future = Future.future();
        createNonShared.connect(future);
        future.compose(r5 -> {
            Future future2 = Future.future();
            createNonShared.queryStream("select random_string from random_strings.random_string_by_first_letter where first_letter = 'A'", future2);
            return future2;
        }).compose(cassandraRowStream -> {
            ArrayList arrayList = new ArrayList();
            AtomicInteger atomicInteger = new AtomicInteger();
            long j = 500;
            long nanoTime = System.nanoTime();
            CassandraRowStream endHandler = cassandraRowStream.endHandler(r15 -> {
                testContext.assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime) >= 3 * j);
                createNonShared.disconnect(asyncResult -> {
                    async.countDown();
                });
            });
            testContext.getClass();
            endHandler.exceptionHandler(testContext::fail).handler(row -> {
                arrayList.add(row);
                int andIncrement = atomicInteger.getAndIncrement();
                if (andIncrement == 3 || andIncrement == 16 || andIncrement == 38) {
                    cassandraRowStream.pause();
                    int size = arrayList.size();
                    this.vertx.setTimer(j, l -> {
                        testContext.assertTrue(size == arrayList.size());
                        cassandraRowStream.resume();
                    });
                }
            });
            return Future.succeededFuture();
        }).setHandler(asyncResult -> {
            if (asyncResult.failed()) {
                testContext.fail(asyncResult.cause());
            }
        });
    }

    @Test
    public void emptyStream(TestContext testContext) {
        CassandraClient createNonShared = CassandraClient.createNonShared(this.vertx, new CassandraClientOptions().setPort(9142));
        Async async = testContext.async();
        Future future = Future.future();
        createNonShared.connect(future);
        future.compose(r5 -> {
            Future future2 = Future.future();
            createNonShared.queryStream("select random_string from random_strings.random_string_by_first_letter where first_letter = 'I WANT EMPTY RESULT'", future2);
            return future2;
        }).compose(cassandraRowStream -> {
            CassandraRowStream endHandler = cassandraRowStream.endHandler(r52 -> {
                createNonShared.disconnect(asyncResult -> {
                    async.countDown();
                });
            });
            testContext.getClass();
            endHandler.exceptionHandler(testContext::fail).handler(row -> {
                testContext.fail();
            });
            return Future.succeededFuture();
        }).setHandler(asyncResult -> {
            if (asyncResult.failed()) {
                testContext.fail(asyncResult.cause());
            }
        });
    }
}
