package com.sun.messaging.jmq.jmsserver.multibroker;

import com.sun.messaging.jmq.io.GPacket;
import com.sun.messaging.jmq.jmsserver.FaultInjection;
import com.sun.messaging.jmq.jmsserver.Globals;
import com.sun.messaging.jmq.jmsserver.core.Consumer;
import com.sun.messaging.jmq.jmsserver.core.Destination;
import com.sun.messaging.jmq.jmsserver.core.DestinationUID;
import com.sun.messaging.jmq.jmsserver.data.TransactionUID;
import com.sun.messaging.jmq.jmsserver.multibroker.raptor.ClusterGoodbyeInfo;
import com.sun.messaging.jmq.jmsserver.multibroker.raptor.ClusterMessageAckInfo;
import com.sun.messaging.jmq.jmsserver.multibroker.raptor.ProtocolGlobals;
import com.sun.messaging.jmq.jmsserver.resources.BrokerResources;
import com.sun.messaging.jmq.jmsserver.service.ConnectionUID;
import com.sun.messaging.jmq.util.log.Logger;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/* JADX WARN: Classes with same name are omitted:
  input_file:jmsra.rar:lib/install/applications/jmsra/imqbroker.jar:com/sun/messaging/jmq/jmsserver/multibroker/CallbackDispatcher.class
 */
/* loaded from: input_file:com/sun/messaging/jmq/jmsserver/multibroker/CallbackDispatcher.class */
public class CallbackDispatcher extends Thread {
    private static boolean DEBUG = false;
    private static final boolean DEBUG_CLUSTER_ALL = Globals.getConfig().getBooleanProperty("imq.cluster.debug.all");
    private static final boolean DEBUG_CLUSTER_MSG;
    private static final boolean DEBUG_CLUSTER_TXN;
    protected static final Logger logger;
    private MessageBusCallback cb;
    private LinkedList eventQ;
    private ExecutorService commitAckExecutor;
    private ExecutorService syncAckExecutor;
    private ExecutorService msgDataExecutor;
    private static FaultInjection fi;
    private boolean stopThread = false;
    private BrokerResources br = Globals.getBrokerResources();

    public CallbackDispatcher(MessageBusCallback messageBusCallback) {
        this.cb = null;
        this.eventQ = null;
        this.commitAckExecutor = null;
        this.syncAckExecutor = null;
        this.msgDataExecutor = null;
        this.cb = messageBusCallback;
        this.eventQ = new LinkedList();
        setName("MessageBusCallbackDispatcher");
        setDaemon(true);
        start();
        this.commitAckExecutor = Executors.newSingleThreadExecutor();
        this.syncAckExecutor = Executors.newSingleThreadExecutor();
        this.msgDataExecutor = Executors.newSingleThreadExecutor();
    }

    public void configSyncComplete() {
        ConfigSyncCompleteCallbackEvent configSyncCompleteCallbackEvent = new ConfigSyncCompleteCallbackEvent();
        synchronized (this.eventQ) {
            this.eventQ.add(configSyncCompleteCallbackEvent);
            this.eventQ.notify();
        }
    }

    public void processGPacket(com.sun.messaging.jmq.jmsserver.core.BrokerAddress brokerAddress, GPacket gPacket, Protocol protocol) {
        GPacketCallbackEvent gPacketCallbackEvent = new GPacketCallbackEvent(brokerAddress, gPacket, protocol);
        synchronized (this.eventQ) {
            this.eventQ.add(gPacketCallbackEvent);
            this.eventQ.notify();
        }
    }

    public void processMessageAckReply(final com.sun.messaging.jmq.jmsserver.core.BrokerAddress brokerAddress, final GPacket gPacket, final Protocol protocol) {
        short type = gPacket.getType();
        if (type != 4) {
            throw new RuntimeException("Internal Error: Unexpected packet type " + ((int) type) + " passed to CallbackDispatcher.processMessageAckReply()");
        }
        if (processCommitAck(brokerAddress, gPacket)) {
            return;
        }
        if (DEBUG_CLUSTER_MSG || DEBUG_CLUSTER_TXN) {
            Logger logger2 = logger;
            Logger logger3 = logger;
            logger2.log(8, "processMessageAckReply: Received " + ProtocolGlobals.getPacketTypeDisplayString(type) + " from " + brokerAddress + ": " + ClusterMessageAckInfo.toString(gPacket));
        }
        try {
            this.syncAckExecutor.execute(new Runnable() { // from class: com.sun.messaging.jmq.jmsserver.multibroker.CallbackDispatcher.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        protocol.handleGPacket(CallbackDispatcher.this.cb, brokerAddress, gPacket);
                    } catch (Throwable th) {
                        Logger logger4 = CallbackDispatcher.logger;
                        Logger logger5 = CallbackDispatcher.logger;
                        logger4.logStack(16, "Exception in processing " + ClusterMessageAckInfo.toString(gPacket) + " from " + brokerAddress, th);
                    }
                }
            });
        } catch (Throwable th) {
            if (this.stopThread) {
                Logger logger4 = logger;
                Logger logger5 = logger;
                logger4.log(4, "Cluster shutdown, ignore event " + ClusterMessageAckInfo.toString(gPacket) + " from " + brokerAddress);
            } else {
                Logger logger6 = logger;
                Logger logger7 = logger;
                logger6.logStack(16, "Exception in submitting for processing " + ClusterMessageAckInfo.toString(gPacket) + " from " + brokerAddress, th);
            }
        }
    }

    private boolean processCommitAck(final com.sun.messaging.jmq.jmsserver.core.BrokerAddress brokerAddress, final GPacket gPacket) {
        Integer ackAckType;
        Long ackAckTransactionID;
        if (!ClusterMessageAckInfo.isAckAckAsync(gPacket) || (ackAckType = ClusterMessageAckInfo.getAckAckType(gPacket)) == null || ackAckType.intValue() != 3 || (ackAckTransactionID = ClusterMessageAckInfo.getAckAckTransactionID(gPacket)) == null) {
            return false;
        }
        if (ClusterMessageAckInfo.getAckAckStatus(gPacket) != 200) {
            Logger logger2 = logger;
            Logger logger3 = logger;
            BrokerResources brokerResources = this.br;
            BrokerResources brokerResources2 = this.br;
            logger2.log(16, brokerResources.getKString(BrokerResources.W_CLUSTER_MSG_ACK_FAILED_FROM_HOME, brokerAddress, ClusterMessageAckInfo.toString(gPacket)));
            return true;
        }
        if (DEBUG_CLUSTER_TXN || DEBUG) {
            Logger logger4 = logger;
            Logger logger5 = logger;
            logger4.log(8, "processCommitAck: Received " + ProtocolGlobals.getPacketTypeDisplayString(gPacket.getType()) + " from " + brokerAddress + ": " + ClusterMessageAckInfo.toString(gPacket));
        }
        if (fi.FAULT_INJECTION) {
            ClusterMessageAckInfo.CHECKFAULT(new HashMap(), 3, ackAckTransactionID, FaultInjection.MSG_REMOTE_ACK_P, "3");
        }
        try {
            final TransactionUID transactionUID = new TransactionUID(ackAckTransactionID.longValue());
            this.commitAckExecutor.execute(new Runnable() { // from class: com.sun.messaging.jmq.jmsserver.multibroker.CallbackDispatcher.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        Globals.getTransactionList().completeClusterTransactionBrokerState(transactionUID, 6, brokerAddress, true);
                    } catch (Throwable th) {
                        Logger logger6 = CallbackDispatcher.logger;
                        Logger logger7 = CallbackDispatcher.logger;
                        logger6.log(16, "Unable to update cluster transaction COMMIT completion state for " + brokerAddress + ": " + ClusterMessageAckInfo.toString(gPacket), th);
                    }
                }
            });
            return true;
        } catch (Throwable th) {
            if (this.stopThread) {
                Logger logger6 = logger;
                Logger logger7 = logger;
                logger6.log(4, "Cluster shutdown, ignore event " + ClusterMessageAckInfo.toString(gPacket) + " from " + brokerAddress);
                return true;
            }
            Logger logger8 = logger;
            Logger logger9 = logger;
            logger8.logStack(16, "Exception in submitting for processing " + ClusterMessageAckInfo.toString(gPacket) + " from " + brokerAddress, th);
            return true;
        }
    }

    public void processMessageData(final com.sun.messaging.jmq.jmsserver.core.BrokerAddress brokerAddress, final GPacket gPacket, final Protocol protocol) {
        short type = gPacket.getType();
        if (type != 1 && type != 6 && type != 10 && type != 36 && type != 8 && type != 22) {
            throw new RuntimeException("Internal Error: Unexpected packet type " + ((int) type) + " passed to CallbackDispatcher.processMessageData()");
        }
        if (DEBUG_CLUSTER_MSG || DEBUG_CLUSTER_TXN) {
            Logger logger2 = logger;
            Logger logger3 = logger;
            logger2.log(8, "processMessageData: Received " + ProtocolGlobals.getPacketTypeDisplayString(type) + " from " + brokerAddress);
        }
        try {
            this.msgDataExecutor.execute(new Runnable() { // from class: com.sun.messaging.jmq.jmsserver.multibroker.CallbackDispatcher.3
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        protocol.handleGPacket(CallbackDispatcher.this.cb, brokerAddress, gPacket);
                    } catch (Throwable th) {
                        Logger logger4 = CallbackDispatcher.logger;
                        Logger logger5 = CallbackDispatcher.logger;
                        logger4.logStack(16, "Exception in processing " + ProtocolGlobals.getPacketTypeDisplayString(gPacket.getType()) + " from " + brokerAddress, th);
                    }
                }
            });
        } catch (Throwable th) {
            if (this.stopThread) {
                Logger logger4 = logger;
                Logger logger5 = logger;
                logger4.log(4, "Cluster shutdown, ignore packet " + ProtocolGlobals.getPacketTypeDisplayString(type) + " from " + brokerAddress);
            } else {
                Logger logger6 = logger;
                Logger logger7 = logger;
                logger6.logStack(16, "Exception in submitting for processing " + ProtocolGlobals.getPacketTypeDisplayString(type) + " from " + brokerAddress, th);
            }
        }
    }

    public void processGoodbye(com.sun.messaging.jmq.jmsserver.core.BrokerAddress brokerAddress, ClusterGoodbyeInfo clusterGoodbyeInfo) {
        synchronized (this.eventQ) {
            Iterator it = this.eventQ.iterator();
            while (it.hasNext()) {
                CallbackEvent callbackEvent = (CallbackEvent) it.next();
                if (callbackEvent instanceof GPacketCallbackEvent) {
                    GPacketCallbackEvent gPacketCallbackEvent = (GPacketCallbackEvent) callbackEvent;
                    if (gPacketCallbackEvent.getSender().equals(brokerAddress) && gPacketCallbackEvent.getEventType() == 3) {
                        Logger logger2 = logger;
                        Logger logger3 = logger;
                        logger2.log(16, "Discard unprocessed G_MESSAGE_ACK  because received GOODBYE from " + brokerAddress);
                        it.remove();
                    }
                }
            }
        }
    }

    public void processGoodbyeReply(com.sun.messaging.jmq.jmsserver.core.BrokerAddress brokerAddress) {
        synchronized (this.eventQ) {
            Iterator it = this.eventQ.iterator();
            while (it.hasNext()) {
                CallbackEvent callbackEvent = (CallbackEvent) it.next();
                if (callbackEvent instanceof GPacketCallbackEvent) {
                    GPacketCallbackEvent gPacketCallbackEvent = (GPacketCallbackEvent) callbackEvent;
                    if (gPacketCallbackEvent.getSender().equals(brokerAddress) && gPacketCallbackEvent.getEventType() == 4) {
                        if (DEBUG) {
                            Logger logger2 = logger;
                            Logger logger3 = logger;
                            logger2.log(8, "Processed G_MESSAGE_ACK_REPLY from " + brokerAddress);
                        }
                        gPacketCallbackEvent.dispatch(this.cb);
                        it.remove();
                    }
                }
            }
        }
    }

    public void interestCreated(Consumer consumer) {
        InterestCreatedCallbackEvent interestCreatedCallbackEvent = new InterestCreatedCallbackEvent(consumer);
        synchronized (this.eventQ) {
            this.eventQ.add(interestCreatedCallbackEvent);
            this.eventQ.notify();
        }
    }

    public void interestRemoved(Consumer consumer, Set set, boolean z) {
        InterestRemovedCallbackEvent interestRemovedCallbackEvent = new InterestRemovedCallbackEvent(consumer, set, z);
        synchronized (this.eventQ) {
            this.eventQ.add(interestRemovedCallbackEvent);
            this.eventQ.notify();
        }
    }

    public void activeStateChanged(com.sun.messaging.jmq.jmsserver.core.ConsumerUID consumerUID) {
        PrimaryInterestChangedCallbackEvent primaryInterestChangedCallbackEvent = new PrimaryInterestChangedCallbackEvent(consumerUID);
        synchronized (this.eventQ) {
            this.eventQ.add(primaryInterestChangedCallbackEvent);
            this.eventQ.notify();
        }
    }

    public void activeStateChanged(Consumer consumer) {
        PrimaryInterestChangedCallbackEvent primaryInterestChangedCallbackEvent = new PrimaryInterestChangedCallbackEvent(consumer);
        synchronized (this.eventQ) {
            this.eventQ.add(primaryInterestChangedCallbackEvent);
            this.eventQ.notify();
        }
    }

    public void clientDown(ConnectionUID connectionUID) {
        ClientDownCallbackEvent clientDownCallbackEvent = new ClientDownCallbackEvent(connectionUID);
        synchronized (this.eventQ) {
            this.eventQ.add(clientDownCallbackEvent);
            this.eventQ.notify();
        }
    }

    public void brokerDown(com.sun.messaging.jmq.jmsserver.core.BrokerAddress brokerAddress) {
        synchronized (this.eventQ) {
            Iterator it = this.eventQ.iterator();
            while (it.hasNext()) {
                CallbackEvent callbackEvent = (CallbackEvent) it.next();
                if (callbackEvent instanceof GPacketCallbackEvent) {
                    GPacketCallbackEvent gPacketCallbackEvent = (GPacketCallbackEvent) callbackEvent;
                    if (gPacketCallbackEvent.getSender().equals(brokerAddress) && (gPacketCallbackEvent.getSender().getBrokerSessionUID() == null || brokerAddress.getBrokerSessionUID() == null || gPacketCallbackEvent.getSender().getBrokerSessionUID().equals(brokerAddress.getBrokerSessionUID()))) {
                        if (gPacketCallbackEvent.getEventType() == 4 || gPacketCallbackEvent.getEventType() == 5 || gPacketCallbackEvent.getEventType() == 7 || gPacketCallbackEvent.getEventType() == 21) {
                            gPacketCallbackEvent.dispatch(this.cb);
                        }
                        it.remove();
                    }
                }
            }
        }
        this.cb.brokerDown(brokerAddress);
    }

    public void notifyCreateDestination(Destination destination) {
        ClusterCreateDestinationCallbackEvent clusterCreateDestinationCallbackEvent = new ClusterCreateDestinationCallbackEvent(destination, new CallbackEventListener());
        synchronized (this.eventQ) {
            if (this.stopThread) {
                Logger logger2 = logger;
                Logger logger3 = logger;
                logger2.log(4, "Cluster shutdown, ignore create destination event on " + destination);
            } else {
                this.eventQ.add(clusterCreateDestinationCallbackEvent);
                this.eventQ.notify();
                clusterCreateDestinationCallbackEvent.getEventListener().waitEventProcessed();
            }
        }
    }

    public void notifyDestroyDestination(DestinationUID destinationUID) {
        ClusterDestroyDestinationCallbackEvent clusterDestroyDestinationCallbackEvent = new ClusterDestroyDestinationCallbackEvent(destinationUID, new CallbackEventListener());
        synchronized (this.eventQ) {
            if (this.stopThread) {
                Logger logger2 = logger;
                Logger logger3 = logger;
                logger2.log(4, "Cluster shutdown, ignore destroy destination event on " + destinationUID);
            } else {
                this.eventQ.add(clusterDestroyDestinationCallbackEvent);
                this.eventQ.notify();
                clusterDestroyDestinationCallbackEvent.getEventListener().waitEventProcessed();
            }
        }
    }

    public void notifyUpdateDestination(DestinationUID destinationUID, Map map) {
        ClusterUpdateDestinationCallbackEvent clusterUpdateDestinationCallbackEvent = new ClusterUpdateDestinationCallbackEvent(destinationUID, map, new CallbackEventListener());
        synchronized (this.eventQ) {
            if (this.stopThread) {
                Logger logger2 = logger;
                Logger logger3 = logger;
                logger2.log(4, "Cluster shutdown, ignore update destination event on " + destinationUID);
            } else {
                this.eventQ.add(clusterUpdateDestinationCallbackEvent);
                this.eventQ.notify();
                clusterUpdateDestinationCallbackEvent.getEventListener().waitEventProcessed();
            }
        }
    }

    public void goHAActive() {
        GoHAActiveCallbackEvent goHAActiveCallbackEvent = new GoHAActiveCallbackEvent();
        synchronized (this.eventQ) {
            this.eventQ.add(goHAActiveCallbackEvent);
            this.eventQ.notify();
        }
    }

    public void shutdown() {
        synchronized (this.eventQ) {
            this.stopThread = true;
            this.eventQ.notify();
        }
        this.msgDataExecutor.shutdown();
        this.commitAckExecutor.shutdown();
        this.syncAckExecutor.shutdown();
        try {
            join(30000L);
        } catch (InterruptedException e) {
        }
        try {
            if (!this.msgDataExecutor.awaitTermination(30L, TimeUnit.SECONDS)) {
                Logger logger2 = logger;
                Logger logger3 = logger;
                logger2.log(8, "Force cluster msgDataExecutor thread shutdown");
                this.msgDataExecutor.shutdownNow();
                this.msgDataExecutor.awaitTermination(10L, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e2) {
            this.msgDataExecutor.shutdownNow();
        }
        try {
            if (!this.commitAckExecutor.awaitTermination(30L, TimeUnit.SECONDS)) {
                Logger logger4 = logger;
                Logger logger5 = logger;
                logger4.log(8, "Force cluster commitAckExecutor thread shutdown");
                this.commitAckExecutor.shutdownNow();
                this.commitAckExecutor.awaitTermination(10L, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e3) {
            this.commitAckExecutor.shutdownNow();
        }
        try {
            if (!this.syncAckExecutor.awaitTermination(30L, TimeUnit.SECONDS)) {
                Logger logger6 = logger;
                Logger logger7 = logger;
                logger6.log(8, "Force cluster syncAckExecutor thread shutdown");
                this.syncAckExecutor.shutdownNow();
                this.syncAckExecutor.awaitTermination(10L, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e4) {
            this.syncAckExecutor.shutdownNow();
        }
    }

    public Hashtable getDebugState() {
        Object[] array;
        Hashtable hashtable = new Hashtable();
        hashtable.put("stopThread", Boolean.valueOf(this.stopThread));
        synchronized (this.eventQ) {
            array = this.eventQ.toArray();
        }
        hashtable.put("eventQCount", Integer.valueOf(array.length));
        Vector vector = new Vector();
        for (Object obj : array) {
            vector.add(obj.toString());
        }
        hashtable.put("eventQ", vector);
        return hashtable;
    }

    /*  JADX ERROR: JadxRuntimeException in pass: BlockProcessor
        jadx.core.utils.exceptions.JadxRuntimeException: Unreachable block: B:36:0x0152
        	at jadx.core.dex.visitors.blocks.BlockProcessor.checkForUnreachableBlocks(BlockProcessor.java:88)
        	at jadx.core.dex.visitors.blocks.BlockProcessor.processBlocksTree(BlockProcessor.java:52)
        	at jadx.core.dex.visitors.blocks.BlockProcessor.visit(BlockProcessor.java:44)
        */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        /*
            Method dump skipped, instructions count: 362
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.sun.messaging.jmq.jmsserver.multibroker.CallbackDispatcher.run():void");
    }

    static {
        DEBUG_CLUSTER_MSG = Globals.getConfig().getBooleanProperty("imq.cluster.debug.msg") || DEBUG || DEBUG_CLUSTER_ALL;
        DEBUG_CLUSTER_TXN = Globals.getConfig().getBooleanProperty("imq.cluster.debug.txn") || DEBUG || DEBUG_CLUSTER_ALL || DEBUG_CLUSTER_MSG;
        logger = Globals.getLogger();
        fi = FaultInjection.getInjection();
    }
}
