package org.apache.pulsar.client.impl;

import com.google.common.annotations.VisibleForTesting;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.dns.DnsNameResolver;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.MathUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.ChannelFutures;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-2.8.0-rc-202106062208.jar:org/apache/pulsar/client/impl/ConnectionPool.class */
public class ConnectionPool implements Closeable {
    protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
    private final Bootstrap bootstrap;
    private final PulsarChannelInitializer channelInitializerHandler;
    private final ClientConfigurationData clientConfig;
    private final EventLoopGroup eventLoopGroup;
    private final int maxConnectionsPerHosts;
    private final boolean isSniProxy;
    protected final DnsNameResolver dnsResolver;
    private static final Random random = new Random();
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ConnectionPool.class);

    public ConnectionPool(ClientConfigurationData clientConfigurationData, EventLoopGroup eventLoopGroup) throws PulsarClientException {
        this(clientConfigurationData, eventLoopGroup, () -> {
            return new ClientCnx(clientConfigurationData, eventLoopGroup);
        });
    }

    public ConnectionPool(ClientConfigurationData clientConfigurationData, EventLoopGroup eventLoopGroup, Supplier<ClientCnx> supplier) throws PulsarClientException {
        this.eventLoopGroup = eventLoopGroup;
        this.clientConfig = clientConfigurationData;
        this.maxConnectionsPerHosts = clientConfigurationData.getConnectionsPerBroker();
        this.isSniProxy = this.clientConfig.isUseTls() && this.clientConfig.getProxyProtocol() != null && StringUtils.isNotBlank(this.clientConfig.getProxyServiceUrl());
        this.pool = new ConcurrentHashMap<>();
        this.bootstrap = new Bootstrap();
        this.bootstrap.group(eventLoopGroup);
        this.bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
        this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(clientConfigurationData.getConnectionTimeoutMs()));
        this.bootstrap.option(ChannelOption.TCP_NODELAY, Boolean.valueOf(clientConfigurationData.isUseTcpNoDelay()));
        this.bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
        try {
            this.channelInitializerHandler = new PulsarChannelInitializer(clientConfigurationData, supplier);
            this.bootstrap.handler(this.channelInitializerHandler);
            this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).traceEnabled(true).channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)).build();
        } catch (Exception e) {
            log.error("Failed to create channel initializer");
            throw new PulsarClientException(e);
        }
    }

    public CompletableFuture<ClientCnx> getConnection(InetSocketAddress inetSocketAddress) {
        return getConnection(inetSocketAddress, inetSocketAddress);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void closeAllConnections() {
        this.pool.values().forEach(concurrentMap -> {
            concurrentMap.values().forEach(completableFuture -> {
                if (!completableFuture.isDone()) {
                    completableFuture.thenAccept((v0) -> {
                        v0.close();
                    });
                } else {
                    if (completableFuture.isCompletedExceptionally()) {
                        return;
                    }
                    ((ClientCnx) completableFuture.join()).close();
                }
            });
        });
    }

    public CompletableFuture<ClientCnx> getConnection(InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2) {
        if (this.maxConnectionsPerHosts == 0) {
            return createConnection(inetSocketAddress, inetSocketAddress2, -1);
        }
        int signSafeMod = MathUtils.signSafeMod(random.nextInt(), this.maxConnectionsPerHosts);
        return this.pool.computeIfAbsent(inetSocketAddress, inetSocketAddress3 -> {
            return new ConcurrentHashMap();
        }).computeIfAbsent(Integer.valueOf(signSafeMod), num -> {
            return createConnection(inetSocketAddress, inetSocketAddress2, signSafeMod);
        });
    }

    private CompletableFuture<ClientCnx> createConnection(InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2, int i) {
        if (log.isDebugEnabled()) {
            log.debug("Connection for {} not found in cache", inetSocketAddress);
        }
        CompletableFuture<ClientCnx> completableFuture = new CompletableFuture<>();
        createConnection(inetSocketAddress2).thenAccept(channel -> {
            log.info("[{}] Connected to server", channel);
            channel.closeFuture().addListener2(future -> {
                if (log.isDebugEnabled()) {
                    log.debug("Removing closed connection from pool: {}", future);
                }
                cleanupConnection(inetSocketAddress, i, completableFuture);
            });
            ClientCnx clientCnx = (ClientCnx) channel.pipeline().get("handler");
            if (!channel.isActive() || clientCnx == null) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Connection was already closed by the time we got notified", channel);
                }
                completableFuture.completeExceptionally(new ChannelException("Connection already closed"));
            } else {
                if (!inetSocketAddress.equals(inetSocketAddress2)) {
                    clientCnx.setTargetBroker(inetSocketAddress);
                }
                clientCnx.setRemoteHostName(inetSocketAddress2.getHostName());
                clientCnx.connectionFuture().thenRun(() -> {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Connection handshake completed", clientCnx.channel());
                    }
                    completableFuture.complete(clientCnx);
                }).exceptionally(th -> {
                    log.warn("[{}] Connection handshake failed: {}", clientCnx.channel(), th.getMessage());
                    completableFuture.completeExceptionally(th);
                    cleanupConnection(inetSocketAddress, i, completableFuture);
                    clientCnx.ctx().close();
                    return null;
                });
            }
        }).exceptionally(th -> {
            this.eventLoopGroup.execute(() -> {
                log.warn("Failed to open connection to {} : {}", inetSocketAddress2, th.getMessage());
                cleanupConnection(inetSocketAddress, i, completableFuture);
                completableFuture.completeExceptionally(new PulsarClientException(th));
            });
            return null;
        });
        return completableFuture;
    }

    private CompletableFuture<Channel> createConnection(InetSocketAddress inetSocketAddress) {
        int port;
        CompletableFuture<List<InetAddress>> resolveName;
        try {
            if (this.isSniProxy) {
                URI uri = new URI(this.clientConfig.getProxyServiceUrl());
                port = uri.getPort();
                resolveName = resolveName(uri.getHost());
            } else {
                port = inetSocketAddress.getPort();
                resolveName = resolveName(inetSocketAddress.getHostString());
            }
            int i = port;
            return resolveName.thenCompose(list -> {
                return connectToResolvedAddresses(list.iterator(), i, this.isSniProxy ? inetSocketAddress : null);
            });
        } catch (URISyntaxException e) {
            log.error("Invalid Proxy url {}", this.clientConfig.getProxyServiceUrl(), e);
            return FutureUtil.failedFuture(new PulsarClientException.InvalidServiceURL("Invalid url " + this.clientConfig.getProxyServiceUrl(), e));
        }
    }

    private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> it, int i, InetSocketAddress inetSocketAddress) {
        CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
        CompletableFuture<Channel> connectToAddress = connectToAddress(it.next(), i, inetSocketAddress);
        completableFuture.getClass();
        connectToAddress.thenAccept((v1) -> {
            r1.complete(v1);
        }).exceptionally(th -> {
            if (!it.hasNext()) {
                completableFuture.completeExceptionally(th);
                return null;
            }
            CompletableFuture<Channel> connectToResolvedAddresses = connectToResolvedAddresses(it, i, inetSocketAddress);
            completableFuture.getClass();
            connectToResolvedAddresses.thenAccept((v1) -> {
                r1.complete(v1);
            }).exceptionally(th -> {
                completableFuture.completeExceptionally(th);
                return null;
            });
            return null;
        });
        return completableFuture;
    }

    @VisibleForTesting
    CompletableFuture<List<InetAddress>> resolveName(String str) {
        CompletableFuture<List<InetAddress>> completableFuture = new CompletableFuture<>();
        this.dnsResolver.resolveAll(str).addListener2(future -> {
            if (future.isSuccess()) {
                completableFuture.complete(future.get());
            } else {
                completableFuture.completeExceptionally(future.cause());
            }
        });
        return completableFuture;
    }

    private CompletableFuture<Channel> connectToAddress(InetAddress inetAddress, int i, InetSocketAddress inetSocketAddress) {
        InetSocketAddress inetSocketAddress2 = new InetSocketAddress(inetAddress, i);
        return this.clientConfig.isUseTls() ? ChannelFutures.toCompletableFuture(this.bootstrap.register()).thenCompose(channel -> {
            return this.channelInitializerHandler.initTls(channel, inetSocketAddress != null ? inetSocketAddress : inetSocketAddress2);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) channel2 -> {
            return ChannelFutures.toCompletableFuture(channel2.connect(inetSocketAddress2));
        }) : ChannelFutures.toCompletableFuture(this.bootstrap.connect(inetSocketAddress2));
    }

    public void releaseConnection(ClientCnx clientCnx) {
        if (this.maxConnectionsPerHosts == 0 && clientCnx.channel().isActive()) {
            if (log.isDebugEnabled()) {
                log.debug("close connection due to pooling disabled.");
            }
            clientCnx.close();
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            if (!this.eventLoopGroup.isShutdown()) {
                this.eventLoopGroup.shutdownGracefully(0L, 10L, TimeUnit.SECONDS).await2();
            }
        } catch (InterruptedException e) {
            log.warn("EventLoopGroup shutdown was interrupted", (Throwable) e);
        }
        this.dnsResolver.close();
    }

    private void cleanupConnection(InetSocketAddress inetSocketAddress, int i, CompletableFuture<ClientCnx> completableFuture) {
        ConcurrentMap<Integer, CompletableFuture<ClientCnx>> concurrentMap = this.pool.get(inetSocketAddress);
        if (concurrentMap != null) {
            concurrentMap.remove(Integer.valueOf(i), completableFuture);
        }
    }

    @VisibleForTesting
    int getPoolSize() {
        return this.pool.values().stream().mapToInt((v0) -> {
            return v0.size();
        }).sum();
    }
}
