package io.vertx.amqp;

import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/vertx/amqp/AmqpUsage.class */
/**
 * Blocking helper around a single Vert.x Proton connection, used to produce and consume AMQP
 * messages.
 *
 * <p>All Proton calls are dispatched onto the Vert.x {@link Context} obtained in the constructor;
 * the public methods block the calling thread on latches until the corresponding connection or
 * link has been opened. Every sender and receiver created through this class is tracked so that
 * {@link #close()} can close them before closing the connection.
 */
public class AmqpUsage {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpUsage.class);
    private final Context context;
    private ProtonClient client;
    private ProtonConnection connection;
    private final List<ProtonSender> senders = new CopyOnWriteArrayList<>();
    private final List<ProtonReceiver> receivers = new CopyOnWriteArrayList<>();

    /**
     * Connects using the built-in default credentials.
     *
     * @param vertx the Vert.x instance used to create the context and the Proton client
     * @param str   the host passed to {@code ProtonClient.connect}
     * @param i     the port passed to {@code ProtonClient.connect}
     */
    public AmqpUsage(Vertx vertx, String str, int i) {
        this(vertx, str, i, "artemis", "simetraehcapa");
    }

    /**
     * Connects to the given host and blocks until the connection has been opened.
     *
     * @param vertx the Vert.x instance used to create the context and the Proton client
     * @param str   the host passed to {@code ProtonClient.connect}
     * @param i     the port passed to {@code ProtonClient.connect}
     * @param str2  the user name passed to {@code ProtonClient.connect}
     * @param str3  the password passed to {@code ProtonClient.connect}
     * @throws IllegalStateException if the connection attempt fails
     * @throws RuntimeException      if the calling thread is interrupted while waiting
     */
    public AmqpUsage(Vertx vertx, String str, int i, String str2, String str3) {
        CountDownLatch connected = new CountDownLatch(1);
        AtomicReference<Throwable> failure = new AtomicReference<>();
        this.context = vertx.getOrCreateContext();
        this.context.runOnContext(ignored -> {
            this.client = ProtonClient.create(vertx);
            this.client.connect(str, i, str2, str3, connectResult -> {
                if (connectResult.succeeded()) {
                    LOGGER.info("Connection to the AMQP host succeeded");
                    this.connection = connectResult.result();
                    this.connection.openHandler(opened -> connected.countDown()).open();
                } else {
                    // Release the waiting thread on failure too; otherwise the constructor
                    // would block forever.
                    failure.set(connectResult.cause());
                    connected.countDown();
                }
            });
        });
        try {
            connected.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        Throwable cause = failure.get();
        if (cause != null) {
            throw new IllegalStateException("Unable to connect to the AMQP host " + str + ":" + i, cause);
        }
    }

    /**
     * Opens a sender on the given address, then starts a thread named {@code <str>-thread} that
     * sends {@code i} messages one at a time, waiting for the delivery callback of each message
     * before sending the next.
     *
     * <p>This method blocks until the sender has been opened, then returns while the sending
     * thread keeps running. When that thread finishes, normally or because of an error, it runs
     * {@code runnable} and closes the sender.
     *
     * @param str      the address passed to {@code ProtonConnection.createSender}
     * @param i        the number of messages to send; nothing is sent when it is not positive
     * @param runnable callback run once by the sending thread when it is done, may be {@code null}
     * @param supplier provides the body of each message; a {@link Section} is used as-is, any
     *                 other non-null value is wrapped in an {@link AmqpValue}, and {@code null}
     *                 leaves the body unset
     */
    public void produce(String str, int i, Runnable runnable, Supplier<Object> supplier) {
        CountDownLatch senderOpened = new CountDownLatch(1);
        AtomicReference<ProtonSender> senderRef = new AtomicReference<>();
        this.context.runOnContext(ignored -> {
            ProtonSender sender = this.connection.createSender(str);
            senderRef.set(sender);
            this.senders.add(sender);
            sender.openHandler(opened -> senderOpened.countDown()).open();
        });
        try {
            senderOpened.await();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller; sending still proceeds best-effort.
            Thread.currentThread().interrupt();
            LOGGER.error("Interrupted while waiting for the ProtonSender to be opened", e);
        }
        Thread thread = new Thread(() -> {
            LOGGER.info("Starting AMQP sender to write {} messages", i);
            try {
                for (int sent = 0; sent < i; sent++) {
                    Object payload = supplier.get();
                    Message message = ProtonHelper.message();
                    if (payload instanceof Section) {
                        message.setBody((Section) payload);
                    } else if (payload != null) {
                        message.setBody(new AmqpValue(payload));
                    }
                    message.setDurable(true);
                    message.setTtl(10000L);
                    CountDownLatch delivered = new CountDownLatch(1);
                    this.context.runOnContext(ignored ->
                        senderRef.get().send(message, delivery -> delivered.countDown()));
                    delivered.await();
                    LOGGER.info("Producer sent message {}", payload);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOGGER.error("Unable to send message", e);
            } catch (Exception e) {
                LOGGER.error("Unable to send message", e);
            } finally {
                // Runs exactly once whatever the outcome: notify the caller, then close the link.
                if (runnable != null) {
                    runnable.run();
                }
                this.context.runOnContext(ignored -> senderRef.get().close());
            }
        });
        thread.setName(str + "-thread");
        thread.start();
    }

    /**
     * Same as {@link #produce(String, int, Runnable, Supplier)} with string message bodies.
     *
     * @param str      the address to send to
     * @param i        the number of messages to send
     * @param runnable callback run when sending is done, may be {@code null}
     * @param supplier provides the string body of each message, must not be {@code null}
     * @throws NullPointerException if {@code supplier} is {@code null}
     */
    public void produceStrings(String str, int i, Runnable runnable, Supplier<String> supplier) {
        // Binding the method reference throws NullPointerException right away for a null supplier.
        produce(str, i, runnable, supplier::get);
    }

    /**
     * Opens a receiver on the given address and blocks until it has been opened.
     *
     * <p>Each received message is passed to {@code consumer}; afterwards {@code booleanSupplier}
     * is evaluated and the receiver is closed as soon as it returns {@code false}. The receiver
     * is set up from a thread named {@code <str>-thread}, which runs {@code runnable} right after
     * the set-up has been scheduled on the context, not after the last message was consumed.
     *
     * @param str             the address passed to {@code ProtonConnection.createReceiver}
     * @param booleanSupplier evaluated after each message; {@code false} closes the receiver
     * @param runnable        callback run once the receiver set-up was scheduled, may be
     *                        {@code null}
     * @param consumer        receives every message
     */
    public void consume(String str, BooleanSupplier booleanSupplier, Runnable runnable, Consumer<AmqpMessage> consumer) {
        CountDownLatch receiverOpened = new CountDownLatch(1);
        Thread thread = new Thread(() -> {
            try {
                this.context.runOnContext(ignored -> {
                    ProtonReceiver receiver = this.connection.createReceiver(str);
                    this.receivers.add(receiver);
                    receiver.handler((delivery, message) -> {
                        LOGGER.info("Consumer {}: consuming message {}", str, message.getBody());
                        consumer.accept(AmqpMessage.create(message).build());
                        if (!booleanSupplier.getAsBoolean()) {
                            receiver.close();
                        }
                    }).openHandler(opened -> {
                        LOGGER.info("Starting consumer to read messages on {}", str);
                        receiverOpened.countDown();
                    }).open();
                });
            } catch (Exception e) {
                LOGGER.error("Unable to receive messages from {}", str, e);
            } finally {
                if (runnable != null) {
                    runnable.run();
                }
            }
        });
        thread.setName(str + "-thread");
        thread.start();
        try {
            receiverOpened.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.error("Interrupted while waiting for the ProtonReceiver to be opened", e);
        }
    }

    /**
     * Same as {@link #consume(String, BooleanSupplier, Runnable, Consumer)}, handing the
     * consumer each message body as a string.
     *
     * @param str             the address to receive from
     * @param booleanSupplier evaluated after each message; {@code false} closes the receiver
     * @param runnable        callback run once the receiver set-up was scheduled, may be
     *                        {@code null}
     * @param consumer        receives {@code bodyAsString()} of every message
     */
    public void consumeStrings(String str, BooleanSupplier booleanSupplier, Runnable runnable, Consumer<String> consumer) {
        consume(str, booleanSupplier, runnable, message -> consumer.accept(message.bodyAsString()));
    }

    /**
     * Consumes string bodies until {@code i} messages were received or the time budget elapsed,
     * whichever happens first. The time budget starts when the first message is handled.
     *
     * @param str      the address to receive from
     * @param i        the number of messages after which the receiver is closed
     * @param j        the time budget, in {@code timeUnit}
     * @param timeUnit the unit of {@code j}
     * @param runnable callback run once the receiver set-up was scheduled, may be {@code null}
     * @param consumer receives the string body of every message
     */
    public void consumeStrings(String str, int i, long j, TimeUnit timeUnit, Runnable runnable, Consumer<String> consumer) {
        AtomicLong received = new AtomicLong();
        BooleanSupplier belowLimit = () -> received.get() < i;
        consumeStrings(str, continueIfNotExpired(belowLimit, j, timeUnit), runnable, body -> {
            consumer.accept(body);
            received.incrementAndGet();
        });
    }

    /**
     * Consumes messages until {@code i} messages were received or the time budget elapsed,
     * whichever happens first. The time budget starts when the first message is handled.
     *
     * @param str      the address to receive from
     * @param i        the number of messages after which the receiver is closed
     * @param j        the time budget, in {@code timeUnit}
     * @param timeUnit the unit of {@code j}
     * @param runnable callback run once the receiver set-up was scheduled, may be {@code null}
     * @param consumer receives every message
     */
    public void consumeMessages(String str, int i, long j, TimeUnit timeUnit, Runnable runnable, Consumer<AmqpMessage> consumer) {
        AtomicLong received = new AtomicLong();
        BooleanSupplier belowLimit = () -> received.get() < i;
        consume(str, continueIfNotExpired(belowLimit, j, timeUnit), runnable, message -> {
            consumer.accept(message);
            received.incrementAndGet();
        });
    }

    /**
     * Wraps a continuation so that it also turns {@code false} once a deadline has passed.
     * The deadline is fixed on the first evaluation of the returned supplier, not when this
     * method is called.
     */
    private BooleanSupplier continueIfNotExpired(final BooleanSupplier booleanSupplier, final long j, final TimeUnit timeUnit) {
        return new BooleanSupplier() {
            // 0 means "deadline not computed yet".
            long stopTime = 0;

            @Override
            public boolean getAsBoolean() {
                if (this.stopTime == 0) {
                    this.stopTime = System.currentTimeMillis() + timeUnit.toMillis(j);
                }
                return booleanSupplier.getAsBoolean() && System.currentTimeMillis() <= this.stopTime;
            }
        };
    }

    /**
     * Closes every tracked sender and receiver, waiting up to 30 seconds for them, then closes
     * the connection, waiting up to 10 seconds. A timeout is logged and does not fail the call.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void close() throws InterruptedException {
        CountDownLatch linksClosed = new CountDownLatch(this.senders.size() + this.receivers.size());
        this.context.runOnContext(ignored -> {
            this.senders.forEach(sender -> {
                if (sender.isOpen()) {
                    sender.closeHandler(closed -> linksClosed.countDown()).close();
                } else {
                    linksClosed.countDown();
                }
            });
            this.receivers.forEach(receiver -> {
                if (receiver.isOpen()) {
                    receiver.closeHandler(closed -> linksClosed.countDown()).close();
                } else {
                    linksClosed.countDown();
                }
            });
        });
        if (!linksClosed.await(30L, TimeUnit.SECONDS)) {
            LOGGER.warn("Timed out while waiting for the AMQP senders and receivers to be closed");
        }
        if (this.connection == null || this.connection.isDisconnected()) {
            return;
        }
        CountDownLatch connectionClosed = new CountDownLatch(1);
        this.context.runOnContext(ignored ->
            this.connection.closeHandler(closed -> connectionClosed.countDown()).close());
        if (!connectionClosed.await(10L, TimeUnit.SECONDS)) {
            LOGGER.warn("Timed out while waiting for the AMQP connection to be closed");
        }
    }
}
