/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core.protocol;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.ConnectionEvents;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisException;
import io.lettuce.core.api.push.PushListener;
import io.lettuce.core.api.push.PushMessage;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.internal.LettuceSets;
import io.lettuce.core.metrics.CommandLatencyRecorder;
import io.lettuce.core.output.CommandOutput;
import io.lettuce.core.output.PushOutput;
import io.lettuce.core.protocol.ByteBufferCopyCodec;
import io.lettuce.core.protocol.ChannelLogDescriptor;
import io.lettuce.core.protocol.CommandWrapper;
import io.lettuce.core.protocol.CompleteableCommand;
import io.lettuce.core.protocol.DecodeBufferPolicy;
import io.lettuce.core.protocol.DemandAware;
import io.lettuce.core.protocol.Endpoint;
import io.lettuce.core.protocol.HasQueuedCommands;
import io.lettuce.core.protocol.LatencyMeteredCommand;
import io.lettuce.core.protocol.PristineFallbackCommand;
import io.lettuce.core.protocol.ProtocolKeyword;
import io.lettuce.core.protocol.RedisCommand;
import io.lettuce.core.protocol.RedisStateMachine;
import io.lettuce.core.protocol.TracedCommand;
import io.lettuce.core.protocol.WithLatency;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.tracing.TraceContext;
import io.lettuce.core.tracing.Tracer;
import io.lettuce.core.tracing.Tracing;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.local.LocalAddress;
import io.netty.util.Recycler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class CommandHandler
extends ChannelDuplexHandler
implements HasQueuedCommands {
    static final Set<String> SUPPRESS_IO_EXCEPTION_MESSAGES = LettuceSets.unmodifiableSet("Connection reset by peer", "Broken pipe", "Connection timed out");
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(CommandHandler.class);
    private static final AtomicLong COMMAND_HANDLER_COUNTER = new AtomicLong();
    private final ClientOptions clientOptions;
    private final ClientResources clientResources;
    private final Endpoint endpoint;
    private final ArrayDeque<RedisCommand<?, ?, ?>> stack = new ArrayDeque();
    private final long commandHandlerId = COMMAND_HANDLER_COUNTER.incrementAndGet();
    private final boolean traceEnabled = logger.isTraceEnabled();
    private final boolean debugEnabled = logger.isDebugEnabled();
    private final CommandLatencyRecorder commandLatencyRecorder;
    private final boolean latencyMetricsEnabled;
    private final boolean tracingEnabled;
    private final DecodeBufferPolicy decodeBufferPolicy;
    private final boolean boundedQueues;
    private final BackpressureSource backpressureSource = new BackpressureSource();
    private RedisStateMachine rsm;
    private Channel channel;
    private ByteBuf buffer;
    private boolean hasDecodeProgress;
    private PushOutput<ByteBuffer, ByteBuffer> pushOutput;
    private LifecycleState lifecycleState = LifecycleState.NOT_CONNECTED;
    private String logPrefix;
    private PristineFallbackCommand fallbackCommand;
    private boolean pristine;
    private Tracing.Endpoint tracedEndpoint;

    public CommandHandler(ClientOptions clientOptions, ClientResources clientResources, Endpoint endpoint) {
        LettuceAssert.notNull((Object)clientOptions, "ClientOptions must not be null");
        LettuceAssert.notNull((Object)clientResources, "ClientResources must not be null");
        LettuceAssert.notNull((Object)endpoint, "RedisEndpoint must not be null");
        this.clientOptions = clientOptions;
        this.clientResources = clientResources;
        this.endpoint = endpoint;
        this.commandLatencyRecorder = clientResources.commandLatencyRecorder();
        this.latencyMetricsEnabled = this.commandLatencyRecorder.isEnabled();
        this.boundedQueues = clientOptions.getRequestQueueSize() != Integer.MAX_VALUE;
        Tracing tracing = clientResources.tracing();
        this.tracingEnabled = tracing.isEnabled();
        this.decodeBufferPolicy = clientOptions.getDecodeBufferPolicy();
    }

    public Endpoint getEndpoint() {
        return this.endpoint;
    }

    public Queue<RedisCommand<?, ?, ?>> getStack() {
        return this.stack;
    }

    protected void setState(LifecycleState lifecycleState) {
        if (this.lifecycleState != LifecycleState.CLOSED) {
            this.lifecycleState = lifecycleState;
        }
    }

    void setBuffer(ByteBuf buffer) {
        this.buffer = buffer;
    }

    @Override
    public Collection<RedisCommand<?, ?, ?>> drainQueue() {
        return CommandHandler.drainCommands(this.stack);
    }

    protected LifecycleState getState() {
        return this.lifecycleState;
    }

    public boolean isClosed() {
        return this.lifecycleState == LifecycleState.CLOSED;
    }

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        if (this.isClosed()) {
            logger.debug("{} Dropping register for a closed channel", (Object)this.logPrefix());
        }
        this.channel = ctx.channel();
        if (this.debugEnabled) {
            this.logPrefix = null;
            logger.debug("{} channelRegistered()", (Object)this.logPrefix());
        }
        this.logPrefix = null;
        this.pristine = true;
        this.fallbackCommand = null;
        this.setState(LifecycleState.REGISTERED);
        this.buffer = ctx.alloc().buffer(65536);
        this.rsm = new RedisStateMachine(ctx.alloc());
        ctx.fireChannelRegistered();
    }

    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        if (this.debugEnabled) {
            logger.debug("{} channelUnregistered()", (Object)this.logPrefix());
        }
        if (this.channel != null && ctx.channel() != this.channel) {
            logger.debug("{} My channel and ctx.channel mismatch. Propagating event to other listeners", (Object)this.logPrefix());
            ctx.fireChannelUnregistered();
            return;
        }
        this.channel = null;
        this.buffer.release();
        this.rsm.close();
        this.rsm = null;
        this.reset();
        this.setState(LifecycleState.CLOSED);
        ctx.fireChannelUnregistered();
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt == EnableAutoRead.INSTANCE) {
            this.channel.config().setAutoRead(true);
        } else if (evt instanceof ConnectionEvents.Reset) {
            this.reset();
        }
        super.userEventTriggered(ctx, evt);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        InternalLogLevel logLevel = InternalLogLevel.WARN;
        if (!this.stack.isEmpty()) {
            RedisCommand<?, ?, ?> command = this.stack.poll();
            if (this.debugEnabled) {
                logger.debug("{} Storing exception in {}", (Object)this.logPrefix(), command);
            }
            logLevel = InternalLogLevel.DEBUG;
            try {
                command.completeExceptionally(cause);
            }
            catch (Exception ex) {
                logger.warn("{} Unexpected exception during command completion exceptionally: {}", new Object[]{this.logPrefix, ex.toString(), ex});
            }
        }
        if (this.channel == null || !this.channel.isActive() || !this.isConnected()) {
            if (this.debugEnabled) {
                logger.debug("{} Storing exception in connectionError", (Object)this.logPrefix());
            }
            logLevel = InternalLogLevel.DEBUG;
            this.endpoint.notifyException(cause);
        }
        if (cause instanceof IOException && logLevel.ordinal() > InternalLogLevel.INFO.ordinal()) {
            logLevel = InternalLogLevel.INFO;
            if (SUPPRESS_IO_EXCEPTION_MESSAGES.contains(cause.getMessage())) {
                logLevel = InternalLogLevel.DEBUG;
            }
        }
        logger.log(logLevel, "{} Unexpected exception during request: {}", new Object[]{this.logPrefix, cause.toString(), cause});
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (this.debugEnabled) {
            logger.debug("{} channelActive()", (Object)this.logPrefix());
        }
        this.setState(LifecycleState.CONNECTED);
        this.tracedEndpoint = this.clientResources.tracing().createEndpoint(ctx.channel().remoteAddress());
        this.endpoint.notifyChannelActive(ctx.channel());
        super.channelActive(ctx);
        if (this.debugEnabled) {
            logger.debug("{} channelActive() done", (Object)this.logPrefix());
        }
    }

    private static <T> List<T> drainCommands(Queue<T> source) {
        T cmd;
        ArrayList<T> target = new ArrayList<T>(source.size());
        while ((cmd = source.poll()) != null) {
            target.add(cmd);
        }
        return target;
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (this.debugEnabled) {
            logger.debug("{} channelInactive()", (Object)this.logPrefix());
        }
        if (this.channel != null && ctx.channel() != this.channel) {
            logger.debug("{} My channel and ctx.channel mismatch. Propagating event to other listeners.", (Object)this.logPrefix());
            super.channelInactive(ctx);
            return;
        }
        this.tracedEndpoint = null;
        this.setState(LifecycleState.DISCONNECTED);
        this.setState(LifecycleState.DEACTIVATING);
        this.endpoint.notifyChannelInactive(ctx.channel());
        this.endpoint.notifyDrainQueuedCommands(this);
        this.setState(LifecycleState.DEACTIVATED);
        PristineFallbackCommand command = this.fallbackCommand;
        if (this.isProtectedMode(command)) {
            this.onProtectedMode(command.getOutput().getError());
        }
        if (this.debugEnabled) {
            logger.debug("{} channelInactive() done", (Object)this.logPrefix());
        }
        super.channelInactive(ctx);
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (this.debugEnabled) {
            logger.debug("{} write(ctx, {}, promise)", (Object)this.logPrefix(), msg);
        }
        if (msg instanceof RedisCommand) {
            this.writeSingleCommand(ctx, (RedisCommand)msg, promise);
            return;
        }
        if (msg instanceof List) {
            List batch = (List)msg;
            if (batch.size() == 1) {
                this.writeSingleCommand(ctx, (RedisCommand)batch.get(0), promise);
                return;
            }
            this.writeBatch(ctx, batch, promise);
            return;
        }
        if (msg instanceof Collection) {
            this.writeBatch(ctx, (Collection)msg, promise);
        }
    }

    private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> command, ChannelPromise promise) {
        if (!CommandHandler.isWriteable(command)) {
            promise.trySuccess();
            return;
        }
        this.addToStack(command, promise);
        this.attachTracing(ctx, command);
        ctx.write(command, promise);
    }

    private void writeBatch(ChannelHandlerContext ctx, Collection<RedisCommand<?, ?, ?>> batch, ChannelPromise promise) {
        LinkedHashSet deduplicated = new LinkedHashSet(batch.size(), 1.0f);
        for (RedisCommand<?, ?, ?> redisCommand : batch) {
            if (!CommandHandler.isWriteable(redisCommand) || deduplicated.add(redisCommand)) continue;
            deduplicated.remove(redisCommand);
            redisCommand.completeExceptionally(new RedisException("Attempting to write duplicate command that is already enqueued: " + redisCommand));
        }
        try {
            this.validateWrite(deduplicated.size());
        }
        catch (Exception e) {
            for (RedisCommand redisCommand : deduplicated) {
                redisCommand.completeExceptionally(e);
            }
            throw e;
        }
        for (RedisCommand<Object, Object, Object> redisCommand : deduplicated) {
            this.attachTracing(ctx, redisCommand);
            this.addToStack(redisCommand, promise);
        }
        if (!deduplicated.isEmpty()) {
            ctx.write(deduplicated, promise);
        } else {
            promise.trySuccess();
        }
    }

    private void attachTracing(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> command) {
        if (!this.tracingEnabled || !(command instanceof CompleteableCommand)) {
            return;
        }
        TracedCommand traced = CommandWrapper.unwrap(command, TracedCommand.class);
        TracedCommand provider = traced == null ? this.clientResources.tracing().initialTraceContextProvider() : traced;
        Tracer tracer = this.clientResources.tracing().getTracerProvider().getTracer();
        if (provider != null) {
            TraceContext context = provider.getTraceContext();
            Tracer.Span span = tracer.nextSpan(context);
            span.name(command.getType().name());
            if (this.tracedEndpoint != null) {
                span.remoteEndpoint(this.tracedEndpoint);
            } else {
                span.remoteEndpoint(this.clientResources.tracing().createEndpoint(ctx.channel().remoteAddress()));
            }
            span.start(command);
            if (traced != null) {
                traced.setSpan(span);
            }
        }
    }

    private void addToStack(RedisCommand<?, ?, ?> command, ChannelPromise promise) {
        try {
            this.validateWrite(1);
            if (command.getOutput() == null) {
                this.complete(command);
            }
            RedisCommand<?, ?, ?> redisCommand = this.potentiallyWrapLatencyCommand(command);
            if (promise.isVoid()) {
                this.stack.add(redisCommand);
            } else {
                promise.addListener((GenericFutureListener)AddToStack.newInstance(this.stack, redisCommand));
            }
        }
        catch (Exception e) {
            command.completeExceptionally(e);
            throw e;
        }
    }

    private void validateWrite(int commands) {
        if (this.usesBoundedQueues()) {
            int maxMaintenanceCommands = 5;
            int allowedRequestQueueSize = this.clientOptions.getRequestQueueSize() + maxMaintenanceCommands;
            if (this.stack.size() + commands > allowedRequestQueueSize) {
                throw new RedisException("Internal stack size exceeded: " + this.clientOptions.getRequestQueueSize() + ". Commands are not accepted until the stack size drops.");
            }
        }
    }

    private boolean usesBoundedQueues() {
        return this.boundedQueues;
    }

    private static boolean isWriteable(RedisCommand<?, ?, ?> command) {
        return !command.isDone();
    }

    private RedisCommand<?, ?, ?> potentiallyWrapLatencyCommand(RedisCommand<?, ?, ?> command) {
        if (!this.latencyMetricsEnabled) {
            return command;
        }
        if (command instanceof WithLatency) {
            WithLatency withLatency = (WithLatency)((Object)command);
            withLatency.firstResponse(-1L);
            withLatency.sent(CommandHandler.nanoTime());
            return command;
        }
        LatencyMeteredCommand latencyMeteredCommand = new LatencyMeteredCommand(command);
        latencyMeteredCommand.firstResponse(-1L);
        latencyMeteredCommand.sent(CommandHandler.nanoTime());
        return latencyMeteredCommand;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf input = (ByteBuf)msg;
        input.touch((Object)"CommandHandler.read(\u2026)");
        if (!input.isReadable() || input.refCnt() == 0) {
            logger.warn("{} Input not readable {}, {}", new Object[]{this.logPrefix(), input.isReadable(), input.refCnt()});
            return;
        }
        if (this.debugEnabled) {
            logger.debug("{} Received: {} bytes, {} commands in the stack", new Object[]{this.logPrefix(), input.readableBytes(), this.stack.size()});
        }
        try {
            if (this.buffer.refCnt() < 1) {
                logger.warn("{} Ignoring received data for closed or abandoned connection", (Object)this.logPrefix());
                return;
            }
            if (this.debugEnabled && ctx.channel() != this.channel) {
                logger.debug("{} Ignoring data for a non-registered channel {}", (Object)this.logPrefix(), (Object)ctx.channel());
                return;
            }
            if (this.traceEnabled) {
                logger.trace("{} Buffer: {}", (Object)this.logPrefix(), (Object)input.toString(Charset.defaultCharset()).trim());
            }
            this.buffer.touch((Object)"CommandHandler.read(\u2026)");
            this.buffer.writeBytes(input);
            this.decode(ctx, this.buffer);
        }
        finally {
            input.release();
        }
    }

    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
        if (this.pristine && this.stack.isEmpty() && buffer.isReadable() && !this.isPushDecode(buffer)) {
            if (this.debugEnabled) {
                logger.debug("{} Received response without a command context (empty stack)", (Object)this.logPrefix());
            }
            if (this.consumeResponse(buffer)) {
                this.pristine = false;
            }
            return;
        }
        while (this.canDecode(buffer)) {
            if (this.isPushDecode(buffer)) {
                if (this.pushOutput == null) {
                    this.pushOutput = new PushOutput<ByteBuffer, ByteBuffer>(ByteBufferCopyCodec.INSTANCE);
                }
                try {
                    if (!this.decode(ctx, buffer, this.pushOutput)) {
                        this.hasDecodeProgress = true;
                        this.decodeBufferPolicy.afterPartialDecode(buffer);
                        return;
                    }
                }
                catch (Exception e) {
                    ctx.close();
                    throw e;
                }
                this.hasDecodeProgress = false;
                PushOutput<ByteBuffer, ByteBuffer> output = this.pushOutput;
                this.pushOutput = null;
                this.notifyPushListeners(output);
                continue;
            }
            RedisCommand<?, ?, ?> command = this.stack.peek();
            if (this.debugEnabled) {
                logger.debug("{} Stack contains: {} commands", (Object)this.logPrefix(), (Object)this.stack.size());
            }
            this.pristine = false;
            try {
                if (!this.decode(ctx, buffer, command)) {
                    this.hasDecodeProgress = true;
                    this.decodeBufferPolicy.afterPartialDecode(buffer);
                    return;
                }
            }
            catch (Exception e) {
                ctx.close();
                throw e;
            }
            this.hasDecodeProgress = false;
            if (this.isProtectedMode(command)) {
                this.onProtectedMode(command.getOutput().getError());
            } else if (this.canComplete(command)) {
                this.stack.poll();
                try {
                    if (this.debugEnabled) {
                        logger.debug("{} Completing command {}", (Object)this.logPrefix(), command);
                    }
                    this.complete(command);
                }
                catch (Exception e) {
                    logger.warn("{} Unexpected exception during request: {}", new Object[]{this.logPrefix, e.toString(), e});
                }
            }
            this.afterDecode(ctx, command);
        }
        this.decodeBufferPolicy.afterDecoding(buffer);
    }

    protected void notifyPushListeners(PushMessage notification) {
        Collection<PushListener> pushListeners = this.endpoint.getPushListeners();
        try {
            pushListeners.forEach(pushListener -> pushListener.onPushMessage(notification));
        }
        catch (Exception e) {
            logger.warn("PushListener.onPushMessage failed with " + e.toString(), (Throwable)e);
        }
    }

    protected boolean canDecode(ByteBuf buffer) {
        return buffer.isReadable() && (this.isMessageDecode() || this.isPushDecode(buffer));
    }

    private boolean isPushMessage(ByteBuf buffer) {
        return buffer.getByte(buffer.readerIndex()) == RedisStateMachine.State.Type.PUSH.marker;
    }

    protected boolean isPushDecode(ByteBuf buffer) {
        return !this.hasDecodeProgress && this.isPushMessage(buffer) || this.pushOutput != null;
    }

    private boolean isMessageDecode() {
        return !this.stack.isEmpty();
    }

    protected boolean canComplete(RedisCommand<?, ?, ?> command) {
        return true;
    }

    protected void complete(RedisCommand<?, ?, ?> command) {
        command.complete();
    }

    private boolean decode(ChannelHandlerContext ctx, ByteBuf buffer, RedisCommand<?, ?, ?> command) {
        if (this.latencyMetricsEnabled && command instanceof WithLatency) {
            WithLatency withLatency = (WithLatency)((Object)command);
            if (withLatency.getFirstResponse() == -1L) {
                withLatency.firstResponse(CommandHandler.nanoTime());
            }
            if (!this.decode0(ctx, buffer, command)) {
                return false;
            }
            this.recordLatency(withLatency, command.getType());
            return true;
        }
        return this.decode0(ctx, buffer, command);
    }

    private boolean decode(ChannelHandlerContext ctx, ByteBuf buffer, CommandOutput<?, ?, ?> output) {
        return this.decode0(ctx, buffer, output);
    }

    private boolean decode0(ChannelHandlerContext ctx, ByteBuf buffer, RedisCommand<?, ?, ?> command) {
        if (!this.decode(buffer, command, this.getCommandOutput(command))) {
            if (command instanceof DemandAware.Sink) {
                DemandAware.Sink sink = (DemandAware.Sink)((Object)command);
                sink.setSource(this.backpressureSource);
                ctx.channel().config().setAutoRead(sink.hasDemand());
            }
            return false;
        }
        if (!ctx.channel().config().isAutoRead()) {
            ctx.channel().config().setAutoRead(true);
        }
        return true;
    }

    private boolean decode0(ChannelHandlerContext ctx, ByteBuf buffer, CommandOutput<?, ?, ?> pushOutput) {
        if (!this.rsm.decode(buffer, pushOutput, arg_0 -> ((ChannelHandlerContext)ctx).fireExceptionCaught(arg_0))) {
            return false;
        }
        if (!ctx.channel().config().isAutoRead()) {
            ctx.channel().config().setAutoRead(true);
        }
        return true;
    }

    protected CommandOutput<?, ?, ?> getCommandOutput(RedisCommand<?, ?, ?> command) {
        return command.getOutput();
    }

    protected boolean decode(ByteBuf buffer, CommandOutput<?, ?, ?> output) {
        return this.rsm.decode(buffer, output);
    }

    protected boolean decode(ByteBuf buffer, RedisCommand<?, ?, ?> command, CommandOutput<?, ?, ?> output) {
        return this.rsm.decode(buffer, output, command::completeExceptionally);
    }

    private boolean consumeResponse(ByteBuf buffer) {
        PristineFallbackCommand command = this.fallbackCommand;
        if (command == null || !command.isDone()) {
            if (this.debugEnabled) {
                logger.debug("{} Consuming response using FallbackCommand", (Object)this.logPrefix());
            }
            if (command == null) {
                this.fallbackCommand = command = new PristineFallbackCommand();
            }
            if (!this.decode(buffer, command.getOutput())) {
                return false;
            }
            if (this.isProtectedMode(command)) {
                this.onProtectedMode(command.getOutput().getError());
            }
        }
        return true;
    }

    private boolean isProtectedMode(RedisCommand<?, ?, ?> command) {
        return command != null && command.getOutput() != null && command.getOutput().hasError() && RedisConnectionException.isProtectedMode(command.getOutput().getError());
    }

    private void onProtectedMode(String message) {
        RedisConnectionException exception = new RedisConnectionException(message);
        this.endpoint.notifyException(exception);
        if (this.channel != null) {
            this.channel.disconnect();
        }
        this.stack.forEach((Consumer<RedisCommand<?, ?, ?>>)((Consumer<RedisCommand>)cmd -> cmd.completeExceptionally(exception)));
        this.stack.clear();
    }

    protected void afterDecode(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> command) {
        this.decodeBufferPolicy.afterCommandDecoded(this.buffer);
    }

    private void recordLatency(WithLatency withLatency, ProtocolKeyword commandType) {
        if (withLatency != null && this.latencyMetricsEnabled && this.channel != null && this.remote() != null) {
            long firstResponseLatency = withLatency.getFirstResponse() - withLatency.getSent();
            long completionLatency = CommandHandler.nanoTime() - withLatency.getSent();
            this.commandLatencyRecorder.recordCommandLatency(this.local(), this.remote(), commandType, firstResponseLatency, completionLatency);
        }
    }

    private SocketAddress remote() {
        return this.channel.remoteAddress();
    }

    private SocketAddress local() {
        if (this.channel.localAddress() != null) {
            return this.channel.localAddress();
        }
        return LocalAddress.ANY;
    }

    boolean isConnected() {
        return this.lifecycleState.ordinal() >= LifecycleState.CONNECTED.ordinal() && this.lifecycleState.ordinal() < LifecycleState.DISCONNECTED.ordinal();
    }

    private void reset() {
        this.resetInternals();
        CommandHandler.cancelCommands("Reset", CommandHandler.drainCommands(this.stack));
    }

    private void resetInternals() {
        if (this.rsm != null) {
            this.rsm.reset();
        }
        if (this.buffer.refCnt() > 0) {
            this.buffer.clear();
        }
    }

    private static void cancelCommands(String message, List<RedisCommand<?, ?, ?>> toCancel) {
        for (RedisCommand<?, ?, ?> cmd : toCancel) {
            if (cmd.getOutput() != null) {
                cmd.getOutput().setError(message);
            }
            cmd.cancel();
        }
    }

    public String getChannelId() {
        return this.channel == null ? "unknown" : ChannelLogDescriptor.getId(this.channel);
    }

    private String logPrefix() {
        String buffer;
        if (this.logPrefix != null) {
            return this.logPrefix;
        }
        this.logPrefix = buffer = "[" + ChannelLogDescriptor.logDescriptor(this.channel) + ", epid=" + this.endpoint.getId() + ", chid=0x" + this.getCommandHandlerId() + ']';
        return this.logPrefix;
    }

    private String getCommandHandlerId() {
        return Long.toHexString(this.commandHandlerId);
    }

    private static long nanoTime() {
        return System.nanoTime();
    }

    class BackpressureSource
    implements DemandAware.Source {
        BackpressureSource() {
        }

        @Override
        public void requestMore() {
            if (CommandHandler.this.isConnected() && !CommandHandler.this.isClosed() && !CommandHandler.this.channel.config().isAutoRead()) {
                CommandHandler.this.channel.pipeline().fireUserEventTriggered((Object)EnableAutoRead.INSTANCE);
            }
        }
    }

    public static enum LifecycleState {
        NOT_CONNECTED,
        REGISTERED,
        CONNECTED,
        ACTIVATING,
        ACTIVE,
        DISCONNECTED,
        DEACTIVATING,
        DEACTIVATED,
        CLOSED;

    }

    static enum EnableAutoRead {
        INSTANCE;

    }

    static class AddToStack
    implements GenericFutureListener<Future<Void>> {
        private static final Recycler<AddToStack> RECYCLER = new Recycler<AddToStack>(){

            protected AddToStack newObject(Recycler.Handle<AddToStack> handle) {
                return new AddToStack(handle);
            }
        };
        private final Recycler.Handle<AddToStack> handle;
        private ArrayDeque<Object> stack;
        private RedisCommand<?, ?, ?> command;

        AddToStack(Recycler.Handle<AddToStack> handle) {
            this.handle = handle;
        }

        static AddToStack newInstance(ArrayDeque<?> stack, RedisCommand<?, ?, ?> command) {
            AddToStack entry = (AddToStack)RECYCLER.get();
            entry.stack = stack;
            entry.command = command;
            return entry;
        }

        public void operationComplete(Future<Void> future) {
            try {
                if (future.isSuccess()) {
                    this.stack.add(this.command);
                }
            }
            finally {
                this.recycle();
            }
        }

        private void recycle() {
            this.stack = null;
            this.command = null;
            this.handle.recycle((Object)this);
        }
    }
}

