package org.eclipse.hono.commandrouter.impl;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.eclipse.hono.client.command.CommandAlreadyProcessedException;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandToBeReprocessedException;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/commandrouter/impl/AbstractCommandProcessingQueue.class */
public abstract class AbstractCommandProcessingQueue<T extends CommandContext, K> implements CommandProcessingQueue<T> {
    private final Logger LOG = LoggerFactory.getLogger(getClass());
    private final Map<K, AbstractCommandProcessingQueue<T, K>.CommandQueue> commandQueues = new HashMap();
    private final Vertx vertx;

    /* loaded from: input_file:org/eclipse/hono/commandrouter/impl/AbstractCommandProcessingQueue$CommandQueue.class */
    public final class CommandQueue {
        private static final String KEY_COMMAND_SEND_ACTION_SUPPLIER_AND_RESULT_PROMISE = "commandSendActionSupplierAndResultPromise";
        private final Deque<T> queue = new ArrayDeque();
        private final K queueKey;

        CommandQueue(K k) {
            this.queueKey = k;
        }

        public void add(T t) {
            Objects.requireNonNull(t);
            this.queue.addLast(t);
        }

        public boolean remove(T t) {
            Objects.requireNonNull(t);
            if (!this.queue.remove(t)) {
                return false;
            }
            sendNextCommandInQueueIfPossible();
            return true;
        }

        public boolean isEmpty() {
            return this.queue.isEmpty();
        }

        public int getSize() {
            return this.queue.size();
        }

        public void markAsUnusedAndClear() {
            ArrayList arrayList = new ArrayList(this.queue);
            this.queue.clear();
            arrayList.forEach(commandContext -> {
                Pair<Supplier<Future<Void>>, Promise<Void>> sendActionSupplierAndResultPromise = getSendActionSupplierAndResultPromise(commandContext);
                if (sendActionSupplierAndResultPromise != null) {
                    String commandSourceForLog = AbstractCommandProcessingQueue.this.getCommandSourceForLog(this.queueKey);
                    AbstractCommandProcessingQueue.this.LOG.info("command won't be sent - commands from {} aren't handled by this consumer anymore [{}]", commandSourceForLog, commandContext.getCommand());
                    TracingHelper.logError(commandContext.getTracingSpan(), "command won't be sent - commands from %s aren't handled by this consumer anymore".formatted(commandSourceForLog));
                    CommandToBeReprocessedException commandToBeReprocessedException = new CommandToBeReprocessedException();
                    commandContext.release(commandToBeReprocessedException);
                    ((Promise) sendActionSupplierAndResultPromise.two()).fail(commandToBeReprocessedException);
                }
            });
        }

        public Future<Void> applySendCommandAction(T t, Supplier<Future<Void>> supplier) {
            Throwable commandToBeReprocessedException;
            Objects.requireNonNull(t);
            Objects.requireNonNull(supplier);
            Promise<Void> promise = Promise.promise();
            if (t.equals(this.queue.peek())) {
                sendGivenCommandAndNextInQueueIfPossible(this.queue.remove(), supplier, promise, true);
            } else if (this.queue.contains(t)) {
                T peek = this.queue.peek();
                AbstractCommandProcessingQueue.this.LOG.debug("sending of command gets delayed; waiting for processing of previous command [queue size: {}; delayed {}; waiting for {}]", new Object[]{Integer.valueOf(this.queue.size()), t.getCommand(), peek.getCommand()});
                t.getTracingSpan().log("waiting for an earlier command to be processed first [queue size: %d; waiting for %s]".formatted(Integer.valueOf(this.queue.size()), peek.getCommand()));
                t.getTracingSpan().setTag("processing_delayed", true);
                t.put(KEY_COMMAND_SEND_ACTION_SUPPLIER_AND_RESULT_PROMISE, Pair.of(supplier, promise));
            } else {
                if (t.isCompleted()) {
                    AbstractCommandProcessingQueue.this.LOG.debug("command won't be sent - already processed and not in queue anymore [{}]", t.getCommand());
                    commandToBeReprocessedException = new CommandAlreadyProcessedException();
                } else {
                    AbstractCommandProcessingQueue.this.LOG.info("command won't be sent - not in queue [{}]", t.getCommand());
                    TracingHelper.logError(t.getTracingSpan(), "command won't be sent - not in queue");
                    commandToBeReprocessedException = new CommandToBeReprocessedException();
                    t.release(commandToBeReprocessedException);
                }
                promise.fail(commandToBeReprocessedException);
            }
            return promise.future();
        }

        private void sendGivenCommandAndNextInQueueIfPossible(T t, Supplier<Future<Void>> supplier, Promise<Void> promise, boolean z) {
            AbstractCommandProcessingQueue.this.LOG.trace("apply send action on [{}]", t.getCommand());
            Future<Void> future = supplier.get();
            future.onComplete(promise);
            if (this.queue.isEmpty()) {
                return;
            }
            if (future.isComplete() && z) {
                AbstractCommandProcessingQueue.this.vertx.getOrCreateContext().runOnContext(r3 -> {
                    sendNextCommandInQueueIfPossible();
                });
            } else {
                sendNextCommandInQueueIfPossible();
            }
        }

        private void sendNextCommandInQueueIfPossible() {
            Optional.ofNullable(this.queue.peek()).map(this::getSendActionSupplierAndResultPromise).ifPresent(pair -> {
                sendGivenCommandAndNextInQueueIfPossible(this.queue.remove(), (Supplier) pair.one(), (Promise) pair.two(), false);
            });
        }

        private Pair<Supplier<Future<Void>>, Promise<Void>> getSendActionSupplierAndResultPromise(T t) {
            return (Pair) t.get(KEY_COMMAND_SEND_ACTION_SUPPLIER_AND_RESULT_PROMISE);
        }
    }

    public AbstractCommandProcessingQueue(Vertx vertx) {
        this.vertx = (Vertx) Objects.requireNonNull(vertx);
    }

    @Override // org.eclipse.hono.commandrouter.impl.CommandProcessingQueue
    public final void add(T t) {
        Objects.requireNonNull(t);
        K queueKey = getQueueKey(t);
        this.commandQueues.computeIfAbsent(queueKey, obj -> {
            return new CommandQueue(queueKey);
        }).add(t);
    }

    protected abstract K getQueueKey(T t);

    protected abstract String getCommandSourceForLog(K k);

    @Override // org.eclipse.hono.commandrouter.impl.CommandProcessingQueue
    public final boolean remove(T t) {
        Objects.requireNonNull(t);
        return ((Boolean) Optional.ofNullable(this.commandQueues.get(getQueueKey(t))).map(commandQueue -> {
            return Boolean.valueOf(commandQueue.remove(t));
        }).orElse(false)).booleanValue();
    }

    @Override // org.eclipse.hono.commandrouter.impl.CommandProcessingQueue
    public final Future<Void> applySendCommandAction(T t, Supplier<Future<Void>> supplier) {
        K queueKey = getQueueKey(t);
        AbstractCommandProcessingQueue<T, K>.CommandQueue commandQueue = this.commandQueues.get(queueKey);
        if (commandQueue != null) {
            return commandQueue.applySendCommandAction(t, supplier);
        }
        String commandSourceForLog = getCommandSourceForLog(queueKey);
        this.LOG.info("command won't be sent - commands from {} aren't handled by this consumer anymore [{}]", commandSourceForLog, t.getCommand());
        TracingHelper.logError(t.getTracingSpan(), "command won't be sent - commands from %s aren't handled by this consumer anymore".formatted(commandSourceForLog));
        CommandToBeReprocessedException commandToBeReprocessedException = new CommandToBeReprocessedException();
        t.release(commandToBeReprocessedException);
        return Future.failedFuture(commandToBeReprocessedException);
    }

    @Override // org.eclipse.hono.commandrouter.impl.CommandProcessingQueue
    public final void clear() {
        removeCommandQueueEntries(obj -> {
            return true;
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void removeCommandQueueEntries(Predicate<K> predicate) {
        Objects.requireNonNull(predicate);
        Iterator<Map.Entry<K, AbstractCommandProcessingQueue<T, K>.CommandQueue>> it = this.commandQueues.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, AbstractCommandProcessingQueue<T, K>.CommandQueue> next = it.next();
            if (predicate.test(next.getKey())) {
                if (next.getValue().isEmpty()) {
                    this.LOG.debug("commands from {} aren't handled here anymore; command queue is empty", getCommandSourceForLog(next.getKey()));
                } else {
                    this.LOG.info("commands from {} aren't handled here anymore but the corresponding command queue isn't empty! [queue size: {}]", getCommandSourceForLog(next.getKey()), Integer.valueOf(next.getValue().getSize()));
                    next.getValue().markAsUnusedAndClear();
                }
                it.remove();
            }
        }
    }
}
