/*
 * Decompiled with CFR 0.152.
 */
package reactor.rabbitmq;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.Helpers;
import reactor.rabbitmq.RabbitFluxException;
import reactor.rabbitmq.ReceiverOptions;
import reactor.rabbitmq.Utils;

public class Receiver
implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
    private static final Function<Connection, Channel> CHANNEL_CREATION_FUNCTION = new ChannelCreationFunction();
    private final Mono<? extends Connection> connectionMono;
    private final AtomicReference<Connection> connection = new AtomicReference();
    private final Scheduler connectionSubscriptionScheduler;
    private final boolean privateConnectionSubscriptionScheduler;
    private final int connectionClosingTimeout;
    private final AtomicBoolean closingOrClosed = new AtomicBoolean(false);

    public Receiver() {
        this(new ReceiverOptions());
    }

    public Receiver(ReceiverOptions options) {
        Mono cm;
        this.privateConnectionSubscriptionScheduler = options.getConnectionSubscriptionScheduler() == null;
        Scheduler scheduler = this.connectionSubscriptionScheduler = options.getConnectionSubscriptionScheduler() == null ? this.createScheduler("rabbitmq-receiver-connection-subscription") : options.getConnectionSubscriptionScheduler();
        if (options.getConnectionMono() == null) {
            cm = Mono.fromCallable(() -> {
                if (options.getConnectionSupplier() == null) {
                    return options.getConnectionFactory().newConnection();
                }
                return options.getConnectionSupplier().apply(null);
            });
            cm = options.getConnectionMonoConfigurator().apply((Mono<? extends Connection>)cm);
            cm = cm.doOnNext(conn -> this.connection.set((Connection)conn)).subscribeOn(this.connectionSubscriptionScheduler).transform(this::cache);
        } else {
            cm = options.getConnectionMono();
        }
        this.connectionMono = cm;
        this.connectionClosingTimeout = options.getConnectionClosingTimeout() != null && !Duration.ZERO.equals(options.getConnectionClosingTimeout()) ? (int)options.getConnectionClosingTimeout().toMillis() : -1;
    }

    protected <T> Mono<T> cache(Mono<T> mono) {
        return Utils.cache(mono);
    }

    protected Scheduler createScheduler(String name) {
        return Schedulers.newElastic((String)name);
    }

    public Flux<Delivery> consumeNoAck(String queue) {
        return this.consumeNoAck(queue, new ConsumeOptions());
    }

    public Flux<Delivery> consumeNoAck(String queue, ConsumeOptions options) {
        return Flux.create(emitter -> this.connectionMono.map(CHANNEL_CREATION_FUNCTION).subscribe(channel -> {
            try {
                if (options.getChannelCallback() != null) {
                    options.getChannelCallback().accept((Channel)channel);
                }
                DeliverCallback deliverCallback = (consumerTag, message) -> {
                    emitter.next((Object)message);
                    if (options.getStopConsumingBiFunction().apply(emitter.requestedFromDownstream(), (Delivery)message).booleanValue()) {
                        emitter.complete();
                    }
                };
                AtomicBoolean basicCancel = new AtomicBoolean(true);
                CancelCallback cancelCallback = consumerTag -> {
                    LOGGER.info("Flux consumer {} has been cancelled", (Object)consumerTag);
                    basicCancel.set(false);
                    emitter.complete();
                };
                this.completeOnChannelShutdown((Channel)channel, (FluxSink<?>)emitter);
                String consumerTag2 = channel.basicConsume(queue, true, options.getConsumerTag(), deliverCallback, cancelCallback);
                AtomicBoolean cancelled = new AtomicBoolean(false);
                LOGGER.info("Consumer {} consuming from {} has been registered", (Object)consumerTag2, (Object)queue);
                emitter.onDispose(() -> {
                    LOGGER.info("Cancelling consumer {} consuming from {}", (Object)consumerTag2, (Object)queue);
                    if (cancelled.compareAndSet(false, true)) {
                        try {
                            if (channel.isOpen() && channel.getConnection().isOpen()) {
                                if (basicCancel.compareAndSet(true, false)) {
                                    channel.basicCancel(consumerTag2);
                                }
                                channel.close();
                            }
                        }
                        catch (IOException | TimeoutException e) {
                            LOGGER.warn("Error while closing channel: " + e.getMessage());
                        }
                    }
                });
            }
            catch (Exception e) {
                emitter.error((Throwable)new RabbitFluxException(e));
            }
        }, arg_0 -> ((FluxSink)emitter).error(arg_0)), (FluxSink.OverflowStrategy)options.getOverflowStrategy());
    }

    protected void completeOnChannelShutdown(Channel channel, FluxSink<?> emitter) {
        channel.addShutdownListener(reason -> {
            if (this.isRecoverable(channel)) {
                if (!AutorecoveringConnection.DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION.test(reason)) {
                    emitter.complete();
                }
            } else {
                emitter.complete();
            }
        });
    }

    public Flux<Delivery> consumeAutoAck(String queue) {
        return this.consumeAutoAck(queue, new ConsumeOptions());
    }

    public Flux<Delivery> consumeAutoAck(String queue, ConsumeOptions options) {
        return this.consumeManualAck(queue, options).doOnNext(AcknowledgableDelivery::ack).map(ackableMsg -> ackableMsg);
    }

    public Flux<AcknowledgableDelivery> consumeManualAck(String queue) {
        return this.consumeManualAck(queue, new ConsumeOptions());
    }

    public Flux<AcknowledgableDelivery> consumeManualAck(String queue, ConsumeOptions options) {
        return Flux.create(emitter -> this.connectionMono.map(CHANNEL_CREATION_FUNCTION).subscribe(channel -> {
            try {
                if (options.getChannelCallback() != null) {
                    options.getChannelCallback().accept((Channel)channel);
                }
                if (options.getQos() != 0) {
                    channel.basicQos(options.getQos());
                }
                DeliverCallback deliverCallback = (consumerTag, message) -> {
                    AcknowledgableDelivery delivery = new AcknowledgableDelivery(message, (Channel)channel, options.getExceptionHandler());
                    if (options.getHookBeforeEmitBiFunction().apply(emitter.requestedFromDownstream(), delivery).booleanValue()) {
                        emitter.next((Object)delivery);
                    }
                    if (options.getStopConsumingBiFunction().apply(emitter.requestedFromDownstream(), (Delivery)message).booleanValue()) {
                        emitter.complete();
                    }
                };
                AtomicBoolean basicCancel = new AtomicBoolean(true);
                CancelCallback cancelCallback = consumerTag -> {
                    LOGGER.info("Flux consumer {} has been cancelled", (Object)consumerTag);
                    basicCancel.set(false);
                    emitter.complete();
                };
                this.completeOnChannelShutdown((Channel)channel, (FluxSink<?>)emitter);
                String consumerTag2 = channel.basicConsume(queue, false, options.getConsumerTag(), deliverCallback, cancelCallback);
                AtomicBoolean cancelled = new AtomicBoolean(false);
                LOGGER.info("Consumer {} consuming from {} has been registered", (Object)consumerTag2, (Object)queue);
                emitter.onDispose(() -> {
                    LOGGER.info("Cancelling consumer {} consuming from {}", (Object)consumerTag2, (Object)queue);
                    if (cancelled.compareAndSet(false, true)) {
                        try {
                            if (channel.isOpen() && channel.getConnection().isOpen()) {
                                if (basicCancel.compareAndSet(true, false)) {
                                    channel.basicCancel(consumerTag2);
                                }
                                channel.close();
                            }
                        }
                        catch (IOException | TimeoutException e) {
                            LOGGER.warn("Error while closing channel: " + e.getMessage());
                        }
                    }
                });
            }
            catch (Exception e) {
                emitter.error((Throwable)new RabbitFluxException(e));
            }
        }, arg_0 -> ((FluxSink)emitter).error(arg_0)), (FluxSink.OverflowStrategy)options.getOverflowStrategy());
    }

    protected boolean isRecoverable(Connection connection) {
        return Utils.isRecoverable(connection);
    }

    protected boolean isRecoverable(Channel channel) {
        return Utils.isRecoverable(channel);
    }

    @Override
    public void close() {
        if (this.closingOrClosed.compareAndSet(false, true)) {
            if (this.connection.get() != null) {
                Helpers.safelyExecute(LOGGER, () -> this.connection.get().close(this.connectionClosingTimeout), "Error while closing receiver connection");
            }
            if (this.privateConnectionSubscriptionScheduler) {
                Helpers.safelyExecute(LOGGER, () -> this.connectionSubscriptionScheduler.dispose(), "Error while disposing connection subscriber scheduler");
            }
        }
    }

    private static class ChannelCreationFunction
    implements Function<Connection, Channel> {
        private ChannelCreationFunction() {
        }

        @Override
        public Channel apply(Connection connection) {
            try {
                return connection.createChannel();
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error while creating channel", e);
            }
        }
    }

    public static class AcknowledgmentContext {
        private final AcknowledgableDelivery delivery;
        private final Consumer<AcknowledgableDelivery> consumer;

        public AcknowledgmentContext(AcknowledgableDelivery delivery, Consumer<AcknowledgableDelivery> consumer) {
            this.delivery = delivery;
            this.consumer = consumer;
        }

        public void ackOrNack() {
            this.consumer.accept(this.delivery);
        }
    }
}

