package io.scalecube.transport;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.scalecube.transport.utils.ChannelFutureUtils;
import io.scalecube.transport.utils.memoization.Computable;
import io.scalecube.transport.utils.memoization.Memoizer;
import java.lang.Thread;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadFactory;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

/* loaded from: input_file:io/scalecube/transport/Transport.class */
public final class Transport implements ITransportSpi, ITransport {
    private static final Logger LOGGER = LoggerFactory.getLogger(Transport.class);
    private static final Function<TransportHandshakeData, TransportEndpoint> HANDSHAKE_DATA_TO_ENDPOINT_FUNCTION = new Function<TransportHandshakeData, TransportEndpoint>() { // from class: io.scalecube.transport.Transport.1
        public TransportEndpoint apply(TransportHandshakeData transportHandshakeData) {
            return transportHandshakeData.endpoint();
        }
    };
    private final TransportEndpoint localEndpoint;
    private final TransportSettings settings;
    private final EventLoopGroup eventLoop;
    private final EventExecutorGroup eventExecutor;
    private final Subject<Message, Message> incomingMessagesSubject = PublishSubject.create();
    private final ConcurrentMap<TransportAddress, TransportChannel> acceptedChannels = new ConcurrentHashMap();
    private final Memoizer<TransportAddress, TransportChannel> connectedChannels = new Memoizer<>();
    private PipelineFactory pipelineFactory;
    private ServerChannel serverChannel;

    private Transport(TransportEndpoint transportEndpoint, TransportSettings transportSettings, EventLoopGroup eventLoopGroup, EventExecutorGroup eventExecutorGroup) {
        Preconditions.checkArgument(transportEndpoint != null);
        Preconditions.checkArgument(transportSettings != null);
        Preconditions.checkArgument(eventLoopGroup != null);
        Preconditions.checkArgument(eventExecutorGroup != null);
        this.localEndpoint = transportEndpoint;
        this.settings = transportSettings;
        this.eventLoop = eventLoopGroup;
        this.eventExecutor = eventExecutorGroup;
        this.pipelineFactory = new TransportPipelineFactory(this, new ProtostuffProtocol(), transportSettings.isUseNetworkEmulator());
    }

    public static Transport newInstance(TransportEndpoint transportEndpoint) {
        return newInstance(transportEndpoint, TransportSettings.DEFAULT);
    }

    public static Transport newInstance(TransportEndpoint transportEndpoint, TransportSettings transportSettings) {
        return newInstance(transportEndpoint, transportSettings, defaultEventLoop(transportEndpoint), defaultEventExecutor(transportEndpoint));
    }

    public static Transport newInstance(TransportEndpoint transportEndpoint, EventLoopGroup eventLoopGroup, EventExecutorGroup eventExecutorGroup) {
        return newInstance(transportEndpoint, TransportSettings.DEFAULT, eventLoopGroup, eventExecutorGroup);
    }

    public static Transport newInstance(TransportEndpoint transportEndpoint, TransportSettings transportSettings, EventLoopGroup eventLoopGroup, EventExecutorGroup eventExecutorGroup) {
        return new Transport(transportEndpoint, transportSettings, eventLoopGroup, eventExecutorGroup);
    }

    private static EventLoopGroup defaultEventLoop(TransportEndpoint transportEndpoint) {
        return new NioEventLoopGroup(1, createThreadFactory("scalecube-transport-io-%s@" + transportEndpoint));
    }

    private static EventExecutorGroup defaultEventExecutor(TransportEndpoint transportEndpoint) {
        return new DefaultEventExecutorGroup(1, createThreadFactory("scalecube-transport-exec-%s@" + transportEndpoint));
    }

    private static ThreadFactory createThreadFactory(String str) {
        return new ThreadFactoryBuilder().setNameFormat(str).setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: io.scalecube.transport.Transport.2
            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                Transport.LOGGER.error("Unhandled exception: {}", th, th);
            }
        }).setDaemon(true).build();
    }

    @Override // io.scalecube.transport.ITransport
    public TransportEndpoint localEndpoint() {
        return this.localEndpoint;
    }

    public EventLoopGroup getEventLoop() {
        return this.eventLoop;
    }

    @Override // io.scalecube.transport.ITransportSpi
    public EventExecutorGroup getEventExecutor() {
        return this.eventExecutor;
    }

    @Override // io.scalecube.transport.ITransportSpi
    public final int getHandshakeTimeout() {
        return this.settings.getHandshakeTimeout();
    }

    @Override // io.scalecube.transport.ITransportSpi
    public int getSendHighWaterMark() {
        return this.settings.getSendHighWaterMark();
    }

    @Override // io.scalecube.transport.ITransportSpi
    public LogLevel getLogLevel() {
        String logLevel = this.settings.getLogLevel();
        if (logLevel == null || logLevel.equals(TransportSettings.DEFAULT_LOG_LEVEL)) {
            return null;
        }
        return LogLevel.valueOf(logLevel);
    }

    public <T extends PipelineFactory> T getPipelineFactory() {
        return (T) this.pipelineFactory;
    }

    @Override // io.scalecube.transport.ITransport
    public final ListenableFuture<Void> start() {
        this.incomingMessagesSubject.subscribeOn(Schedulers.from(this.eventExecutor));
        final InetSocketAddress inetSocketAddress = new InetSocketAddress(this.localEndpoint.address().port());
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(this.eventLoop).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() { // from class: io.scalecube.transport.Transport.3
            protected void initChannel(Channel channel) {
                Transport.this.pipelineFactory.setAcceptorPipeline(channel, Transport.this);
            }
        });
        ChannelFuture bind = serverBootstrap.bind(inetSocketAddress);
        final SettableFuture create = SettableFuture.create();
        bind.addListener(new ChannelFutureListener() { // from class: io.scalecube.transport.Transport.4
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (!channelFuture.isSuccess()) {
                    Throwable cause = channelFuture.cause();
                    create.setException(cause);
                    Transport.LOGGER.error("Failed to bind to: " + inetSocketAddress + ", caught " + cause, cause);
                } else {
                    Transport.this.serverChannel = channelFuture.channel();
                    Transport.LOGGER.info("Transport endpoint '{}' bound to: {}", Transport.this.localEndpoint.id(), inetSocketAddress);
                    create.set((Object) null);
                }
            }
        });
        return create;
    }

    @Override // io.scalecube.transport.ITransport
    public ListenableFuture<TransportEndpoint> connect(@CheckForNull TransportAddress transportAddress) {
        Preconditions.checkArgument(transportAddress != null);
        return Futures.transform(getOrConnect(transportAddress).handshakeFuture(), HANDSHAKE_DATA_TO_ENDPOINT_FUNCTION);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void connect(final Channel channel, final TransportAddress transportAddress, final TransportChannel transportChannel) {
        channel.eventLoop().execute(new Runnable() { // from class: io.scalecube.transport.Transport.5
            @Override // java.lang.Runnable
            public void run() {
                InetSocketAddress inetSocketAddress = new InetSocketAddress(transportAddress.hostAddress(), transportAddress.port());
                ChannelPromise newPromise = channel.newPromise();
                channel.connect(inetSocketAddress, newPromise);
                newPromise.addListener(ChannelFutureUtils.wrap(new ChannelFutureListener() { // from class: io.scalecube.transport.Transport.5.1
                    public void operationComplete(ChannelFuture channelFuture) {
                        if (channelFuture.isSuccess()) {
                            return;
                        }
                        transportChannel.close(channelFuture.cause());
                    }
                }));
            }
        });
    }

    @Override // io.scalecube.transport.ITransport
    public void disconnect(@CheckForNull TransportEndpoint transportEndpoint, @Nullable SettableFuture<Void> settableFuture) {
        Preconditions.checkArgument(transportEndpoint != null);
        TransportChannel ifExists = this.connectedChannels.getIfExists(transportEndpoint.address());
        if (ifExists != null) {
            ifExists.close(settableFuture);
        } else if (settableFuture != null) {
            settableFuture.set((Object) null);
        }
    }

    @Override // io.scalecube.transport.ITransport
    public void send(@CheckForNull TransportEndpoint transportEndpoint, @CheckForNull Message message) {
        send(transportEndpoint, message, null);
    }

    @Override // io.scalecube.transport.ITransport
    public void send(@CheckForNull TransportEndpoint transportEndpoint, @CheckForNull Message message, @Nullable SettableFuture<Void> settableFuture) {
        Preconditions.checkArgument(transportEndpoint != null);
        Preconditions.checkArgument(message != null);
        getOrConnect(transportEndpoint.address()).send(message, settableFuture);
    }

    @Override // io.scalecube.transport.ITransport
    @Nonnull
    public final Observable<Message> listen() {
        return this.incomingMessagesSubject;
    }

    @Override // io.scalecube.transport.ITransport
    public final void stop() {
        stop(null);
    }

    @Override // io.scalecube.transport.ITransport
    public final void stop(@Nullable SettableFuture<Void> settableFuture) {
        try {
            this.incomingMessagesSubject.onCompleted();
        } catch (Exception e) {
        }
        Iterator<TransportAddress> it = this.acceptedChannels.keySet().iterator();
        while (it.hasNext()) {
            TransportChannel remove = this.acceptedChannels.remove(it.next());
            if (remove != null) {
                remove.close();
            }
        }
        Iterator<TransportAddress> it2 = this.connectedChannels.keySet().iterator();
        while (it2.hasNext()) {
            TransportChannel remove2 = this.connectedChannels.remove(it2.next());
            if (remove2 != null) {
                remove2.close();
            }
        }
        if (this.serverChannel != null) {
            ChannelFutureUtils.setPromise(this.serverChannel.close(), settableFuture);
        }
    }

    @Override // io.scalecube.transport.ITransportSpi
    public TransportChannel createAcceptorTransportChannel(Channel channel) {
        return TransportChannel.newAcceptorChannel(channel, new Func1<TransportChannel, Void>() { // from class: io.scalecube.transport.Transport.6
            public Void call(TransportChannel transportChannel) {
                TransportEndpoint remoteEndpoint = transportChannel.remoteEndpoint();
                if (remoteEndpoint == null) {
                    return null;
                }
                Transport.this.acceptedChannels.remove(remoteEndpoint.address());
                return null;
            }
        });
    }

    @Override // io.scalecube.transport.ITransportSpi
    public void accept(TransportChannel transportChannel) throws TransportBrokenException {
        TransportEndpoint remoteEndpoint = transportChannel.remoteEndpoint();
        Preconditions.checkNotNull(remoteEndpoint);
        Preconditions.checkNotNull(remoteEndpoint.address());
        TransportChannel putIfAbsent = this.acceptedChannels.putIfAbsent(remoteEndpoint.address(), transportChannel);
        if (putIfAbsent != null) {
            throw new TransportBrokenException(String.format("Detected duplicate %s for key=%s in accepted_map", putIfAbsent, remoteEndpoint));
        }
    }

    @Override // io.scalecube.transport.ITransportSpi
    public void resetDueHandshake(Channel channel) {
        this.pipelineFactory.resetDueHandshake(channel, this);
    }

    @Override // io.scalecube.transport.ITransportSpi
    public void onMessage(Message message) {
        this.incomingMessagesSubject.onNext(message);
    }

    private TransportChannel getOrConnect(@CheckForNull TransportAddress transportAddress) {
        Preconditions.checkArgument(transportAddress != null);
        return this.connectedChannels.get(transportAddress, new Computable<TransportAddress, TransportChannel>() { // from class: io.scalecube.transport.Transport.7
            @Override // io.scalecube.transport.utils.memoization.Computable
            public TransportChannel compute(final TransportAddress transportAddress2) {
                final Channel createConnectorChannel = Transport.this.createConnectorChannel();
                final TransportChannel createConnectorTransportChannel = Transport.this.createConnectorTransportChannel(createConnectorChannel, transportAddress2);
                Transport.LOGGER.info("Registered connector: {}", createConnectorTransportChannel);
                Transport.this.eventLoop.register(createConnectorChannel).addListener(new ChannelFutureListener() { // from class: io.scalecube.transport.Transport.7.1
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (channelFuture.isSuccess()) {
                            Transport.this.connect(createConnectorChannel, transportAddress2, createConnectorTransportChannel);
                        } else {
                            createConnectorChannel.unsafe().closeForcibly();
                            createConnectorTransportChannel.close();
                        }
                    }
                });
                return createConnectorTransportChannel;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Channel createConnectorChannel() {
        Channel nioSocketChannel = new NioSocketChannel();
        this.pipelineFactory.setConnectorPipeline(nioSocketChannel, this);
        nioSocketChannel.config().setOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(this.settings.getConnectTimeout()));
        nioSocketChannel.config().setOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        nioSocketChannel.config().setOption(ChannelOption.TCP_NODELAY, true);
        nioSocketChannel.config().setOption(ChannelOption.SO_KEEPALIVE, true);
        nioSocketChannel.config().setOption(ChannelOption.SO_REUSEADDR, true);
        return nioSocketChannel;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public TransportChannel createConnectorTransportChannel(Channel channel, final TransportAddress transportAddress) {
        return TransportChannel.newConnectorChannel(channel, new Func1<TransportChannel, Void>() { // from class: io.scalecube.transport.Transport.8
            public Void call(TransportChannel transportChannel) {
                Transport.this.connectedChannels.remove(transportAddress);
                return null;
            }
        });
    }
}
