package io.vertx.core.eventbus;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.spi.cluster.zookeeper.MockZKCluster;
import io.vertx.test.core.TestUtils;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.junit.Test;

/* loaded from: input_file:io/vertx/core/eventbus/ZKClusteredEventbusTest.class */
/**
 * Clustered event-bus tests that run against the {@link ClusterManager} supplied by a
 * {@link MockZKCluster}.
 *
 * <p>The send/reply/publish helpers only emit their message from a timer that is armed once the
 * consumer registration has completed. NOTE(review): the delay presumably gives the registration
 * time to propagate between the nodes; confirm before tuning it.
 */
public class ZKClusteredEventbusTest extends ClusteredEventBusTest {
    /** Address shared by every consumer and sender in this class. */
    private static final String TEST_ADDRESS = "some-address1";

    /** Delay, in milliseconds, between a completed registration and the send/request/publish that follows. */
    private static final long SEND_DELAY_MS = 200L;

    /** Delay, in milliseconds, between the last completed registration and the publish in {@code testPublish}. */
    private static final long PUBLISH_DELAY_MS = 300L;

    /** Fixed timeout, in seconds, applied by {@link #await(long, TimeUnit)}. */
    private static final long AWAIT_TIMEOUT_SECONDS = 10L;

    private final MockZKCluster zkClustered = new MockZKCluster();

    /**
     * Runs the superclass tear-down and then stops the mock ZooKeeper cluster.
     *
     * <p>The cluster is stopped in a {@code finally} block so that it is released even when the
     * superclass tear-down throws.
     *
     * @throws Exception if the superclass tear-down or stopping the mock cluster fails
     */
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            this.zkClustered.stop();
        }
    }

    /**
     * Delegates to the superclass with a fixed 10-second timeout.
     *
     * <p>Both arguments are intentionally ignored. NOTE(review): this looks like a deliberate
     * "fail fast when a test blocks" cap; confirm that the no-argument {@code await()} used by the
     * tests is routed through this overload.
     *
     * @param j        requested timeout value (ignored)
     * @param timeUnit unit of the requested timeout (ignored)
     */
    public void await(long j, TimeUnit timeUnit) {
        super.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Registers a consumer on node 1, sends {@code t} from node 0 and verifies what arrives.
     *
     * <p>Two nodes are started when no nodes exist yet. When {@code consumer} is {@code null} the
     * received message must be a point-to-point send whose body equals {@code r}; if
     * {@code deliveryOptions} is given, the received headers must match the configured ones
     * (no configured headers means the message must carry none). Otherwise the received body is
     * handed to {@code consumer}.
     *
     * @param t               body to send
     * @param r               body expected on the receiving side when {@code consumer} is {@code null}
     * @param consumer        custom verification of the received body, or {@code null}
     * @param deliveryOptions options to send with, or {@code null} to send without options
     */
    protected <T, R> void testSend(T t, R r, Consumer<T> consumer, DeliveryOptions deliveryOptions) {
        if (this.vertices == null) {
            startNodes(2);
        }
        // The explicit <T> keeps the handler typed as Message<T>, so the body can be passed to Consumer<T>.
        this.vertices[1].eventBus().<T>consumer(TEST_ADDRESS).handler(message -> {
            if (consumer == null) {
                assertTrue(message.isSend());
                assertEquals(r, message.body());
                if (deliveryOptions != null) {
                    assertNotNull(message.headers());
                    int expectedHeaders = deliveryOptions.getHeaders() != null ? deliveryOptions.getHeaders().size() : 0;
                    assertEquals(expectedHeaders, message.headers().size());
                    if (expectedHeaders != 0) {
                        assertHeaderValuesDelivered(deliveryOptions, message);
                    }
                }
            } else {
                consumer.accept(message.body());
            }
            testComplete();
        }).completionHandler(asyncResult -> {
            assertTrue(asyncResult.succeeded());
            this.vertices[1].setTimer(SEND_DELAY_MS, timerId -> {
                if (deliveryOptions == null) {
                    this.vertices[0].eventBus().send(TEST_ADDRESS, t);
                } else {
                    this.vertices[0].eventBus().send(TEST_ADDRESS, t, deliveryOptions);
                }
            });
        });
        await();
    }

    /**
     * Registers a replying consumer on node 1, issues a request from node 0 and verifies the reply.
     *
     * <p>Two nodes are started when no nodes exist yet. The request body is a random string that
     * the consumer checks before replying with {@code t} (using {@code deliveryOptions} when given).
     * When {@code consumer} is {@code null} the reply must be a send whose body equals {@code r},
     * and, if {@code deliveryOptions} carries headers, the reply headers must match them. Otherwise
     * the reply body is handed to {@code consumer}.
     *
     * @param t               body to reply with
     * @param r               reply body expected by the requester when {@code consumer} is {@code null}
     * @param consumer        custom verification of the reply body, or {@code null}
     * @param deliveryOptions options to reply with, or {@code null} to reply without options
     */
    protected <T, R> void testReply(T t, R r, Consumer<R> consumer, DeliveryOptions deliveryOptions) {
        if (this.vertices == null) {
            startNodes(2);
        }
        String requestBody = TestUtils.randomUnicodeString(1000);
        this.vertices[1].eventBus().consumer(TEST_ADDRESS).handler(message -> {
            assertEquals(requestBody, message.body());
            if (deliveryOptions == null) {
                message.reply(t);
            } else {
                message.reply(t, deliveryOptions);
            }
        }).completionHandler(asyncResult -> {
            assertTrue(asyncResult.succeeded());
            this.vertices[1].setTimer(SEND_DELAY_MS, timerId -> {
                // The explicitly typed parameter pins the reply to Message<R>; an implicitly typed
                // lambda would be inferred as Message<Object> and could not feed Consumer<R>.
                this.vertices[0].eventBus().request(TEST_ADDRESS, requestBody, onSuccess((Message<R> reply) -> {
                    if (consumer == null) {
                        assertTrue(reply.isSend());
                        assertEquals(r, reply.body());
                        if (deliveryOptions != null && deliveryOptions.getHeaders() != null) {
                            assertNotNull(reply.headers());
                            assertEquals(deliveryOptions.getHeaders().size(), reply.headers().size());
                            assertHeaderValuesDelivered(deliveryOptions, reply);
                        }
                    } else {
                        consumer.accept(reply.body());
                    }
                    testComplete();
                }));
            });
        });
        await();
    }

    /**
     * Publishes from node 0 while a clustered consumer on node 1 and a local consumer on node 0
     * are both registered on the same address; each consumer calls {@code complete()} once and the
     * test waits for two completions.
     *
     * <p>Both registrations are asserted to have succeeded so that a failed registration is
     * reported directly instead of surfacing as a timeout.
     *
     * @throws Exception declared for parity with the other tests; nothing here throws a checked exception directly
     */
    @Test
    public void testLocalHandlerClusteredPublish() throws Exception {
        startNodes(2);
        waitFor(2);
        this.vertices[1].eventBus().consumer(TEST_ADDRESS, clusteredMessage -> {
            complete();
        }).completionHandler(clusteredRegistration -> {
            assertTrue(clusteredRegistration.succeeded());
            this.vertices[0].eventBus().localConsumer(TEST_ADDRESS, localMessage -> {
                complete();
            }).completionHandler(localRegistration -> {
                // Distinct parameter names: a nested lambda may not redeclare the enclosing lambda's parameter.
                assertTrue(localRegistration.succeeded());
                this.vertices[1].setTimer(SEND_DELAY_MS, timerId -> {
                    this.vertices[0].eventBus().publish(TEST_ADDRESS, "foo");
                });
            });
        });
        await();
    }

    /**
     * Starts three nodes, registers a consumer on nodes 2 and 1, publishes {@code t} from node 0
     * once both registrations completed, and finishes when both consumers received a message.
     *
     * <p>When {@code consumer} is {@code null} every received message must be a publish (not a
     * send) whose body equals {@code t}; otherwise the received body is handed to {@code consumer}.
     *
     * @param t        body to publish
     * @param consumer custom verification of each received body, or {@code null}
     */
    protected <T> void testPublish(final T t, final Consumer<T> consumer) {
        final int numNodes = 3;
        // Node 0 only publishes; every other node hosts one consumer.
        final int numConsumers = numNodes - 1;
        startNodes(numNodes);
        final AtomicInteger receivedCount = new AtomicInteger();
        final AtomicInteger registeredCount = new AtomicInteger(0);

        // Shared by all consumers: verifies one delivery and completes the test after the last one.
        Handler<Message<T>> onMessage = message -> {
            if (consumer == null) {
                assertFalse(message.isSend());
                assertEquals(t, message.body());
            } else {
                consumer.accept(message.body());
            }
            if (receivedCount.incrementAndGet() == numConsumers) {
                testComplete();
            }
        };

        // Shared by all registrations: the last one to complete schedules the publish.
        Handler<AsyncResult<Void>> onRegistered = asyncResult -> {
            assertTrue(asyncResult.succeeded());
            if (registeredCount.incrementAndGet() == numConsumers) {
                this.vertices[0].setTimer(PUBLISH_DELAY_MS, timerId -> {
                    this.vertices[0].eventBus().publish(TEST_ADDRESS, t);
                });
            }
        };

        // Registers on the highest node first, matching the original order (node 2, then node 1).
        for (int node = numNodes - 1; node >= 1; node--) {
            this.vertices[node].eventBus().<T>consumer(TEST_ADDRESS)
                .handler(onMessage)
                .completionHandler(onRegistered);
        }
        await();
    }

    /**
     * Returns the cluster manager obtained from the mock ZooKeeper cluster owned by this test.
     *
     * @return the result of {@code MockZKCluster.getClusterManager()}
     */
    protected ClusterManager getClusterManager() {
        return this.zkClustered.getClusterManager();
    }

    /**
     * Asserts that every header entry configured on {@code deliveryOptions} is present with the
     * same value on {@code message}. The caller must ensure the options carry a non-null header map.
     */
    private void assertHeaderValuesDelivered(DeliveryOptions deliveryOptions, Message<?> message) {
        for (Map.Entry<String, String> header : deliveryOptions.getHeaders().entries()) {
            assertEquals(header.getValue(), message.headers().get(header.getKey()));
        }
    }
}
