/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.catchup;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.SocketAddress;
import java.time.Clock;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.neo4j.causalclustering.catchup.CatchUpChannelPool;
import org.neo4j.causalclustering.catchup.CatchUpClientChannelPipeline;
import org.neo4j.causalclustering.catchup.CatchUpClientException;
import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor;
import org.neo4j.causalclustering.catchup.CatchUpResponseCallback;
import org.neo4j.causalclustering.catchup.TimeoutLoop;
import org.neo4j.causalclustering.catchup.TrackingResponseHandler;
import org.neo4j.causalclustering.discovery.CoreAddresses;
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.CatchUpRequest;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

public class CatchUpClient
extends LifecycleAdapter {
    private final LogProvider logProvider;
    private final TopologyService discoveryService;
    private final Log log;
    private final Clock clock;
    private final Monitors monitors;
    private final long inactivityTimeoutMillis;
    private final CatchUpChannelPool<CatchUpChannel> pool = new CatchUpChannelPool<CatchUpChannel>(x$0 -> new CatchUpChannel((AdvertisedSocketAddress)x$0));
    private NioEventLoopGroup eventLoopGroup;

    public CatchUpClient(TopologyService discoveryService, LogProvider logProvider, Clock clock, long inactivityTimeoutMillis, Monitors monitors) {
        this.logProvider = logProvider;
        this.discoveryService = discoveryService;
        this.log = logProvider.getLog(((Object)((Object)this)).getClass());
        this.clock = clock;
        this.inactivityTimeoutMillis = inactivityTimeoutMillis;
        this.monitors = monitors;
    }

    public <T> T makeBlockingRequest(MemberId target, CatchUpRequest request, CatchUpResponseCallback<T> responseHandler) throws CatchUpClientException {
        CompletableFuture future = new CompletableFuture();
        Optional<AdvertisedSocketAddress> catchUpAddress = this.discoveryService.coreServers().find(target).map(CoreAddresses::getCatchupServer);
        CatchUpChannel channel = this.pool.acquire(catchUpAddress.orElseThrow(() -> new CatchUpClientException("Cannot find the target member socket address")));
        future.whenComplete((result, e) -> {
            if (e == null) {
                this.pool.release(channel);
            } else {
                this.pool.dispose(channel);
            }
        });
        channel.setResponseHandler(responseHandler, future);
        channel.send(request);
        String operation = String.format("Timed out executing operation %s on %s (%s)", request, target, catchUpAddress.get());
        return TimeoutLoop.waitForCompletion(future, operation, channel::millisSinceLastResponse, this.inactivityTimeoutMillis, this.log);
    }

    public void start() {
        this.eventLoopGroup = new NioEventLoopGroup(0, (ThreadFactory)new NamedThreadFactory("catch-up-client"));
    }

    public void stop() throws Throwable {
        this.log.info("CatchUpClient stopping");
        try {
            this.pool.close();
            this.eventLoopGroup.shutdownGracefully(0L, 0L, TimeUnit.MICROSECONDS).sync();
        }
        catch (InterruptedException e) {
            this.log.warn("Interrupted while stopping catch up client.");
        }
    }

    private class CatchUpChannel
    implements CatchUpChannelPool.Channel {
        private final TrackingResponseHandler handler;
        private final AdvertisedSocketAddress destination;
        private Channel nettyChannel;

        CatchUpChannel(AdvertisedSocketAddress destination) {
            this.destination = destination;
            this.handler = new TrackingResponseHandler(new CatchUpResponseAdaptor(), CatchUpClient.this.clock);
            Bootstrap bootstrap = (Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().group((EventLoopGroup)CatchUpClient.this.eventLoopGroup)).channel(NioSocketChannel.class)).handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

                protected void initChannel(SocketChannel ch) throws Exception {
                    CatchUpClientChannelPipeline.initChannel(ch, CatchUpChannel.this.handler, CatchUpClient.this.logProvider, CatchUpClient.this.monitors);
                }
            });
            ChannelFuture channelFuture = bootstrap.connect((SocketAddress)destination.socketAddress());
            this.nettyChannel = channelFuture.awaitUninterruptibly().channel();
        }

        void setResponseHandler(CatchUpResponseCallback responseHandler, CompletableFuture<?> requestOutcomeSignal) {
            this.handler.setResponseHandler(responseHandler, requestOutcomeSignal);
        }

        void send(CatchUpRequest request) {
            this.nettyChannel.write((Object)request.messageType());
            this.nettyChannel.writeAndFlush((Object)request);
        }

        long millisSinceLastResponse() {
            return CatchUpClient.this.clock.millis() - this.handler.lastResponseTime();
        }

        @Override
        public AdvertisedSocketAddress destination() {
            return this.destination;
        }

        @Override
        public void close() {
            this.nettyChannel.close();
        }
    }
}

