package io.scalecube.transport;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.ScheduledFuture;
import io.scalecube.transport.TransportChannel;
import io.scalecube.transport.utils.ChannelFutureUtils;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/scalecube/transport/ConnectorHandshakeChannelHandler.class */
public final class ConnectorHandshakeChannelHandler extends ChannelDuplexHandler {
    static final Logger LOGGER = LoggerFactory.getLogger(ConnectorHandshakeChannelHandler.class);
    private final ITransportSpi transportSpi;
    private final Queue<WriteAndFlush> sendMailbox;
    private ScheduledFuture handshakeTimeout;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/scalecube/transport/ConnectorHandshakeChannelHandler$WriteAndFlush.class */
    public static class WriteAndFlush {
        final Object msg;
        final ChannelPromise promise;

        WriteAndFlush(Object obj, ChannelPromise channelPromise) {
            this.msg = obj;
            this.promise = channelPromise;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConnectorHandshakeChannelHandler(ITransportSpi iTransportSpi) {
        this.transportSpi = iTransportSpi;
        this.sendMailbox = new ArrayBlockingQueue(iTransportSpi.getSendHighWaterMark(), true);
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        final TransportChannel from = TransportChannel.from(channelHandlerContext.channel());
        from.flip(TransportChannel.Status.CONNECT_IN_PROGRESS, TransportChannel.Status.CONNECTED);
        final TransportHandshakeData create = TransportHandshakeData.create(this.transportSpi.localEndpoint());
        this.handshakeTimeout = from.channel().eventLoop().schedule(new Runnable() { // from class: io.scalecube.transport.ConnectorHandshakeChannelHandler.1
            @Override // java.lang.Runnable
            public void run() {
                ConnectorHandshakeChannelHandler.LOGGER.debug("HANDSHAKE_SYNC({}) timeout, connector: {}", create, from);
                from.close(new TransportHandshakeException("Handshake timeout on " + from, new TimeoutException()));
            }
        }, this.transportSpi.getHandshakeTimeout(), TimeUnit.MILLISECONDS);
        from.flip(TransportChannel.Status.CONNECTED, TransportChannel.Status.HANDSHAKE_IN_PROGRESS);
        channelHandlerContext.writeAndFlush(new Message(create, TransportHeaders.QUALIFIER, TransportHandshakeData.Q_TRANSPORT_HANDSHAKE_SYNC)).addListener(ChannelFutureUtils.wrap(new ChannelFutureListener() { // from class: io.scalecube.transport.ConnectorHandshakeChannelHandler.2
            public void operationComplete(ChannelFuture channelFuture) {
                if (channelFuture.isSuccess()) {
                    return;
                }
                ConnectorHandshakeChannelHandler.LOGGER.debug("HANDSHAKE_SYNC({}) not sent, connector: {}", create, from);
                ConnectorHandshakeChannelHandler.this.cancelHandshakeTimeout();
                from.close(new TransportHandshakeException("Failed to send handshake to " + from, channelFuture.cause()));
            }
        }));
        super.channelActive(channelHandlerContext);
    }

    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) {
        TransportChannel from = TransportChannel.from(channelHandlerContext.channel());
        if (from.getCause() != null) {
            channelPromise.setFailure(from.getCause());
        } else {
            if (this.sendMailbox.offer(new WriteAndFlush(obj, channelPromise)) || channelPromise == null) {
                return;
            }
            channelPromise.setFailure(new TransportMessageException("Failed to send message " + obj + ". Mailbox is full (capacity=" + this.transportSpi.getSendHighWaterMark() + ", size=" + this.sendMailbox.size() + ")"));
        }
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        Message message = (Message) obj;
        if (!TransportHandshakeData.Q_TRANSPORT_HANDSHAKE_SYNC_ACK.equals(message.header(TransportHeaders.QUALIFIER))) {
            throw new TransportBrokenException("Received unsupported " + obj + " (though expecting only Q_TRANSPORT_HANDSHAKE_SYNC_ACK)");
        }
        TransportHandshakeData transportHandshakeData = (TransportHandshakeData) message.data();
        TransportChannel from = TransportChannel.from(channelHandlerContext.channel());
        if (!transportHandshakeData.isResolvedOk()) {
            LOGGER.info("HANDSHAKE({}) not passed, connector: {}", transportHandshakeData, from);
            cancelHandshakeTimeout();
            from.close(new TransportHandshakeException(transportHandshakeData.explain()));
            return;
        }
        cancelHandshakeTimeout();
        from.setHandshakeData(transportHandshakeData);
        this.transportSpi.resetDueHandshake(from.channel());
        from.flip(TransportChannel.Status.HANDSHAKE_IN_PROGRESS, TransportChannel.Status.HANDSHAKE_PASSED);
        LOGGER.info("HANDSHAKE passed on connector: {}", from);
        writeAndFlushSendMailbox(channelHandlerContext);
        from.flip(TransportChannel.Status.HANDSHAKE_PASSED, TransportChannel.Status.READY);
        LOGGER.info("Set READY on connector: {}", from);
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        cancelHandshakeTimeout();
        cleanupSendMailbox(channelHandlerContext);
        super.channelInactive(channelHandlerContext);
    }

    public void close(ChannelHandlerContext channelHandlerContext, ChannelPromise channelPromise) throws Exception {
        cancelHandshakeTimeout();
        cleanupSendMailbox(channelHandlerContext);
        super.close(channelHandlerContext, channelPromise);
    }

    private void writeAndFlushSendMailbox(ChannelHandlerContext channelHandlerContext) {
        while (!this.sendMailbox.isEmpty()) {
            WriteAndFlush poll = this.sendMailbox.poll();
            channelHandlerContext.writeAndFlush(poll.msg, poll.promise);
        }
    }

    private void cleanupSendMailbox(ChannelHandlerContext channelHandlerContext) {
        Throwable cause = TransportChannel.from(channelHandlerContext.channel()).getCause();
        Throwable transportClosedException = cause != null ? cause : new TransportClosedException();
        while (!this.sendMailbox.isEmpty()) {
            WriteAndFlush poll = this.sendMailbox.poll();
            if (poll.promise != null) {
                poll.promise.setFailure(transportClosedException);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cancelHandshakeTimeout() {
        if (this.handshakeTimeout != null) {
            this.handshakeTimeout.cancel(true);
        }
    }
}
