package org.apache.ignite.internal.network.netty;

import io.netty.bootstrap.Bootstrap;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.ignite.configuration.schemas.network.NetworkView;
import org.apache.ignite.internal.future.OrderingFuture;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.recovery.RecoveryClientHandhakeManagerFactory;
import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/**
 * Owns the network connections of a node: one {@link NettyServer} accepting incoming connections and a set of
 * {@link NettyClient}s for outgoing ones.
 *
 * <p>Established connections are tracked as {@link NettySender}s keyed by the remote consistent id, outgoing clients
 * are tracked by the remote address. Every message received on any connection is dispatched to the listeners
 * registered via {@link #addListener(Consumer)}.
 */
public class ConnectionManager {
    /** Message factory handed to server-side handshake managers. */
    private static final NetworkMessagesFactory FACTORY = new NetworkMessagesFactory();

    /** Logger. */
    private static final IgniteLogger LOG = Loggers.forClass(ConnectionManager.class);

    /** Direct protocol version. */
    public static final byte DIRECT_PROTOCOL_VERSION = 1;

    /** Bootstrap used to start every outgoing client. */
    private final Bootstrap clientBootstrap;

    /** Server that accepts incoming connections. */
    private final NettyServer server;

    /** Senders of established connections, keyed by the consistent id reported by the sender. */
    private final Map<String, NettySender> channels = new ConcurrentHashMap<>();

    /** Outgoing clients, keyed by the remote address they connect to. */
    private final Map<SocketAddress, NettyClient> clients = new ConcurrentHashMap<>();

    /** Serialization service passed to the server and to every client. */
    private final SerializationService serializationService;

    /** Listeners notified about every incoming message. */
    private final List<Consumer<InNetworkObject>> listeners = new CopyOnWriteArrayList<>();

    /** Consistent id of this node. */
    private final String consistentId;

    /** Launch id passed to the handshake managers. */
    private final UUID launchId;

    /** Factory producing handshake managers for outgoing connections. */
    private final RecoveryClientHandhakeManagerFactory clientHandhakeManagerFactory;

    /** Set once {@link #start()} has been invoked. */
    private final AtomicBoolean started = new AtomicBoolean(false);

    /** Set once {@link #stop()} has been invoked. */
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    /** Recovery descriptor provider shared by client and server handshake managers. */
    private final RecoveryDescriptorProvider descriptorProvider = new DefaultRecoveryDescriptorProvider();

    /**
     * Factory used when the caller does not supply one; creates plain {@link RecoveryClientHandshakeManager}s.
     */
    private static class DefaultRecoveryClientHandhakeManagerFactory implements RecoveryClientHandhakeManagerFactory {
        private DefaultRecoveryClientHandhakeManagerFactory() {
        }

        /** {@inheritDoc} */
        @Override
        public RecoveryClientHandshakeManager create(UUID uuid, String str, short s, RecoveryDescriptorProvider recoveryDescriptorProvider) {
            return new RecoveryClientHandshakeManager(uuid, str, s, recoveryDescriptorProvider);
        }
    }

    /**
     * Creates a connection manager that uses the default client handshake manager factory.
     *
     * @param networkView Network configuration passed to the server.
     * @param serializationService Serialization service.
     * @param uuid Launch id of this node.
     * @param str Consistent id of this node.
     * @param nettyBootstrapFactory Factory of Netty bootstraps.
     */
    public ConnectionManager(NetworkView networkView, SerializationService serializationService, UUID uuid, String str, NettyBootstrapFactory nettyBootstrapFactory) {
        this(networkView, serializationService, uuid, str, nettyBootstrapFactory, new DefaultRecoveryClientHandhakeManagerFactory());
    }

    /**
     * Creates a connection manager with a custom client handshake manager factory.
     *
     * @param networkView Network configuration passed to the server.
     * @param serializationService Serialization service.
     * @param uuid Launch id of this node.
     * @param str Consistent id of this node.
     * @param nettyBootstrapFactory Factory of Netty bootstraps.
     * @param recoveryClientHandhakeManagerFactory Factory of handshake managers for outgoing connections.
     */
    public ConnectionManager(NetworkView networkView, SerializationService serializationService, UUID uuid, String str, NettyBootstrapFactory nettyBootstrapFactory, RecoveryClientHandhakeManagerFactory recoveryClientHandhakeManagerFactory) {
        this.serializationService = serializationService;
        this.launchId = uuid;
        this.consistentId = str;
        this.clientHandhakeManagerFactory = recoveryClientHandhakeManagerFactory;
        this.server = new NettyServer(
                networkView,
                this::createServerHandshakeManager,
                this::onNewIncomingChannel,
                this::onMessage,
                serializationService,
                nettyBootstrapFactory
        );
        this.clientBootstrap = nettyBootstrapFactory.createClientBootstrap();
    }

    /**
     * Starts the server and blocks until it is up.
     *
     * @throws IgniteInternalException If the manager has already been started or stopped, if the calling thread is
     *         interrupted while waiting (the interrupt flag is restored), or if the server fails to start.
     */
    public void start() throws IgniteInternalException {
        try {
            // The "started" flag is flipped first, so a repeated start is reported even after a stop.
            if (started.getAndSet(true)) {
                throw new IgniteInternalException("Attempted to start an already started connection manager");
            }

            if (stopped.get()) {
                throw new IgniteInternalException("Attempted to start an already stopped connection manager");
            }

            server.start().get();

            LOG.info("Server started [address={}]", server.address());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            throw new IgniteInternalException("Interrupted while starting the connection manager", e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();

            throw new IgniteInternalException("Failed to start the connection manager: " + cause.getMessage(), cause);
        }
    }

    /**
     * Returns the address reported by the server.
     *
     * @return Server address.
     */
    public SocketAddress getLocalAddress() {
        return server.address();
    }

    /**
     * Returns a sender for the given remote node.
     *
     * <p>If a consistent id is given and an open sender is registered under it, that sender is returned right away;
     * a registered sender that is no longer open is dropped from the registry. Otherwise the client mapped to the
     * address is used, and a new client is created if there is none or the existing one failed to connect or got
     * disconnected.
     *
     * @param str Consistent id of the remote node, or {@code null} if it is not known.
     * @param socketAddress Address of the remote node.
     * @return Future of the sender.
     */
    public OrderingFuture<NettySender> channel(@Nullable String str, SocketAddress socketAddress) {
        if (str != null) {
            // Returning null from the remapping function removes the entry, which evicts closed senders.
            NettySender openSender = channels.compute(
                    str,
                    (id, sender) -> (sender == null || !sender.isOpen()) ? null : sender
            );

            if (openSender != null) {
                return OrderingFuture.completedFuture(openSender);
            }
        }

        NettyClient client = clients.compute(socketAddress, (address, existing) -> {
            boolean unusable = existing == null || existing.failedToConnect() || existing.isDisconnected();

            return unusable ? connect(address, (short) 0) : existing;
        });

        return client.sender();
    }

    /**
     * Dispatches an incoming message to every registered listener.
     *
     * @param inNetworkObject Incoming message.
     */
    private void onMessage(InNetworkObject inNetworkObject) {
        for (Consumer<InNetworkObject> listener : listeners) {
            listener.accept(inNetworkObject);
        }
    }

    /**
     * Registers the sender of a newly accepted connection, closing the sender it replaces, if any.
     *
     * @param nettySender Sender of the new incoming connection.
     */
    private void onNewIncomingChannel(NettySender nettySender) {
        NettySender previous = channels.put(nettySender.consistentId(), nettySender);

        if (previous != null) {
            previous.close();
        }
    }

    /**
     * Creates a client for the given address and starts connecting it.
     *
     * <p>On success the resulting sender is registered under its consistent id. On failure the client is removed
     * from the client registry.
     *
     * @param socketAddress Address to connect to.
     * @param s Value passed to the client handshake manager factory.
     * @return The new client.
     */
    private NettyClient connect(SocketAddress socketAddress, short s) {
        NettyClient client = new NettyClient(socketAddress, serializationService, createClientHandshakeManager(s), this::onMessage);

        client.start(clientBootstrap).whenComplete((sender, err) -> {
            if (err == null) {
                channels.put(sender.consistentId(), sender);
            } else {
                // Remove only this client: a concurrent channel() call may already have replaced it with a newer
                // one for the same address, and an unconditional remove would drop that newer client.
                clients.remove(socketAddress, client);
            }
        });

        return client;
    }

    /**
     * Adds a listener that is invoked for every incoming message.
     *
     * @param consumer Listener.
     */
    public void addListener(Consumer<InNetworkObject> consumer) {
        listeners.add(consumer);
    }

    /**
     * Stops the server and all clients and closes all registered senders, waiting for completion.
     *
     * <p>Only the first invocation has an effect. A failure while waiting is logged and not propagated.
     */
    public void stop() {
        if (stopped.getAndSet(true)) {
            return;
        }

        try {
            CompletableFuture<?>[] stopFutures = Stream.concat(
                    Stream.concat(
                            clients.values().stream().map(NettyClient::stop),
                            Stream.of(server.stop())
                    ),
                    channels.values().stream().map(NettySender::closeAsync)
            ).toArray(CompletableFuture<?>[]::new);

            CompletableFuture.allOf(stopFutures).join();
        } catch (Exception e) {
            LOG.warn("Failed to stop connection manager [reason={}]", e.getMessage());
        }
    }

    /**
     * Tells whether {@link #stop()} has been invoked.
     *
     * @return {@code true} if the manager is stopped.
     */
    public boolean isStopped() {
        return stopped.get();
    }

    /**
     * Creates a handshake manager for an outgoing connection using the configured factory.
     *
     * @param s Value passed through to the factory.
     * @return Client handshake manager.
     */
    private HandshakeManager createClientHandshakeManager(short s) {
        return clientHandhakeManagerFactory.create(launchId, consistentId, s, descriptorProvider);
    }

    /**
     * Creates a handshake manager for an incoming connection.
     *
     * @return Server handshake manager.
     */
    private HandshakeManager createServerHandshakeManager() {
        return new RecoveryServerHandshakeManager(launchId, consistentId, FACTORY, descriptorProvider);
    }

    /**
     * Returns the server.
     *
     * @return Server.
     */
    @TestOnly
    public NettyServer server() {
        return server;
    }

    /**
     * Returns the consistent id of this node.
     *
     * @return Consistent id.
     */
    public String consistentId() {
        return consistentId;
    }

    /**
     * Returns an unmodifiable view of the outgoing clients.
     *
     * @return Clients.
     */
    @TestOnly
    public Collection<NettyClient> clients() {
        return Collections.unmodifiableCollection(clients.values());
    }

    /**
     * Returns an unmodifiable view of the registered senders, keyed by consistent id.
     *
     * @return Senders.
     */
    @TestOnly
    public Map<String, NettySender> channels() {
        return Collections.unmodifiableMap(channels);
    }
}
