/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.protocol.handshake;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.neo4j.causalclustering.helper.ExponentialBackoffStrategy;
import org.neo4j.causalclustering.helper.TimeoutStrategy;
import org.neo4j.causalclustering.messaging.ReconnectingChannel;
import org.neo4j.causalclustering.messaging.SimpleNettyChannel;
import org.neo4j.causalclustering.protocol.ClientNettyPipelineBuilder;
import org.neo4j.causalclustering.protocol.NettyPipelineBuilderFactory;
import org.neo4j.causalclustering.protocol.ProtocolInstaller;
import org.neo4j.causalclustering.protocol.ProtocolInstallerRepository;
import org.neo4j.causalclustering.protocol.handshake.ApplicationProtocolRepository;
import org.neo4j.causalclustering.protocol.handshake.ClientMessageDecoder;
import org.neo4j.causalclustering.protocol.handshake.ClientMessageEncoder;
import org.neo4j.causalclustering.protocol.handshake.GateEvent;
import org.neo4j.causalclustering.protocol.handshake.HandshakeClient;
import org.neo4j.causalclustering.protocol.handshake.ModifierProtocolRepository;
import org.neo4j.causalclustering.protocol.handshake.NettyHandshakeClient;
import org.neo4j.causalclustering.protocol.handshake.ProtocolStack;
import org.neo4j.causalclustering.protocol.handshake.ServerMessage;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/**
 * Channel initializer for the client side of the protocol handshake.
 *
 * <p>For every new {@link SocketChannel} it installs the handshake encoder, decoder and
 * client handler, then schedules two tasks on the channel's event loop: one that starts the
 * handshake once the channel is active (re-scheduling itself with a growing delay while the
 * channel is merely open), and one that fails the handshake if it has not completed within
 * the configured timeout. When the handshake completes, the negotiated {@link ProtocolStack}
 * is installed on the channel; on any failure the channel is closed.
 */
public class HandshakeClientInitializer extends ChannelInitializer<SocketChannel> {
    private final ApplicationProtocolRepository applicationProtocolRepository;
    private final ModifierProtocolRepository modifierProtocolRepository;
    private final Duration timeout;
    private final ProtocolInstallerRepository<ProtocolInstaller.Orientation.Client> protocolInstaller;
    private final NettyPipelineBuilderFactory pipelineBuilderFactory;
    private final TimeoutStrategy handshakeDelay;
    private final Log debugLog;
    private final Log userLog;

    /**
     * Creates an initializer.
     *
     * @param applicationProtocolRepository application protocols handed to the handshake client
     * @param modifierProtocolRepository modifier protocols handed to the handshake client
     * @param protocolInstallerRepository source of the installer for the negotiated protocol stack
     * @param pipelineBuilderFactory factory for the client pipeline builder
     * @param handshakeTimeout time after which an unfinished handshake is failed
     * @param debugLogProvider provider of the debug log for this class
     * @param userLogProvider provider of the user log for this class
     */
    public HandshakeClientInitializer(ApplicationProtocolRepository applicationProtocolRepository, ModifierProtocolRepository modifierProtocolRepository, ProtocolInstallerRepository<ProtocolInstaller.Orientation.Client> protocolInstallerRepository, NettyPipelineBuilderFactory pipelineBuilderFactory, Duration handshakeTimeout, LogProvider debugLogProvider, LogProvider userLogProvider) {
        Class<?> loggingClass = getClass();
        this.debugLog = debugLogProvider.getLog(loggingClass);
        this.userLog = userLogProvider.getLog(loggingClass);
        this.applicationProtocolRepository = applicationProtocolRepository;
        this.modifierProtocolRepository = modifierProtocolRepository;
        this.protocolInstaller = protocolInstallerRepository;
        this.pipelineBuilderFactory = pipelineBuilderFactory;
        this.timeout = handshakeTimeout;
        // NOTE(review): presumably (initial delay, upper bound, unit) - confirm against ExponentialBackoffStrategy.
        this.handshakeDelay = new ExponentialBackoffStrategy(1L, 2000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Builds and installs the handshake pipeline on {@code channel}: framing, the client
     * message encoder and decoder, the Netty handshake handler wrapping
     * {@code handshakeClient}, and a gate whose predicate matches every message that is not
     * a {@link ServerMessage}.
     */
    private void installHandlers(Channel channel, HandshakeClient handshakeClient) throws Exception {
        ClientNettyPipelineBuilder framed = this.pipelineBuilderFactory.client(channel, this.debugLog).addFraming();
        ClientNettyPipelineBuilder withEncoder = (ClientNettyPipelineBuilder) framed.add("handshake_client_encoder", new ChannelHandler[]{new ClientMessageEncoder()});
        ClientNettyPipelineBuilder withDecoder = (ClientNettyPipelineBuilder) withEncoder.add("handshake_client_decoder", new ChannelHandler[]{new ClientMessageDecoder()});
        ClientNettyPipelineBuilder withClient = (ClientNettyPipelineBuilder) withDecoder.add("handshake_client", new ChannelHandler[]{new NettyHandshakeClient(handshakeClient)});
        ClientNettyPipelineBuilder gated = (ClientNettyPipelineBuilder) withClient.addGate(msg -> !(msg instanceof ServerMessage));
        gated.install();
    }

    /**
     * Installs the handshake handlers on the new channel, then schedules both the handshake
     * attempt and the handshake timeout.
     */
    protected void initChannel(SocketChannel channel) throws Exception {
        HandshakeClient client = new HandshakeClient();
        installHandlers(channel, client);
        Object[] endpoints = {channel.localAddress(), channel.remoteAddress()};
        this.debugLog.info("Scheduling handshake (and timeout) local %s remote %s", endpoints);
        scheduleHandshake(channel, client, this.handshakeDelay.newTimeout());
        scheduleTimeout(channel, client);
    }

    /**
     * Schedules a handshake attempt on the channel's event loop after the delay currently
     * reported by {@code handshakeDelay}.
     */
    private void scheduleHandshake(SocketChannel ch, HandshakeClient handshakeClient, TimeoutStrategy.Timeout handshakeDelay) {
        Runnable attempt = () -> attemptHandshake(ch, handshakeClient, handshakeDelay);
        ch.eventLoop().schedule(attempt, handshakeDelay.getMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Runs one scheduled handshake attempt: starts the handshake if the channel is active,
     * fails it if the channel is no longer open, and otherwise increments the delay and
     * schedules another attempt.
     */
    private void attemptHandshake(SocketChannel ch, HandshakeClient handshakeClient, TimeoutStrategy.Timeout delay) {
        if (ch.isActive()) {
            initiateHandshake(ch, handshakeClient);
            return;
        }
        if (!ch.isOpen()) {
            handshakeClient.failIfNotDone("Channel closed");
            return;
        }
        // Open but not yet active: back off and try again later.
        delay.increment();
        scheduleHandshake(ch, handshakeClient, delay);
    }

    /**
     * Schedules a task on the channel's event loop that, once the configured timeout has
     * elapsed, fails the handshake if it is not already done and logs a warning when it did.
     */
    private void scheduleTimeout(SocketChannel ch, HandshakeClient handshakeClient) {
        Runnable onTimeout = () -> {
            boolean failedByTimeout = handshakeClient.failIfNotDone("Timed out after " + this.timeout);
            if (failedByTimeout) {
                this.debugLog.warn("Failed handshake after timeout");
            }
        };
        ch.eventLoop().schedule(onTimeout, this.timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Starts the handshake over {@code channel} and registers the completion callback.
     */
    private void initiateHandshake(Channel channel, HandshakeClient handshakeClient) {
        Object[] endpoints = {channel.localAddress(), channel.remoteAddress()};
        this.debugLog.info("Initiating handshake local %s remote %s", endpoints);
        SimpleNettyChannel wrappedChannel = new SimpleNettyChannel(channel, this.debugLog);
        CompletableFuture<ProtocolStack> negotiation = handshakeClient.initiate(wrappedChannel, this.applicationProtocolRepository, this.modifierProtocolRepository);
        negotiation.whenComplete((stack, error) -> onHandshakeComplete(stack, channel, error));
    }

    /**
     * Handles the outcome of the handshake.
     *
     * <p>On failure: logs the error, fires the gate failure event and closes the channel.
     * On success: logs the connection, installs the negotiated protocol stack, records it as
     * a channel attribute, fires the gate success event and flushes the channel. If anything
     * in the success path throws, the error is logged and the channel is closed.
     */
    private void onHandshakeComplete(ProtocolStack protocolStack, Channel channel, Throwable failure) {
        if (failure != null) {
            this.debugLog.error("Error when negotiating protocol stack", failure);
            channel.pipeline().fireUserEventTriggered(GateEvent.getFailure());
            channel.close();
            return;
        }
        try {
            logConnectionLifecycle(protocolStack, channel);
            this.debugLog.info("Installing " + protocolStack);
            this.protocolInstaller.installerFor(protocolStack).install(channel);
            channel.attr(ReconnectingChannel.PROTOCOL_STACK_KEY).set(protocolStack);
            channel.pipeline().fireUserEventTriggered(GateEvent.getSuccess());
            channel.flush();
        }
        catch (Exception installFailure) {
            this.debugLog.error("Error installing pipeline", installFailure);
            channel.close();
        }
    }

    /**
     * Writes a "connected" line to the user log now and registers a close listener that
     * writes the matching "lost connection" line when the channel closes.
     */
    private void logConnectionLifecycle(ProtocolStack protocolStack, Channel channel) {
        String connected = String.format("Connected to %s [%s]", channel.remoteAddress(), protocolStack);
        this.userLog.info(connected);
        // The remote address is read again inside the listener, i.e. at close time.
        channel.closeFuture().addListener(closed -> this.userLog.info(String.format("Lost connection to %s [%s]", channel.remoteAddress(), protocolStack)));
    }
}

