/*
 * Decompiled with CFR 0.152.
 */
package reactor.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Method;
import java.io.IOException;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ChannelCloseHandlers;
import reactor.rabbitmq.ChannelProxy;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.OutboundMessageResult;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.RabbitFluxException;
import reactor.rabbitmq.ResourceManagementOptions;
import reactor.rabbitmq.RpcClient;
import reactor.rabbitmq.SendOptions;
import reactor.rabbitmq.SenderOptions;
import reactor.rabbitmq.SubscriberState;
import reactor.util.context.Context;

public class Sender
implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
    private static final Function<Connection, Channel> CHANNEL_CREATION_FUNCTION = new ChannelCreationFunction();
    private static final Function<Connection, Channel> CHANNEL_PROXY_CREATION_FUNCTION = new ChannelProxyCreationFunction();
    private final Mono<? extends Connection> connectionMono;
    private final Mono<? extends Channel> channelMono;
    private final BiConsumer<SignalType, Channel> channelCloseHandler;
    private final AtomicBoolean hasConnection = new AtomicBoolean(false);
    private final Mono<? extends Channel> resourceManagementChannelMono;
    private final Scheduler resourceManagementScheduler;
    private final boolean privateResourceManagementScheduler;
    private final Scheduler connectionSubscriptionScheduler;
    private final boolean privateConnectionSubscriptionScheduler;
    private final ExecutorService channelCloseThreadPool = Executors.newCachedThreadPool();

    public Sender() {
        this(new SenderOptions());
    }

    public Sender(SenderOptions options) {
        this.privateConnectionSubscriptionScheduler = options.getConnectionSubscriptionScheduler() == null;
        this.connectionSubscriptionScheduler = options.getConnectionSubscriptionScheduler() == null ? this.createScheduler("rabbitmq-sender-connection-subscription") : options.getConnectionSubscriptionScheduler();
        this.connectionMono = options.getConnectionMono() != null ? options.getConnectionMono() : Mono.fromCallable(() -> options.getConnectionFactory().newConnection()).doOnSubscribe(c -> this.hasConnection.set(true)).subscribeOn(this.connectionSubscriptionScheduler).cache();
        this.channelMono = options.getChannelMono();
        this.channelCloseHandler = options.getChannelCloseHandler() == null ? ChannelCloseHandlers.SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE : options.getChannelCloseHandler();
        this.privateResourceManagementScheduler = options.getResourceManagementScheduler() == null;
        this.resourceManagementScheduler = options.getResourceManagementScheduler() == null ? this.createScheduler("rabbitmq-sender-resource-creation") : options.getResourceManagementScheduler();
        this.resourceManagementChannelMono = options.getResourceManagementChannelMono() == null ? this.connectionMono.map(CHANNEL_PROXY_CREATION_FUNCTION).cache() : options.getResourceManagementChannelMono();
    }

    protected Scheduler createScheduler(String name) {
        return Schedulers.newElastic((String)name);
    }

    public Mono<Void> send(Publisher<OutboundMessage> messages) {
        return this.send(messages, new SendOptions());
    }

    public Mono<Void> send(Publisher<OutboundMessage> messages, SendOptions options) {
        options = options == null ? new SendOptions() : options;
        Mono<? extends Channel> currentChannelMono = this.getChannelMono(options);
        BiConsumer<SendContext, Exception> exceptionHandler = options.getExceptionHandler();
        BiConsumer<SignalType, Channel> channelCloseHandler = this.getChannelCloseHandler(options);
        return currentChannelMono.flatMapMany(channel -> Flux.from((Publisher)messages).doOnNext(message -> {
            try {
                channel.basicPublish(message.getExchange(), message.getRoutingKey(), message.getProperties(), message.getBody());
            }
            catch (Exception e) {
                exceptionHandler.accept(new SendContext((Channel)channel, (OutboundMessage)message), e);
            }
        }).doOnError(e -> LOGGER.warn("Send failed with exception {}", e)).doFinally(st -> channelCloseHandler.accept((SignalType)st, (Channel)channel))).then();
    }

    public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMessage> messages) {
        return this.sendWithPublishConfirms(messages, new SendOptions());
    }

    public Flux<OutboundMessageResult> sendWithPublishConfirms(Publisher<OutboundMessage> messages, SendOptions options) {
        SendOptions sendOptions = options == null ? new SendOptions() : options;
        Mono<? extends Channel> currentChannelMono = this.getChannelMono(options);
        BiConsumer<SignalType, Channel> channelCloseHandler = this.getChannelCloseHandler(options);
        return currentChannelMono.map(channel -> {
            try {
                channel.confirmSelect();
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error while setting publisher confirms on channel", e);
            }
            return channel;
        }).flatMapMany(channel -> Flux.from((Publisher)new PublishConfirmOperator(messages, (Channel)channel, sendOptions)).doFinally(signalType -> {
            if (signalType == SignalType.ON_ERROR) {
                channelCloseHandler.accept((SignalType)signalType, (Channel)channel);
            } else {
                this.channelCloseThreadPool.execute(() -> channelCloseHandler.accept((SignalType)signalType, (Channel)channel));
            }
        }));
    }

    Mono<? extends Channel> getChannelMono(SendOptions options) {
        return Stream.of(options.getChannelMono(), this.channelMono).filter(Objects::nonNull).findFirst().orElse(this.connectionMono.map(CHANNEL_CREATION_FUNCTION));
    }

    private BiConsumer<SignalType, Channel> getChannelCloseHandler(SendOptions options) {
        return options.getChannelCloseHandler() != null ? options.getChannelCloseHandler() : this.channelCloseHandler;
    }

    public RpcClient rpcClient(String exchange, String routingKey) {
        return new RpcClient((Mono<Channel>)this.connectionMono.map(CHANNEL_CREATION_FUNCTION).cache(), exchange, routingKey);
    }

    public RpcClient rpcClient(String exchange, String routingKey, Supplier<String> correlationIdProvider) {
        return new RpcClient((Mono<Channel>)this.connectionMono.map(CHANNEL_CREATION_FUNCTION).cache(), exchange, routingKey, correlationIdProvider);
    }

    public Mono<AMQP.Queue.DeclareOk> declare(QueueSpecification specification) {
        return this.declareQueue(specification, null);
    }

    public Mono<AMQP.Queue.DeclareOk> declare(QueueSpecification specification, ResourceManagementOptions options) {
        return this.declareQueue(specification, options);
    }

    public Mono<AMQP.Queue.DeclareOk> declareQueue(QueueSpecification specification) {
        return this.declareQueue(specification, null);
    }

    public Mono<AMQP.Queue.DeclareOk> declareQueue(QueueSpecification specification, ResourceManagementOptions options) {
        Mono<? extends Channel> channelMono = this.getChannelMonoForResourceManagement(options);
        AMQP.Queue.Declare declare = specification.getName() == null ? new AMQP.Queue.Declare.Builder().queue("").durable(false).exclusive(true).autoDelete(true).arguments(specification.getArguments()).build() : new AMQP.Queue.Declare.Builder().queue(specification.getName()).durable(specification.isDurable()).exclusive(specification.isExclusive()).autoDelete(specification.isAutoDelete()).arguments(specification.getArguments()).build();
        return channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc((Method)declare);
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(future -> Mono.fromCompletionStage((CompletionStage)future)).flatMap(command -> Mono.just((Object)((AMQP.Queue.DeclareOk)command.getMethod()))).publishOn(this.resourceManagementScheduler);
    }

    private Mono<? extends Channel> getChannelMonoForResourceManagement(ResourceManagementOptions options) {
        return options != null && options.getChannelMono() != null ? options.getChannelMono() : this.resourceManagementChannelMono;
    }

    public Mono<AMQP.Queue.DeleteOk> delete(QueueSpecification specification) {
        return this.delete(specification, false, false);
    }

    public Mono<AMQP.Queue.DeleteOk> delete(QueueSpecification specification, ResourceManagementOptions options) {
        return this.delete(specification, false, false, options);
    }

    public Mono<AMQP.Queue.DeleteOk> delete(QueueSpecification specification, boolean ifUnused, boolean ifEmpty) {
        return this.deleteQueue(specification, ifUnused, ifEmpty);
    }

    public Mono<AMQP.Queue.DeleteOk> delete(QueueSpecification specification, boolean ifUnused, boolean ifEmpty, ResourceManagementOptions options) {
        return this.deleteQueue(specification, ifUnused, ifEmpty, options);
    }

    public Mono<AMQP.Queue.DeleteOk> deleteQueue(QueueSpecification specification, boolean ifUnused, boolean ifEmpty) {
        return this.deleteQueue(specification, ifUnused, ifEmpty, null);
    }

    public Mono<AMQP.Queue.DeleteOk> deleteQueue(QueueSpecification specification, boolean ifUnused, boolean ifEmpty, ResourceManagementOptions options) {
        Mono<? extends Channel> channelMono = this.getChannelMonoForResourceManagement(options);
        AMQP.Queue.Delete delete = new AMQP.Queue.Delete.Builder().queue(specification.getName()).ifUnused(ifUnused).ifEmpty(ifEmpty).build();
        return channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc((Method)delete);
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(future -> Mono.fromCompletionStage((CompletionStage)future)).flatMap(command -> Mono.just((Object)((AMQP.Queue.DeleteOk)command.getMethod()))).publishOn(this.resourceManagementScheduler);
    }

    public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification specification) {
        return this.declareExchange(specification, null);
    }

    public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification specification, ResourceManagementOptions options) {
        return this.declareExchange(specification, options);
    }

    public Mono<AMQP.Exchange.DeclareOk> declareExchange(ExchangeSpecification specification) {
        return this.declareExchange(specification, null);
    }

    public Mono<AMQP.Exchange.DeclareOk> declareExchange(ExchangeSpecification specification, ResourceManagementOptions options) {
        Mono<? extends Channel> channelMono = this.getChannelMonoForResourceManagement(options);
        AMQP.Exchange.Declare declare = new AMQP.Exchange.Declare.Builder().exchange(specification.getName()).type(specification.getType()).durable(specification.isDurable()).autoDelete(specification.isAutoDelete()).internal(specification.isInternal()).arguments(specification.getArguments()).build();
        return channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc((Method)declare);
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(future -> Mono.fromCompletionStage((CompletionStage)future)).flatMap(command -> Mono.just((Object)((AMQP.Exchange.DeclareOk)command.getMethod()))).publishOn(this.resourceManagementScheduler);
    }

    public Mono<AMQP.Exchange.DeleteOk> delete(ExchangeSpecification specification) {
        return this.delete(specification, false);
    }

    public Mono<AMQP.Exchange.DeleteOk> delete(ExchangeSpecification specification, ResourceManagementOptions options) {
        return this.delete(specification, false, options);
    }

    public Mono<AMQP.Exchange.DeleteOk> delete(ExchangeSpecification specification, boolean ifUnused) {
        return this.deleteExchange(specification, ifUnused);
    }

    public Mono<AMQP.Exchange.DeleteOk> delete(ExchangeSpecification specification, boolean ifUnused, ResourceManagementOptions options) {
        return this.deleteExchange(specification, ifUnused, options);
    }

    public Mono<AMQP.Exchange.DeleteOk> deleteExchange(ExchangeSpecification specification, boolean ifUnused) {
        return this.deleteExchange(specification, ifUnused, null);
    }

    public Mono<AMQP.Exchange.DeleteOk> deleteExchange(ExchangeSpecification specification, boolean ifUnused, ResourceManagementOptions options) {
        Mono<? extends Channel> channelMono = this.getChannelMonoForResourceManagement(options);
        AMQP.Exchange.Delete delete = new AMQP.Exchange.Delete.Builder().exchange(specification.getName()).ifUnused(ifUnused).build();
        return channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc((Method)delete);
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(future -> Mono.fromCompletionStage((CompletionStage)future)).flatMap(command -> Mono.just((Object)((AMQP.Exchange.DeleteOk)command.getMethod()))).publishOn(this.resourceManagementScheduler);
    }

    public Mono<AMQP.Queue.UnbindOk> unbind(BindingSpecification specification) {
        return this.unbind(specification, null);
    }

    public Mono<AMQP.Queue.UnbindOk> unbind(BindingSpecification specification, ResourceManagementOptions options) {
        Mono<? extends Channel> channelMono = this.getChannelMonoForResourceManagement(options);
        AMQP.Queue.Unbind unbinding = new AMQP.Queue.Unbind.Builder().exchange(specification.getExchange()).queue(specification.getQueue()).routingKey(specification.getRoutingKey()).arguments(specification.getArguments()).build();
        return channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc((Method)unbinding);
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(future -> Mono.fromCompletionStage((CompletionStage)future)).flatMap(command -> Mono.just((Object)((AMQP.Queue.UnbindOk)command.getMethod()))).publishOn(this.resourceManagementScheduler);
    }

    public Mono<AMQP.Queue.BindOk> bind(BindingSpecification specification) {
        return this.bind(specification, null);
    }

    public Mono<AMQP.Queue.BindOk> bind(BindingSpecification specification, ResourceManagementOptions options) {
        Mono<? extends Channel> channelMono = this.getChannelMonoForResourceManagement(options);
        AMQP.Queue.Bind binding = new AMQP.Queue.Bind.Builder().exchange(specification.getExchange()).queue(specification.getQueue()).routingKey(specification.getRoutingKey()).arguments(specification.getArguments()).build();
        return channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc((Method)binding);
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(future -> Mono.fromCompletionStage((CompletionStage)future)).flatMap(command -> Mono.just((Object)((AMQP.Queue.BindOk)command.getMethod()))).publishOn(this.resourceManagementScheduler);
    }

    @Override
    public void close() {
        if (this.hasConnection.getAndSet(false)) {
            try {
                ((Connection)this.connectionMono.block()).close();
            }
            catch (IOException e) {
                throw new RabbitFluxException(e);
            }
        }
        if (this.privateConnectionSubscriptionScheduler) {
            this.connectionSubscriptionScheduler.dispose();
        }
        if (this.privateResourceManagementScheduler) {
            this.resourceManagementScheduler.dispose();
        }
        this.channelCloseThreadPool.shutdown();
    }

    private static class ChannelProxyCreationFunction
    implements Function<Connection, Channel> {
        private ChannelProxyCreationFunction() {
        }

        @Override
        public Channel apply(Connection connection) {
            try {
                return new ChannelProxy(connection);
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error while creating channel", e);
            }
        }
    }

    private static class ChannelCreationFunction
    implements Function<Connection, Channel> {
        private ChannelCreationFunction() {
        }

        @Override
        public Channel apply(Connection connection) {
            try {
                return connection.createChannel();
            }
            catch (IOException e) {
                throw new RabbitFluxException("Error while creating channel", e);
            }
        }
    }

    private static class PublishConfirmSubscriber
    implements CoreSubscriber<OutboundMessage> {
        private final AtomicReference<SubscriberState> state = new AtomicReference<SubscriberState>(SubscriberState.INIT);
        private final AtomicReference<Throwable> firstException = new AtomicReference();
        private final ConcurrentNavigableMap<Long, OutboundMessage> unconfirmed = new ConcurrentSkipListMap<Long, OutboundMessage>();
        private final Channel channel;
        private final Subscriber<? super OutboundMessageResult> subscriber;
        private final BiConsumer<SendContext, Exception> exceptionHandler;

        private PublishConfirmSubscriber(Channel channel, Subscriber<? super OutboundMessageResult> subscriber, SendOptions options) {
            this.channel = channel;
            this.subscriber = subscriber;
            this.exceptionHandler = options.getExceptionHandler();
        }

        public void onSubscribe(Subscription subscription) {
            this.channel.addConfirmListener(new ConfirmListener(){

                public void handleAck(long deliveryTag, boolean multiple) {
                    this.handleAckNack(deliveryTag, multiple, true);
                }

                public void handleNack(long deliveryTag, boolean multiple) {
                    this.handleAckNack(deliveryTag, multiple, false);
                }

                private void handleAckNack(long deliveryTag, boolean multiple, boolean ack) {
                    if (multiple) {
                        try {
                            NavigableMap unconfirmedToSend = unconfirmed.headMap((Object)deliveryTag, true);
                            Iterator iterator = unconfirmedToSend.entrySet().iterator();
                            while (iterator.hasNext()) {
                                subscriber.onNext((Object)new OutboundMessageResult((OutboundMessage)iterator.next().getValue(), ack));
                                iterator.remove();
                            }
                        }
                        catch (Exception e) {
                            this.handleError(e, null);
                        }
                    } else {
                        OutboundMessage outboundMessage = (OutboundMessage)unconfirmed.get(deliveryTag);
                        try {
                            unconfirmed.remove(deliveryTag);
                            subscriber.onNext((Object)new OutboundMessageResult(outboundMessage, ack));
                        }
                        catch (Exception e) {
                            this.handleError(e, new OutboundMessageResult(outboundMessage, ack));
                        }
                    }
                    if (unconfirmed.size() == 0) {
                        this.maybeComplete();
                    }
                }
            });
            this.state.set(SubscriberState.ACTIVE);
            this.subscriber.onSubscribe(subscription);
        }

        public void onNext(OutboundMessage message) {
            if (this.checkComplete(message)) {
                return;
            }
            long nextPublishSeqNo = this.channel.getNextPublishSeqNo();
            try {
                this.unconfirmed.putIfAbsent(nextPublishSeqNo, message);
                this.channel.basicPublish(message.getExchange(), message.getRoutingKey(), message.getProperties(), message.getBody());
            }
            catch (Exception e) {
                this.unconfirmed.remove(nextPublishSeqNo);
                try {
                    this.exceptionHandler.accept(new ConfirmSendContext(this.channel, message, this), e);
                }
                catch (Exception innerException) {
                    this.handleError(innerException, new OutboundMessageResult(message, false));
                }
            }
        }

        public void onError(Throwable throwable) {
            if (this.state.compareAndSet(SubscriberState.ACTIVE, SubscriberState.COMPLETE) || this.state.compareAndSet(SubscriberState.OUTBOUND_DONE, SubscriberState.COMPLETE)) {
                this.subscriber.onError(throwable);
            } else if (this.firstException.compareAndSet(null, throwable) && this.state.get() == SubscriberState.COMPLETE) {
                Operators.onErrorDropped((Throwable)throwable, (Context)this.currentContext());
            }
        }

        public void onComplete() {
            if (this.state.compareAndSet(SubscriberState.ACTIVE, SubscriberState.OUTBOUND_DONE) && this.unconfirmed.size() == 0) {
                this.maybeComplete();
            }
        }

        private void handleError(Exception e, OutboundMessageResult result) {
            LOGGER.error("error in publish confirm sending", (Throwable)e);
            boolean complete = this.checkComplete(e);
            this.firstException.compareAndSet(null, e);
            if (!complete) {
                if (result != null) {
                    this.subscriber.onNext((Object)result);
                }
                this.onError(e);
            }
        }

        private void maybeComplete() {
            boolean done = this.state.compareAndSet(SubscriberState.OUTBOUND_DONE, SubscriberState.COMPLETE);
            if (done) {
                this.subscriber.onComplete();
            }
        }

        public <T> boolean checkComplete(T t) {
            boolean complete;
            boolean bl = complete = this.state.get() == SubscriberState.COMPLETE;
            if (complete && this.firstException.get() == null) {
                Operators.onNextDropped(t, (Context)this.currentContext());
            }
            return complete;
        }
    }

    private static class PublishConfirmOperator
    extends FluxOperator<OutboundMessage, OutboundMessageResult> {
        private final Channel channel;
        private final SendOptions options;

        public PublishConfirmOperator(Publisher<OutboundMessage> source, Channel channel, SendOptions options) {
            super(Flux.from(source));
            this.channel = channel;
            this.options = options;
        }

        public void subscribe(CoreSubscriber<? super OutboundMessageResult> actual) {
            this.source.subscribe((CoreSubscriber)new PublishConfirmSubscriber(this.channel, (Subscriber)actual, this.options));
        }
    }

    public static class ConfirmSendContext
    extends SendContext {
        private final PublishConfirmSubscriber subscriber;

        public ConfirmSendContext(Channel channel, OutboundMessage message, PublishConfirmSubscriber subscriber) {
            super(channel, message);
            this.subscriber = subscriber;
        }

        @Override
        public void publish(OutboundMessage outboundMessage) throws Exception {
            long nextPublishSeqNo = this.channel.getNextPublishSeqNo();
            try {
                this.subscriber.unconfirmed.putIfAbsent(nextPublishSeqNo, this.message);
                super.publish(outboundMessage);
            }
            catch (Exception e) {
                this.subscriber.unconfirmed.remove(nextPublishSeqNo);
                throw e;
            }
        }

        @Override
        public void publish() throws Exception {
            this.publish(this.getMessage());
        }
    }

    public static class SendContext {
        protected final Channel channel;
        protected final OutboundMessage message;

        public SendContext(Channel channel, OutboundMessage message) {
            this.channel = channel;
            this.message = message;
        }

        public OutboundMessage getMessage() {
            return this.message;
        }

        public Channel getChannel() {
            return this.channel;
        }

        public void publish(OutboundMessage outboundMessage) throws Exception {
            this.channel.basicPublish(outboundMessage.getExchange(), outboundMessage.getRoutingKey(), outboundMessage.getProperties(), outboundMessage.getBody());
        }

        public void publish() throws Exception {
            this.publish(this.getMessage());
        }
    }
}

