package org.apache.qpid.server.virtualhost;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.txn.DtxBranch;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.Xid;
import org.apache.qpid.transport.util.Functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.class */
public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer {
    private static final Logger _logger = LoggerFactory.getLogger(SynchronousMessageStoreRecoverer.class);

    /* loaded from: input_file:org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer$DistributedTransactionVisitor.class */
    private static class DistributedTransactionVisitor implements DistributedTransactionHandler {
        private final VirtualHostImpl _virtualHost;
        private final MessageStore _store;
        private final EventLogger _eventLogger;
        private final MessageStoreLogSubject _logSubject;
        private final Map<Long, ServerMessage<?>> _recoveredMessages;
        private final Map<Long, StoredMessage<?>> _unusedMessages;

        private DistributedTransactionVisitor(VirtualHostImpl virtualHostImpl, MessageStore messageStore, EventLogger eventLogger, MessageStoreLogSubject messageStoreLogSubject, Map<Long, ServerMessage<?>> map, Map<Long, StoredMessage<?>> map2) {
            this._virtualHost = virtualHostImpl;
            this._store = messageStore;
            this._eventLogger = eventLogger;
            this._logSubject = messageStoreLogSubject;
            this._recoveredMessages = map;
            this._unusedMessages = map2;
        }

        @Override // org.apache.qpid.server.store.handler.DistributedTransactionHandler
        public boolean handle(Transaction.StoredXidRecord storedXidRecord, Transaction.EnqueueRecord[] enqueueRecordArr, Transaction.DequeueRecord[] dequeueRecordArr) {
            Xid xid = new Xid(storedXidRecord.getFormat(), storedXidRecord.getGlobalId(), storedXidRecord.getBranchId());
            DtxRegistry dtxRegistry = this._virtualHost.getDtxRegistry();
            DtxBranch branch = dtxRegistry.getBranch(xid);
            if (branch == null) {
                branch = new DtxBranch(storedXidRecord, this._store, this._virtualHost);
                dtxRegistry.registerBranch(branch);
            }
            for (Transaction.EnqueueRecord enqueueRecord : enqueueRecordArr) {
                final AMQQueue attainedQueue = this._virtualHost.getAttainedQueue(enqueueRecord.getResource().getId());
                if (attainedQueue != null) {
                    long messageNumber = enqueueRecord.getMessage().getMessageNumber();
                    final ServerMessage<?> serverMessage = this._recoveredMessages.get(Long.valueOf(messageNumber));
                    this._unusedMessages.remove(Long.valueOf(messageNumber));
                    if (serverMessage != null) {
                        final MessageReference newReference = serverMessage.newReference();
                        final MessageEnqueueRecord[] messageEnqueueRecordArr = new MessageEnqueueRecord[1];
                        branch.enqueue(attainedQueue, serverMessage, new Action<MessageEnqueueRecord>() { // from class: org.apache.qpid.server.virtualhost.SynchronousMessageStoreRecoverer.DistributedTransactionVisitor.1
                            @Override // org.apache.qpid.server.util.Action, org.apache.qpid.server.util.BaseAction
                            public void performAction(MessageEnqueueRecord messageEnqueueRecord) {
                                messageEnqueueRecordArr[0] = messageEnqueueRecord;
                            }
                        });
                        branch.addPostTransactionAction(new ServerTransaction.Action() { // from class: org.apache.qpid.server.virtualhost.SynchronousMessageStoreRecoverer.DistributedTransactionVisitor.2
                            @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                            public void postCommit() {
                                attainedQueue.enqueue(serverMessage, null, messageEnqueueRecordArr[0]);
                                newReference.release();
                            }

                            @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                            public void onRollback() {
                                newReference.release();
                            }
                        });
                    } else {
                        this._eventLogger.message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidAsString(xid).toString(), Long.toString(messageNumber)));
                    }
                } else {
                    this._eventLogger.message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidAsString(xid).toString(), enqueueRecord.getResource().getId().toString()));
                }
            }
            for (Transaction.DequeueRecord dequeueRecord : dequeueRecordArr) {
                AMQQueue attainedQueue2 = this._virtualHost.getAttainedQueue(dequeueRecord.getEnqueueRecord().getQueueId());
                if (attainedQueue2 != null) {
                    long messageNumber2 = dequeueRecord.getEnqueueRecord().getMessageNumber();
                    ServerMessage<?> serverMessage2 = this._recoveredMessages.get(Long.valueOf(messageNumber2));
                    this._unusedMessages.remove(Long.valueOf(messageNumber2));
                    if (serverMessage2 != null) {
                        final QueueEntry messageOnTheQueue = attainedQueue2.getMessageOnTheQueue(messageNumber2);
                        messageOnTheQueue.acquire();
                        branch.dequeue(messageOnTheQueue.getEnqueueRecord());
                        branch.addPostTransactionAction(new ServerTransaction.Action() { // from class: org.apache.qpid.server.virtualhost.SynchronousMessageStoreRecoverer.DistributedTransactionVisitor.3
                            @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                            public void postCommit() {
                                messageOnTheQueue.delete();
                            }

                            @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                            public void onRollback() {
                                messageOnTheQueue.release();
                            }
                        });
                    } else {
                        this._eventLogger.message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidAsString(xid).toString(), Long.toString(messageNumber2)));
                    }
                } else {
                    this._eventLogger.message(this._logSubject, TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidAsString(xid).toString(), dequeueRecord.getEnqueueRecord().getQueueId().toString()));
                }
            }
            branch.setState(DtxBranch.State.PREPARED);
            branch.prePrepareTransaction();
            return true;
        }

        private StringBuilder xidAsString(Xid xid) {
            return new StringBuilder("(").append(xid.getFormat()).append(',').append(Functions.str(xid.getGlobalId())).append(',').append(Functions.str(xid.getBranchId())).append(')');
        }
    }

    /* loaded from: input_file:org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer$MessageInstanceVisitor.class */
    private static class MessageInstanceVisitor implements MessageInstanceHandler {
        private final VirtualHostImpl _virtualHost;
        private final MessageStore _store;
        private final Map<String, Integer> _queueRecoveries;
        private final Map<Long, ServerMessage<?>> _recoveredMessages;
        private final Map<Long, StoredMessage<?>> _unusedMessages;

        private MessageInstanceVisitor(VirtualHostImpl virtualHostImpl, MessageStore messageStore, Map<String, Integer> map, Map<Long, ServerMessage<?>> map2, Map<Long, StoredMessage<?>> map3) {
            this._virtualHost = virtualHostImpl;
            this._store = messageStore;
            this._queueRecoveries = map;
            this._recoveredMessages = map2;
            this._unusedMessages = map3;
        }

        @Override // org.apache.qpid.server.store.handler.MessageInstanceHandler
        public boolean handle(MessageEnqueueRecord messageEnqueueRecord) {
            UUID queueId = messageEnqueueRecord.getQueueId();
            long messageNumber = messageEnqueueRecord.getMessageNumber();
            AMQQueue attainedQueue = this._virtualHost.getAttainedQueue(queueId);
            if (attainedQueue == null) {
                SynchronousMessageStoreRecoverer._logger.warn("Message id " + messageNumber + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded");
                Transaction newTransaction = this._store.newTransaction();
                newTransaction.dequeueMessage(messageEnqueueRecord);
                newTransaction.commitTranAsync((Void) null);
                return true;
            }
            String name = attainedQueue.getName();
            ServerMessage<?> serverMessage = this._recoveredMessages.get(Long.valueOf(messageNumber));
            this._unusedMessages.remove(Long.valueOf(messageNumber));
            if (serverMessage == null) {
                SynchronousMessageStoreRecoverer._logger.warn("Message id " + messageNumber + " referenced in log as enqueued in queue " + name + " is unknown, entry will be discarded");
                Transaction newTransaction2 = this._store.newTransaction();
                newTransaction2.dequeueMessage(messageEnqueueRecord);
                newTransaction2.commitTranAsync((Void) null);
                return true;
            }
            if (SynchronousMessageStoreRecoverer._logger.isDebugEnabled()) {
                SynchronousMessageStoreRecoverer._logger.debug("On recovery, delivering " + serverMessage.getMessageNumber() + " to " + name);
            }
            Integer num = this._queueRecoveries.get(name);
            if (num == null) {
                num = 0;
            }
            attainedQueue.recover(serverMessage, messageEnqueueRecord);
            this._queueRecoveries.put(name, Integer.valueOf(num.intValue() + 1));
            return true;
        }
    }

    /* loaded from: input_file:org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer$MessageVisitor.class */
    private static class MessageVisitor implements MessageHandler {
        private final Map<Long, ServerMessage<?>> _recoveredMessages;
        private final Map<Long, StoredMessage<?>> _unusedMessages;

        public MessageVisitor(Map<Long, ServerMessage<?>> map, Map<Long, StoredMessage<?>> map2) {
            this._recoveredMessages = map;
            this._unusedMessages = map2;
        }

        /* JADX WARN: Type inference failed for: r0v1, types: [org.apache.qpid.server.store.StorableMessageMetaData] */
        @Override // org.apache.qpid.server.store.handler.MessageHandler
        public boolean handle(StoredMessage<?> storedMessage) {
            this._recoveredMessages.put(Long.valueOf(storedMessage.getMessageNumber()), storedMessage.getMetaData().getType().createMessage(storedMessage));
            this._unusedMessages.put(Long.valueOf(storedMessage.getMessageNumber()), storedMessage);
            return true;
        }
    }

    @Override // org.apache.qpid.server.virtualhost.MessageStoreRecoverer
    public ListenableFuture<Void> recover(VirtualHostImpl virtualHostImpl) {
        EventLogger eventLogger = virtualHostImpl.getEventLogger();
        MessageStore messageStore = virtualHostImpl.getMessageStore();
        MessageStore.MessageStoreReader newMessageStoreReader = messageStore.newMessageStoreReader();
        MessageStoreLogSubject messageStoreLogSubject = new MessageStoreLogSubject(virtualHostImpl.getName(), messageStore.getClass().getSimpleName());
        TreeMap treeMap = new TreeMap();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        eventLogger.message(messageStoreLogSubject, MessageStoreMessages.RECOVERY_START());
        newMessageStoreReader.visitMessages(new MessageVisitor(hashMap, hashMap2));
        eventLogger.message(messageStoreLogSubject, TransactionLogMessages.RECOVERY_START(null, false));
        newMessageStoreReader.visitMessageInstances(new MessageInstanceVisitor(virtualHostImpl, messageStore, treeMap, hashMap, hashMap2));
        for (Map.Entry entry : treeMap.entrySet()) {
            eventLogger.message(messageStoreLogSubject, TransactionLogMessages.RECOVERED((Number) entry.getValue(), (String) entry.getKey()));
            eventLogger.message(messageStoreLogSubject, TransactionLogMessages.RECOVERY_COMPLETE((String) entry.getKey(), true));
            virtualHostImpl.getAttainedQueue((String) entry.getKey()).completeRecovery();
        }
        for (AMQQueue aMQQueue : virtualHostImpl.getQueues()) {
            if (!treeMap.containsKey(aMQQueue.getName())) {
                aMQQueue.completeRecovery();
            }
        }
        newMessageStoreReader.visitDistributedTransactions(new DistributedTransactionVisitor(virtualHostImpl, messageStore, eventLogger, messageStoreLogSubject, hashMap, hashMap2));
        for (StoredMessage storedMessage : hashMap2.values()) {
            _logger.warn("Message id " + storedMessage.getMessageNumber() + " in store, but not in any queue - removing....");
            storedMessage.remove();
        }
        eventLogger.message(messageStoreLogSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
        eventLogger.message(messageStoreLogSubject, MessageStoreMessages.RECOVERED(Integer.valueOf(hashMap.size() - hashMap2.size())));
        eventLogger.message(messageStoreLogSubject, MessageStoreMessages.RECOVERY_COMPLETE());
        return Futures.immediateFuture((Object) null);
    }

    @Override // org.apache.qpid.server.virtualhost.MessageStoreRecoverer
    public void cancel() {
    }
}
