/*
 * Decompiled with CFR 0.152.
 */
package reactor.rabbitmq;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.RabbitFluxException;
import reactor.rabbitmq.ReceiverOptions;

/**
 * Reactive entry point for consuming RabbitMQ queues as {@link Flux} streams.
 *
 * <p>Each {@code consume*} call obtains the connection from the configured
 * connection {@link Mono}, creates a dedicated {@link Channel} on it and
 * registers a consumer on the requested queue. When the resulting flux is
 * disposed, the consumer is cancelled and its channel closed.
 *
 * <p>When no connection {@link Mono} is supplied through
 * {@link ReceiverOptions}, the receiver lazily creates (and caches) its own
 * connection from the configured connection factory; {@link #close()} closes
 * that connection once it has been requested.
 */
public class Receiver
implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
    private static final Function<Connection, Channel> CHANNEL_CREATION_FUNCTION = new ChannelCreationFunction();
    private final Mono<? extends Connection> connectionMono;
    /** Set once the internally created connection mono has been subscribed to. */
    private final AtomicBoolean hasConnection = new AtomicBoolean(false);
    private final Scheduler connectionSubscriptionScheduler;
    /** {@code true} when the scheduler was created by this receiver and must be disposed by it. */
    private final boolean privateConnectionSubscriptionScheduler;

    /**
     * Creates a receiver configured with default {@link ReceiverOptions}.
     */
    public Receiver() {
        this(new ReceiverOptions());
    }

    /**
     * Creates a receiver from the given options.
     *
     * <p>If the options carry no connection subscription scheduler, one is
     * created through {@link #createScheduler(String)} and owned by this
     * receiver. If the options carry no connection mono, a cached mono that
     * opens a connection from the options' connection factory is used.
     *
     * @param options the receiver configuration
     */
    public Receiver(ReceiverOptions options) {
        Scheduler configuredScheduler = options.getConnectionSubscriptionScheduler();
        this.privateConnectionSubscriptionScheduler = configuredScheduler == null;
        this.connectionSubscriptionScheduler = configuredScheduler == null
            ? this.createScheduler("rabbitmq-receiver-connection-subscription")
            : configuredScheduler;

        Mono<? extends Connection> configuredConnectionMono = options.getConnectionMono();
        if (configuredConnectionMono != null) {
            this.connectionMono = configuredConnectionMono;
        } else {
            this.connectionMono = Mono.fromCallable(() -> options.getConnectionFactory().newConnection())
                // Flag the connection as (about to be) created so close() knows it has something to close.
                .doOnSubscribe(subscription -> this.hasConnection.set(true))
                .subscribeOn(this.connectionSubscriptionScheduler)
                .cache();
        }
    }

    /**
     * Creates the scheduler used to subscribe to the connection mono.
     *
     * <p>This method is invoked from the constructor, so overriding
     * implementations must not rely on subclass state.
     *
     * @param name the name given to the scheduler
     * @return a new elastic scheduler
     */
    protected Scheduler createScheduler(String name) {
        return Schedulers.newElastic(name);
    }

    /**
     * Consumes the queue without acknowledgments, using default {@link ConsumeOptions}.
     *
     * @param queue the queue to consume from
     * @return a flux of the deliveries received on the queue
     */
    public Flux<Delivery> consumeNoAck(String queue) {
        return this.consumeNoAck(queue, new ConsumeOptions());
    }

    /**
     * Consumes the queue with the consumer registered in auto-acknowledge
     * mode on the broker side ({@code basicConsume} with {@code autoAck = true}).
     *
     * <p>The flux completes when the consumer is cancelled, when the channel
     * shuts down for a reason that does not trigger connection recovery, or
     * when the options' stop-consuming function returns {@code true}. Errors
     * raised while obtaining the channel or registering the consumer are
     * signalled through the flux.
     *
     * @param queue the queue to consume from
     * @param options the consumption settings (overflow strategy, stop-consuming function)
     * @return a flux of the deliveries received on the queue
     */
    public Flux<Delivery> consumeNoAck(String queue, ConsumeOptions options) {
        return Flux.create(emitter -> this.connectionMono.map(CHANNEL_CREATION_FUNCTION).subscribe(channel -> {
            try {
                DeliverCallback deliverCallback = (tag, message) -> {
                    emitter.next(message);
                    if (options.getStopConsumingBiFunction().apply(emitter.requestedFromDownstream(), message)) {
                        emitter.complete();
                    }
                };
                CancelCallback cancelCallback = tag -> {
                    LOGGER.info("Flux consumer {} has been cancelled", tag);
                    emitter.complete();
                };
                this.completeOnChannelShutdown(channel, emitter);
                String consumerTag = channel.basicConsume(queue, true, deliverCallback, cancelCallback);
                registerConsumerCleanup(emitter, channel, consumerTag, queue);
            }
            catch (IOException e) {
                throw new RabbitFluxException(e);
            }
        }, emitter::error), options.getOverflowStrategy());
    }

    /**
     * Registers a shutdown listener on the channel that completes the sink
     * when the shutdown reason does not match the default connection recovery
     * triggering condition (i.e. no automatic recovery is expected).
     *
     * @param channel the channel to observe
     * @param emitter the sink to complete on shutdown
     */
    protected void completeOnChannelShutdown(Channel channel, FluxSink<?> emitter) {
        channel.addShutdownListener(reason -> {
            if (!AutorecoveringConnection.DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION.test(reason)) {
                emitter.complete();
            }
        });
    }

    /**
     * Consumes the queue and acknowledges each message as it flows through,
     * using default {@link ConsumeOptions}.
     *
     * @param queue the queue to consume from
     * @return a flux of the deliveries received on the queue
     */
    public Flux<Delivery> consumeAutoAck(String queue) {
        return this.consumeAutoAck(queue, new ConsumeOptions());
    }

    /**
     * Consumes the queue through {@link #consumeManualAck(String, ConsumeOptions)}
     * and acknowledges every delivery before passing it downstream.
     *
     * <p>If the acknowledgment throws, the exception is handed to the options'
     * exception handler together with an {@link AcknowledgmentContext} for
     * the delivery.
     *
     * @param queue the queue to consume from
     * @param options the consumption settings
     * @return a flux of the deliveries received on the queue
     */
    public Flux<Delivery> consumeAutoAck(String queue, ConsumeOptions options) {
        return this.consumeManualAck(queue, options).doOnNext(msg -> {
            try {
                msg.ack();
            }
            catch (Exception e) {
                options.getExceptionHandler().accept(new AcknowledgmentContext(msg), e);
            }
        }).map(ackableMsg -> (Delivery) ackableMsg);
    }

    /**
     * Consumes the queue with manual acknowledgments, using default {@link ConsumeOptions}.
     *
     * @param queue the queue to consume from
     * @return a flux of deliveries the subscriber is responsible for acknowledging
     */
    public Flux<AcknowledgableDelivery> consumeManualAck(String queue) {
        return this.consumeManualAck(queue, new ConsumeOptions());
    }

    /**
     * Consumes the queue with the consumer registered in manual-acknowledge
     * mode ({@code basicConsume} with {@code autoAck = false}).
     *
     * <p>A non-zero QoS in the options is applied to the channel before the
     * consumer is registered. Each message is wrapped in an
     * {@link AcknowledgableDelivery} and emitted only if the options'
     * hook-before-emit function returns {@code true}. The flux completes under
     * the same conditions as {@link #consumeNoAck(String, ConsumeOptions)}.
     *
     * @param queue the queue to consume from
     * @param options the consumption settings (QoS, overflow strategy, hooks)
     * @return a flux of deliveries the subscriber is responsible for acknowledging
     */
    public Flux<AcknowledgableDelivery> consumeManualAck(String queue, ConsumeOptions options) {
        return Flux.create(emitter -> this.connectionMono.map(CHANNEL_CREATION_FUNCTION).subscribe(channel -> {
            try {
                if (options.getQos() != 0) {
                    channel.basicQos(options.getQos());
                }
                DeliverCallback deliverCallback = (tag, message) -> {
                    AcknowledgableDelivery delivery = new AcknowledgableDelivery(message, channel);
                    if (options.getHookBeforeEmitBiFunction().apply(emitter.requestedFromDownstream(), delivery)) {
                        emitter.next(delivery);
                    }
                    if (options.getStopConsumingBiFunction().apply(emitter.requestedFromDownstream(), message)) {
                        emitter.complete();
                    }
                };
                CancelCallback cancelCallback = tag -> {
                    LOGGER.info("Flux consumer {} has been cancelled", tag);
                    emitter.complete();
                };
                this.completeOnChannelShutdown(channel, emitter);
                String consumerTag = channel.basicConsume(queue, false, deliverCallback, cancelCallback);
                registerConsumerCleanup(emitter, channel, consumerTag, queue);
            }
            catch (IOException e) {
                throw new RabbitFluxException(e);
            }
        }, emitter::error), options.getOverflowStrategy());
    }

    /**
     * Logs the consumer registration and installs the dispose hook that
     * cancels the consumer and closes its channel. The cancellation is
     * attempted at most once, and only while both the channel and its
     * connection are still open; failures are logged, not propagated.
     */
    private static void registerConsumerCleanup(FluxSink<?> emitter, Channel channel, String consumerTag, String queue) {
        LOGGER.info("Consumer {} consuming from {} has been registered", consumerTag, queue);
        AtomicBoolean cancelled = new AtomicBoolean(false);
        emitter.onDispose(() -> {
            LOGGER.info("Cancelling consumer {} consuming from {}", consumerTag, queue);
            if (!cancelled.compareAndSet(false, true)) {
                return;
            }
            try {
                if (channel.isOpen() && channel.getConnection().isOpen()) {
                    channel.basicCancel(consumerTag);
                    channel.close();
                }
            }
            catch (IOException | TimeoutException e) {
                // Best-effort cleanup: keep the cause in the log instead of failing the dispose.
                LOGGER.warn("Error while closing channel: {}", e.getMessage(), e);
            }
        });
    }

    /**
     * Closes the internally created connection, if it has been requested, and
     * disposes the connection subscription scheduler when it is owned by this
     * receiver. The scheduler is disposed even if closing the connection fails.
     *
     * @throws RabbitFluxException if closing the connection raises an {@link IOException}
     */
    @Override
    public void close() {
        try {
            if (this.hasConnection.getAndSet(false)) {
                try {
                    this.connectionMono.block().close();
                }
                catch (IOException e) {
                    throw new RabbitFluxException(e);
                }
            }
        }
        finally {
            // Always release the private scheduler, otherwise a failed close leaks it.
            if (this.privateConnectionSubscriptionScheduler) {
                this.connectionSubscriptionScheduler.dispose();
            }
        }
    }

    /**
     * Creates a channel on a connection, translating the checked
     * {@link IOException} into a {@link RabbitFluxException}.
     */
    private static class ChannelCreationFunction
    implements Function<Connection, Channel> {
        private ChannelCreationFunction() {
        }

        @Override
        public Channel apply(Connection connection) {
            try {
                return connection.createChannel();
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error while creating channel", e);
            }
        }
    }

    /**
     * Context handed to the exception handler when acknowledging a delivery fails.
     */
    public static class AcknowledgmentContext {
        private final AcknowledgableDelivery delivery;

        /**
         * @param delivery the delivery whose acknowledgment failed
         */
        public AcknowledgmentContext(AcknowledgableDelivery delivery) {
            this.delivery = delivery;
        }

        /**
         * @return the delivery whose acknowledgment failed
         */
        public AcknowledgableDelivery getDelivery() {
            return this.delivery;
        }
    }
}

