/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.hotrod.impl.transport.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.Signal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.infinispan.commons.util.Util;
import org.infinispan.hotrod.event.impl.AbstractClientEvent;
import org.infinispan.hotrod.exceptions.TransportException;
import org.infinispan.hotrod.impl.counter.HotRodCounterEvent;
import org.infinispan.hotrod.impl.logging.Log;
import org.infinispan.hotrod.impl.logging.LogFactory;
import org.infinispan.hotrod.impl.operations.AddClientListenerOperation;
import org.infinispan.hotrod.impl.operations.HotRodOperation;
import org.infinispan.hotrod.impl.operations.OperationContext;
import org.infinispan.hotrod.impl.protocol.Codec;
import org.infinispan.hotrod.impl.protocol.HotRodConstants;
import org.infinispan.hotrod.impl.transport.netty.ChannelPoolCloseEvent;
import org.infinispan.hotrod.impl.transport.netty.HintedReplayingDecoder;

public class HeaderDecoder
extends HintedReplayingDecoder<State> {
    private static final Log log = LogFactory.getLog(HeaderDecoder.class);
    public static final String NAME = "header-decoder";
    private final OperationContext operationContext;
    private final ConcurrentMap<Long, HotRodOperation<?>> incomplete = new ConcurrentHashMap();
    private final List<byte[]> listeners = new ArrayList<byte[]>();
    private volatile boolean closing;
    HotRodOperation<?> operation;
    private short status;
    private short receivedOpCode;

    public HeaderDecoder(OperationContext operationContext) {
        super(State.READ_MESSAGE_ID);
        this.operationContext = operationContext;
    }

    public boolean isSharable() {
        return false;
    }

    public void registerOperation(Channel channel, HotRodOperation<?> operation) {
        if (log.isTraceEnabled()) {
            log.tracef("Registering operation %s(%08X) with id %d on %s", new Object[]{operation, System.identityHashCode(operation), operation.header().messageId(), channel});
        }
        if (this.closing) {
            throw Log.HOTROD.noMoreOperationsAllowed();
        }
        HotRodOperation<?> prev = this.incomplete.put(operation.header().messageId(), operation);
        assert (prev == null) : "Already registered: " + String.valueOf(prev) + ", new: " + String.valueOf(operation);
        operation.scheduleTimeout(channel);
    }

    public void tryCompleteExceptionally(long messageId, Throwable t) {
        HotRodOperation<?> op = this.getAndRemove(messageId);
        if (op != null) {
            op.completeExceptionally(t);
        } else {
            log.errorf(t, "Not found operation %d to complete with exception", messageId);
        }
    }

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            Codec codec = this.operationContext.getCodec();
            switch (((State)((Object)this.state())).ordinal()) {
                case 0: {
                    long messageId = codec.readMessageId(in);
                    this.receivedOpCode = codec.readOpCode(in);
                    switch (this.receivedOpCode) {
                        case 96: 
                        case 97: 
                        case 98: 
                        case 99: {
                            if (codec.allowOperationsAndEvents()) {
                                this.operation = messageId == 0L ? null : (HotRodOperation)this.incomplete.get(messageId);
                            } else if (this.incomplete.size() == 1) {
                                this.operation = (HotRodOperation)this.incomplete.values().iterator().next();
                                messageId = this.operation.header().messageId();
                            } else {
                                if (this.incomplete.size() > 1) {
                                    throw new IllegalStateException("Too many incomplete operations: " + String.valueOf(this.incomplete));
                                }
                                this.operation = null;
                                messageId = 0L;
                            }
                            if (this.operation != null && !(this.operation instanceof AddClientListenerOperation)) {
                                throw Log.HOTROD.operationIsNotAddClientListener(messageId, this.operation.toString());
                            }
                            if (log.isTraceEnabled()) {
                                log.tracef("Received event for request %d", messageId, this.operation);
                            }
                            this.checkpoint(State.READ_CACHE_EVENT);
                            return;
                        }
                        case 102: {
                            this.checkpoint(State.READ_COUNTER_EVENT);
                            return;
                        }
                    }
                    if (messageId == 0L) {
                        codec.readHeader(in, this.receivedOpCode, null, this.operationContext.getChannelFactory(), ctx.channel().remoteAddress());
                        throw new IllegalStateException("Should be never reached");
                    }
                    this.loadCurrent(messageId);
                    if (log.isTraceEnabled()) {
                        log.tracef("Received response for request %d, %s", messageId, this.operation);
                    }
                    this.checkpoint(State.READ_HEADER);
                }
                case 1: {
                    if (log.isTraceEnabled()) {
                        log.tracef("Decoding header for message %s", HotRodConstants.Names.of(this.receivedOpCode));
                    }
                    this.status = codec.readHeader(in, this.receivedOpCode, this.operation.header(), this.operationContext.getChannelFactory(), ctx.channel().remoteAddress());
                    this.checkpoint(State.READ_PAYLOAD);
                }
                case 2: {
                    if (log.isTraceEnabled()) {
                        log.tracef("Decoding payload for message %s", HotRodConstants.Names.of(this.receivedOpCode));
                    }
                    this.operation.acceptResponse(in, this.status, this);
                    this.checkpoint(State.READ_MESSAGE_ID);
                    break;
                }
                case 3: {
                    AbstractClientEvent cacheEvent;
                    if (log.isTraceEnabled()) {
                        log.tracef("Decoding cache event %s", HotRodConstants.Names.of(this.receivedOpCode));
                    }
                    try {
                        cacheEvent = codec.readCacheEvent(in, this.operationContext.getListenerNotifier()::getCacheDataFormat, this.receivedOpCode, this.operationContext.getConfiguration().getClassAllowList(), ctx.channel().remoteAddress());
                    }
                    catch (Signal signal) {
                        throw signal;
                    }
                    catch (Throwable t) {
                        log.unableToReadEventFromServer(t, ctx.channel().remoteAddress());
                        throw t;
                    }
                    if (this.operation != null) {
                        ((AddClientListenerOperation)this.operation).postponeTimeout(ctx.channel());
                    }
                    this.invokeEvent(cacheEvent.getListenerId(), cacheEvent);
                    this.checkpoint(State.READ_MESSAGE_ID);
                    break;
                }
                case 4: {
                    HotRodCounterEvent counterEvent;
                    if (log.isTraceEnabled()) {
                        log.tracef("Decoding counter event %s", HotRodConstants.Names.of(this.receivedOpCode));
                    }
                    try {
                        counterEvent = codec.readCounterEvent(in);
                    }
                    catch (Signal signal) {
                        throw signal;
                    }
                    catch (Throwable t) {
                        Log.HOTROD.unableToReadEventFromServer(t, ctx.channel().remoteAddress());
                        throw t;
                    }
                    this.invokeEvent(counterEvent.getListenerId(), counterEvent);
                    this.checkpoint(State.READ_MESSAGE_ID);
                }
            }
        }
        catch (Signal signal) {
            throw signal;
        }
        catch (Exception e) {
            this.checkpoint(State.READ_MESSAGE_ID);
            throw e;
        }
    }

    private void invokeEvent(byte[] listenerId, Object cacheEvent) {
        try {
            this.operationContext.getListenerNotifier().invokeEvent(listenerId, cacheEvent);
        }
        catch (Exception e) {
            Log.HOTROD.unexpectedErrorConsumingEvent(cacheEvent, e);
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (this.operation != null) {
            this.operation.exceptionCaught(ctx.channel(), cause);
        } else {
            TransportException transportException = log.errorFromUnknownOperation(ctx.channel(), cause, ctx.channel().remoteAddress());
            for (HotRodOperation op : this.incomplete.values()) {
                try {
                    op.exceptionCaught(ctx.channel(), transportException);
                }
                catch (Throwable t) {
                    Log.HOTROD.errorf(t, "Failed to complete %s", op);
                }
            }
            if (log.isTraceEnabled()) {
                log.tracef(cause, "Requesting %s close due to exception", ctx.channel());
            }
            ctx.close();
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        for (HotRodOperation op : this.incomplete.values()) {
            try {
                op.channelInactive(ctx.channel());
            }
            catch (Throwable t) {
                Log.HOTROD.errorf(t, "Failed to complete %s", op);
            }
        }
        this.failoverClientListeners();
    }

    protected void resumeOperation(ByteBuf buf, long messageId, short opCode, short status) {
        try {
            switch (((State)((Object)this.state())).ordinal()) {
                case 0: {
                    this.receivedOpCode = opCode;
                    this.loadCurrent(messageId);
                    this.status = status;
                    this.checkpoint(State.READ_PAYLOAD);
                }
                case 2: {
                    if (log.isTraceEnabled()) {
                        log.tracef("Decoding payload for message %s", HotRodConstants.Names.of(this.receivedOpCode));
                    }
                    this.operation.acceptResponse(buf, status, this);
                    this.checkpoint(State.READ_MESSAGE_ID);
                    break;
                }
                default: {
                    throw new IllegalStateException("Delegate with state: " + String.valueOf(this.state()));
                }
            }
        }
        catch (Exception e) {
            this.state(State.READ_MESSAGE_ID);
            throw e;
        }
    }

    @Override
    protected boolean isHandlingMessage() {
        return this.state() != State.READ_MESSAGE_ID;
    }

    public void loadCurrent(long messageId) {
        this.operation = this.getAndRemove(messageId);
        if (this.operation == null) {
            throw Log.HOTROD.unknownMessageId(messageId);
        }
    }

    private HotRodOperation<?> getAndRemove(long messageId) {
        if (this.operation != null && this.operation.header().messageId() == messageId) {
            return this.operation;
        }
        return (HotRodOperation)this.incomplete.remove(messageId);
    }

    public void failoverClientListeners() {
        for (byte[] listenerId : this.listeners) {
            this.operationContext.getListenerNotifier().failoverClientListener(listenerId);
        }
    }

    public CompletableFuture<Void> allCompleteFuture() {
        return CompletableFuture.allOf(this.incomplete.values().toArray(new CompletableFuture[0]));
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof ChannelPoolCloseEvent) {
            this.closing = true;
            this.allCompleteFuture().whenComplete((nil, throwable) -> ctx.channel().close());
        } else if (evt instanceof IdleStateEvent && !this.incomplete.isEmpty()) {
            return;
        }
        ctx.fireUserEventTriggered(evt);
    }

    @Override
    public void checkpoint() {
        super.checkpoint();
    }

    public int registeredOperations() {
        return this.incomplete.size();
    }

    public void addListener(byte[] listenerId) {
        if (log.isTraceEnabled()) {
            log.tracef("Decoder %08X adding listener %s", ((Object)((Object)this)).hashCode(), Util.printArray((byte[])listenerId));
        }
        this.listeners.add(listenerId);
    }

    public void removeListener(byte[] listenerId) {
        boolean removed = this.listeners.removeIf(id -> Arrays.equals(id, listenerId));
        if (log.isTraceEnabled()) {
            log.tracef("Decoder %08X removed? %s listener %s", ((Object)((Object)this)).hashCode(), Boolean.toString(removed), Util.printArray((byte[])listenerId));
        }
    }

    static enum State {
        READ_MESSAGE_ID,
        READ_HEADER,
        READ_PAYLOAD,
        READ_CACHE_EVENT,
        READ_COUNTER_EVENT;

    }
}

