/*
 * Decompiled with CFR 0.152.
 */
package reactor.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.ChannelCloseHandlers;
import reactor.rabbitmq.ChannelPool;
import reactor.rabbitmq.ChannelPoolOptions;
import reactor.rabbitmq.RabbitFluxException;

/**
 * {@link ChannelPool} that creates channels on demand and keeps a bounded
 * number of them in a queue for reuse.
 *
 * <p>A channel is taken from the queue when one is available, otherwise a new
 * one is created from the connection. The close handler puts open channels
 * back into the queue on {@link SignalType#ON_COMPLETE}; when the queue is
 * full, or for any other signal, the channel is passed to
 * {@link ChannelCloseHandlers#SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE}.
 */
class LazyChannelPool
implements ChannelPool {
    /** Queue capacity used when the options do not specify a maximum cache size. */
    private static final int DEFAULT_CHANNEL_POOL_SIZE = 5;
    private final Mono<? extends Connection> connectionMono;
    private final BlockingQueue<Channel> channelsQueue;
    private final Scheduler subscriptionScheduler;

    /**
     * Creates the pool.
     *
     * @param connectionMono     source of the connection used to create new channels
     * @param channelPoolOptions options; a {@code null} max cache size falls back to
     *                           {@link #DEFAULT_CHANNEL_POOL_SIZE}, a {@code null}
     *                           subscription scheduler falls back to a new elastic
     *                           scheduler named {@code sender-channel-pool}
     */
    LazyChannelPool(Mono<? extends Connection> connectionMono, ChannelPoolOptions channelPoolOptions) {
        int channelsQueueCapacity = channelPoolOptions.getMaxCacheSize() == null
                ? DEFAULT_CHANNEL_POOL_SIZE
                : channelPoolOptions.getMaxCacheSize();
        this.channelsQueue = new LinkedBlockingQueue<>(channelsQueueCapacity);
        this.connectionMono = connectionMono;
        Scheduler configuredScheduler = channelPoolOptions.getSubscriptionScheduler();
        this.subscriptionScheduler = configuredScheduler == null
                ? Schedulers.newElastic("sender-channel-pool")
                : configuredScheduler;
    }

    /**
     * Returns a {@link Mono} that, on subscription, emits an open pooled channel
     * if one is available, or a newly created channel otherwise. The subscription
     * runs on the pool's subscription scheduler.
     *
     * @return a mono emitting the channel to use
     */
    @Override
    public Mono<? extends Channel> getChannelMono() {
        return this.connectionMono.map(connection -> {
            Channel pooled = this.pollOpenChannel();
            return pooled != null ? pooled : this.createChannel(connection);
        }).subscribeOn(this.subscriptionScheduler);
    }

    /**
     * Returns the handler to invoke when a borrower is done with a channel.
     *
     * <p>Channels that are no longer open are ignored. An open channel is
     * offered back to the queue only on {@link SignalType#ON_COMPLETE}; if the
     * signal is anything else, or the queue is full, the channel is handed to
     * the default sender close handler instead.
     *
     * @return the close handler
     */
    @Override
    public BiConsumer<SignalType, Channel> getChannelCloseHandler() {
        return (signalType, channel) -> {
            if (!channel.isOpen()) {
                return;
            }
            // Short-circuit: offer() is only attempted for ON_COMPLETE.
            boolean returnedToPool = signalType == SignalType.ON_COMPLETE && this.channelsQueue.offer(channel);
            if (!returnedToPool) {
                ChannelCloseHandlers.SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE.accept(signalType, channel);
            }
        };
    }

    /**
     * Drains every pooled channel and passes each one to the default sender
     * close handler with {@link SignalType#ON_COMPLETE}.
     */
    @Override
    public void close() {
        // NOTE(review): the scheduler created by the constructor when none is
        // configured is not disposed here - confirm whether that is intended.
        ArrayList<Channel> channels = new ArrayList<>();
        this.channelsQueue.drainTo(channels);
        for (Channel channel : channels) {
            ChannelCloseHandlers.SENDER_CHANNEL_CLOSE_HANDLER_INSTANCE.accept(SignalType.ON_COMPLETE, channel);
        }
    }

    /**
     * Polls the queue until it yields an open channel or is empty. Channels that
     * were closed while sitting in the queue are dropped so they are never
     * handed to a borrower.
     *
     * @return an open pooled channel, or {@code null} if none is available
     */
    private Channel pollOpenChannel() {
        Channel candidate;
        while ((candidate = this.channelsQueue.poll()) != null) {
            if (candidate.isOpen()) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Creates a channel on the given connection.
     *
     * @param connection the connection to create the channel on
     * @return the channel returned by {@link Connection#createChannel()}
     * @throws RabbitFluxException wrapping the {@link IOException} if creation fails
     */
    private Channel createChannel(Connection connection) {
        try {
            return connection.createChannel();
        }
        catch (IOException e) {
            throw new RabbitFluxException("Error while creating channel", e);
        }
    }
}

