package org.apache.flink.queryablestate.network;

import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.queryablestate.FutureUtils;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/queryablestate/network/Client.class */
public class Client<REQ extends MessageBody, RESP extends MessageBody> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) Client.class);
    private final String clientName;
    private final Bootstrap bootstrap;
    private final MessageSerializer<REQ, RESP> messageSerializer;
    private final KvStateRequestStats stats;
    private final Map<InetSocketAddress, Client<REQ, RESP>.EstablishedConnection> establishedConnections = new ConcurrentHashMap();
    private final Map<InetSocketAddress, Client<REQ, RESP>.PendingConnection> pendingConnections = new ConcurrentHashMap();
    private final AtomicReference<CompletableFuture<Void>> clientShutdownFuture = new AtomicReference<>(null);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/queryablestate/network/Client$EstablishedConnection.class */
    public class EstablishedConnection implements ClientHandlerCallback<RESP> {
        private final InetSocketAddress serverAddress;
        private final Channel channel;
        private final ConcurrentHashMap<Long, Client<REQ, RESP>.EstablishedConnection.TimestampedCompletableFuture> pendingRequests = new ConcurrentHashMap<>();
        private final AtomicLong requestCount = new AtomicLong();
        private final AtomicReference<CompletableFuture<Void>> connectionShutdownFuture = new AtomicReference<>(null);

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/flink/queryablestate/network/Client$EstablishedConnection$TimestampedCompletableFuture.class */
        public class TimestampedCompletableFuture extends CompletableFuture<RESP> {
            private final long timestampInNanos;

            TimestampedCompletableFuture(long j) {
                this.timestampInNanos = j;
            }

            public long getTimestamp() {
                return this.timestampInNanos;
            }
        }

        EstablishedConnection(InetSocketAddress inetSocketAddress, MessageSerializer<REQ, RESP> messageSerializer, Channel channel) {
            this.serverAddress = (InetSocketAddress) Preconditions.checkNotNull(inetSocketAddress);
            this.channel = (Channel) Preconditions.checkNotNull(channel);
            channel.pipeline().addLast(Client.this.getClientName() + " Handler", new ClientHandler(Client.this.clientName, messageSerializer, this));
            Client.this.stats.reportActiveConnection();
        }

        CompletableFuture<Void> close() {
            return close(new ClosedChannelException());
        }

        private CompletableFuture<Void> close(Throwable th) {
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            if (this.connectionShutdownFuture.compareAndSet(null, completableFuture)) {
                this.channel.close().addListener2(future -> {
                    Client.this.stats.reportInactiveConnection();
                    Iterator it2 = this.pendingRequests.keySet().iterator();
                    while (it2.hasNext()) {
                        Client<REQ, RESP>.EstablishedConnection.TimestampedCompletableFuture remove = this.pendingRequests.remove(Long.valueOf(((Long) it2.next()).longValue()));
                        if (remove != null && remove.completeExceptionally(th)) {
                            Client.this.stats.reportFailedRequest();
                        }
                    }
                    if (future.isSuccess()) {
                        completableFuture.completeExceptionally(th);
                    } else {
                        Client.LOG.warn("Something went wrong when trying to close connection due to : ", th);
                        completableFuture.completeExceptionally(future.cause());
                    }
                });
            }
            return this.connectionShutdownFuture.get();
        }

        CompletableFuture<RESP> sendRequest(REQ req) {
            Client<REQ, RESP>.EstablishedConnection.TimestampedCompletableFuture remove;
            Client<REQ, RESP>.EstablishedConnection.TimestampedCompletableFuture timestampedCompletableFuture = new TimestampedCompletableFuture(System.nanoTime());
            try {
                long andIncrement = this.requestCount.getAndIncrement();
                this.pendingRequests.put(Long.valueOf(andIncrement), timestampedCompletableFuture);
                Client.this.stats.reportRequest();
                this.channel.writeAndFlush(MessageSerializer.serializeRequest(this.channel.alloc(), andIncrement, req)).addListener2((GenericFutureListener<? extends Future<? super Void>>) channelFuture -> {
                    Client<REQ, RESP>.EstablishedConnection.TimestampedCompletableFuture remove2;
                    if (channelFuture.isSuccess() || (remove2 = this.pendingRequests.remove(Long.valueOf(andIncrement))) == null || !remove2.completeExceptionally(channelFuture.cause())) {
                        return;
                    }
                    Client.this.stats.reportFailedRequest();
                });
                CompletableFuture completableFuture = (CompletableFuture) Client.this.clientShutdownFuture.get();
                if (completableFuture != null && (remove = this.pendingRequests.remove(Long.valueOf(andIncrement))) != null) {
                    completableFuture.whenComplete((r6, th) -> {
                        if (th == null || !remove.completeExceptionally(th)) {
                            remove.completeExceptionally(new ClosedChannelException());
                        } else {
                            Client.this.stats.reportFailedRequest();
                        }
                    });
                }
            } catch (Throwable th2) {
                timestampedCompletableFuture.completeExceptionally(th2);
            }
            return timestampedCompletableFuture;
        }

        @Override // org.apache.flink.queryablestate.network.ClientHandlerCallback
        public void onRequestResult(long j, RESP resp) {
            Client<REQ, RESP>.EstablishedConnection.TimestampedCompletableFuture remove = this.pendingRequests.remove(Long.valueOf(j));
            if (remove == null || remove.isDone()) {
                return;
            }
            Client.this.stats.reportSuccessfulRequest((System.nanoTime() - remove.getTimestamp()) / 1000000);
            remove.complete(resp);
        }

        @Override // org.apache.flink.queryablestate.network.ClientHandlerCallback
        public void onRequestFailure(long j, Throwable th) {
            Client<REQ, RESP>.EstablishedConnection.TimestampedCompletableFuture remove = this.pendingRequests.remove(Long.valueOf(j));
            if (remove == null || remove.isDone()) {
                return;
            }
            Client.this.stats.reportFailedRequest();
            remove.completeExceptionally(th);
        }

        @Override // org.apache.flink.queryablestate.network.ClientHandlerCallback
        public void onFailure(Throwable th) {
            close(th).handle((r5, th2) -> {
                return Boolean.valueOf(Client.this.establishedConnections.remove(this.serverAddress, this));
            });
        }

        public String toString() {
            return "EstablishedConnection{serverAddress=" + this.serverAddress + ", channel=" + this.channel + ", pendingRequests=" + this.pendingRequests.size() + ", requestCount=" + this.requestCount + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/queryablestate/network/Client$PendingConnection.class */
    public class PendingConnection implements ChannelFutureListener {
        private final Object connectLock;
        private final InetSocketAddress serverAddress;
        private final MessageSerializer<REQ, RESP> serializer;
        private final ArrayDeque<Client<REQ, RESP>.PendingConnection.PendingRequest> queuedRequests;
        private Client<REQ, RESP>.EstablishedConnection established;
        private final AtomicReference<CompletableFuture<Void>> connectionShutdownFuture;
        private Throwable failureCause;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/flink/queryablestate/network/Client$PendingConnection$PendingRequest.class */
        public final class PendingRequest extends CompletableFuture<RESP> {
            private final REQ request;

            private PendingRequest(REQ req) {
                this.request = req;
            }
        }

        private PendingConnection(InetSocketAddress inetSocketAddress, MessageSerializer<REQ, RESP> messageSerializer) {
            this.connectLock = new Object();
            this.queuedRequests = new ArrayDeque<>();
            this.connectionShutdownFuture = new AtomicReference<>(null);
            this.serverAddress = inetSocketAddress;
            this.serializer = messageSerializer;
        }

        @Override // org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                handInChannel(channelFuture.channel());
            } else {
                close(channelFuture.cause());
            }
        }

        CompletableFuture<RESP> sendRequest(REQ req) {
            synchronized (this.connectLock) {
                if (this.failureCause != null) {
                    return FutureUtils.getFailedFuture(this.failureCause);
                }
                if (this.connectionShutdownFuture.get() != null) {
                    return FutureUtils.getFailedFuture(new ClosedChannelException());
                }
                if (this.established != null) {
                    return (CompletableFuture<RESP>) this.established.sendRequest(req);
                }
                Client<REQ, RESP>.PendingConnection.PendingRequest pendingRequest = new PendingRequest(req);
                this.queuedRequests.add(pendingRequest);
                return pendingRequest;
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        private void handInChannel(Channel channel) {
            synchronized (this.connectLock) {
                if (this.connectionShutdownFuture.get() == null && this.failureCause == null) {
                    this.established = new EstablishedConnection(this.serverAddress, this.serializer, channel);
                    while (!this.queuedRequests.isEmpty()) {
                        Client<REQ, RESP>.PendingConnection.PendingRequest poll = this.queuedRequests.poll();
                        this.established.sendRequest(((PendingRequest) poll).request).whenComplete((messageBody, th) -> {
                            if (th != null) {
                                poll.completeExceptionally(th);
                            } else {
                                poll.complete(messageBody);
                            }
                        });
                    }
                    Client.this.establishedConnections.put(this.serverAddress, this.established);
                    Client.this.pendingConnections.remove(this.serverAddress);
                    if (Client.this.clientShutdownFuture.get() != null && Client.this.establishedConnections.remove(this.serverAddress, this.established)) {
                        this.established.close();
                    }
                } else {
                    channel.close();
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public CompletableFuture<Void> close() {
            return close(new ClosedChannelException());
        }

        private CompletableFuture<Void> close(Throwable th) {
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            if (this.connectionShutdownFuture.compareAndSet(null, completableFuture)) {
                synchronized (this.connectLock) {
                    if (this.failureCause == null) {
                        this.failureCause = th;
                    }
                    if (this.established != null) {
                        this.established.close().whenComplete((r4, th2) -> {
                            if (th2 != null) {
                                completableFuture.completeExceptionally(th2);
                            } else {
                                completableFuture.complete(null);
                            }
                        });
                    } else {
                        while (true) {
                            Client<REQ, RESP>.PendingConnection.PendingRequest poll = this.queuedRequests.poll();
                            if (poll == null) {
                                break;
                            }
                            poll.completeExceptionally(th);
                        }
                        completableFuture.complete(null);
                    }
                }
            }
            return this.connectionShutdownFuture.get();
        }

        public String toString() {
            String str;
            synchronized (this.connectLock) {
                str = "PendingConnection{serverAddress=" + this.serverAddress + ", queuedRequests=" + this.queuedRequests.size() + ", established=" + (this.established != null) + ", closed=" + (this.connectionShutdownFuture.get() != null) + '}';
            }
            return str;
        }
    }

    public Client(String str, int i, MessageSerializer<REQ, RESP> messageSerializer, KvStateRequestStats kvStateRequestStats) {
        Preconditions.checkArgument(i >= 1, "Non-positive number of event loop threads.");
        this.clientName = (String) Preconditions.checkNotNull(str);
        this.messageSerializer = (MessageSerializer) Preconditions.checkNotNull(messageSerializer);
        this.stats = (KvStateRequestStats) Preconditions.checkNotNull(kvStateRequestStats);
        this.bootstrap = new Bootstrap().group(new NioEventLoopGroup(i, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flink " + str + " Event Loop Thread %d").build())).channel(NioSocketChannel.class).option(ChannelOption.ALLOCATOR, new NettyBufferPool(i)).handler(new ChannelInitializer<SocketChannel>() { // from class: org.apache.flink.queryablestate.network.Client.1
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer
            public void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)).addLast(new ChunkedWriteHandler());
            }
        });
    }

    public String getClientName() {
        return this.clientName;
    }

    public CompletableFuture<RESP> sendRequest(InetSocketAddress inetSocketAddress, REQ req) {
        if (this.clientShutdownFuture.get() != null) {
            return FutureUtils.getFailedFuture(new IllegalStateException(this.clientName + " is already shut down."));
        }
        Client<REQ, RESP>.EstablishedConnection establishedConnection = this.establishedConnections.get(inetSocketAddress);
        if (establishedConnection != null) {
            return (CompletableFuture<RESP>) establishedConnection.sendRequest(req);
        }
        Client<REQ, RESP>.PendingConnection pendingConnection = this.pendingConnections.get(inetSocketAddress);
        if (pendingConnection != null) {
            return (CompletableFuture<RESP>) pendingConnection.sendRequest(req);
        }
        Client<REQ, RESP>.PendingConnection pendingConnection2 = new PendingConnection(inetSocketAddress, this.messageSerializer);
        Client<REQ, RESP>.PendingConnection putIfAbsent = this.pendingConnections.putIfAbsent(inetSocketAddress, pendingConnection2);
        if (putIfAbsent != null) {
            return (CompletableFuture<RESP>) putIfAbsent.sendRequest(req);
        }
        this.bootstrap.connect(inetSocketAddress.getAddress(), inetSocketAddress.getPort()).addListener2((GenericFutureListener<? extends Future<? super Void>>) pendingConnection2);
        return pendingConnection2.sendRequest(req);
    }

    public CompletableFuture<Void> shutdown() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        if (!this.clientShutdownFuture.compareAndSet(null, completableFuture)) {
            return this.clientShutdownFuture.get();
        }
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<InetSocketAddress, Client<REQ, RESP>.EstablishedConnection> entry : this.establishedConnections.entrySet()) {
            if (this.establishedConnections.remove(entry.getKey(), entry.getValue())) {
                arrayList.add(entry.getValue().close());
            }
        }
        for (Map.Entry<InetSocketAddress, Client<REQ, RESP>.PendingConnection> entry2 : this.pendingConnections.entrySet()) {
            if (this.pendingConnections.remove(entry2.getKey()) != null) {
                arrayList.add(entry2.getValue().close());
            }
        }
        CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[arrayList.size()])).whenComplete((r9, th) -> {
            if (th != null) {
                LOG.warn("Problem while shutting down the connections at the {}: {}", this.clientName, th);
            }
            if (this.bootstrap == null) {
                completableFuture.complete(null);
                return;
            }
            EventLoopGroup group = this.bootstrap.group();
            if (group == null || group.isShutdown()) {
                completableFuture.complete(null);
            } else {
                group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS).addListener2(future -> {
                    if (future.isSuccess()) {
                        completableFuture.complete(null);
                    } else {
                        completableFuture.completeExceptionally(future.cause());
                    }
                });
            }
        });
        return completableFuture;
    }

    @VisibleForTesting
    public boolean isEventGroupShutdown() {
        return this.bootstrap == null || this.bootstrap.group().isTerminated();
    }
}
