package org.eclipse.hono.adapter.mqtt;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/adapter/mqtt/PendingPubAcks.class */
/**
 * Keeps track of messages sent to a device for which an acknowledgement is still outstanding.
 * <p>
 * Entries are keyed by packet identifier. Each entry carries a handler that is invoked when the
 * acknowledgement arrives and a handler that is invoked if the (optional) timer started for the
 * entry fires before the acknowledgement has been received.
 */
public final class PendingPubAcks {
    private static final Logger LOG = LoggerFactory.getLogger(PendingPubAcks.class);

    /** Outstanding acknowledgements, keyed by packet identifier. */
    private final Map<Integer, PendingPubAck> pendingAcks = new ConcurrentHashMap<>();
    /** The Vert.x instance used for starting and cancelling the timeout timers. */
    private final Vertx vertx;

    /**
     * A single outstanding acknowledgement together with its handlers and the id of the timer
     * (if any) guarding it.
     */
    private class PendingPubAck {
        private final int msgId;
        private final Handler<Integer> onAckHandler;
        private final Handler<Void> onAckTimeoutHandler;
        /** Id of the timer started for this entry, or {@code null} if no timer was started. */
        private final Long timerId;

        /**
         * Creates a new entry.
         *
         * @param msgId The packet identifier of the message.
         * @param onAckHandler The handler to invoke with the packet identifier once the
         *                     acknowledgement has been received.
         * @param onAckTimeoutHandler The handler to invoke when the timer fires first.
         * @param timerId The id of the timer guarding this entry, may be {@code null}.
         * @throws NullPointerException if any of the handlers is {@code null}.
         */
        PendingPubAck(int msgId, Handler<Integer> onAckHandler, Handler<Void> onAckTimeoutHandler, Long timerId) {
            this.msgId = msgId;
            this.onAckHandler = Objects.requireNonNull(onAckHandler);
            this.onAckTimeoutHandler = Objects.requireNonNull(onAckTimeoutHandler);
            this.timerId = timerId;
        }

        /**
         * Cancels the timer and passes the packet identifier to the acknowledgement handler.
         */
        public void onPubAck() {
            LOG.trace("acknowledgement received for message sent to device [packet-id: {}]", this.msgId);
            cancelTimer();
            this.onAckHandler.handle(this.msgId);
        }

        /**
         * Invokes the timeout handler (with a {@code null} event).
         */
        public void onPubAckTimeout() {
            this.onAckTimeoutHandler.handle(null);
        }

        /**
         * Cancels the timer guarding this entry, if one was started.
         */
        void cancelTimer() {
            if (this.timerId != null) {
                PendingPubAcks.this.vertx.cancelTimer(this.timerId);
            }
        }
    }

    /**
     * Creates a new instance.
     *
     * @param vertx The Vert.x instance to use for the timeout timers.
     * @throws NullPointerException if vertx is {@code null}.
     */
    public PendingPubAcks(Vertx vertx) {
        this.vertx = Objects.requireNonNull(vertx);
    }

    /**
     * Processes an acknowledgement received for the given packet identifier.
     * <p>
     * If an entry is registered for the identifier, it is removed, its timer is cancelled and
     * its acknowledgement handler is invoked. Otherwise the event is only logged.
     *
     * @param num The packet identifier contained in the acknowledgement.
     * @throws NullPointerException if num is {@code null}.
     */
    public void handlePubAck(Integer num) {
        Objects.requireNonNull(num);
        final PendingPubAck pending = this.pendingAcks.remove(num);
        if (pending == null) {
            LOG.debug("no active request found for received acknowledgement [packet-id: {}]", num);
            return;
        }
        pending.onPubAck();
    }

    /**
     * Registers handlers for the acknowledgement of the message with the given packet identifier.
     * <p>
     * If an entry already exists for the identifier, it is replaced, an error is logged and the
     * timer of the replaced entry is cancelled. The handlers of the replaced entry are not invoked.
     *
     * @param num The packet identifier of the message.
     * @param handler The handler to invoke with the packet identifier once the acknowledgement
     *                has been received.
     * @param handler2 The handler to invoke if no acknowledgement has been received before the
     *                 timeout expired.
     * @param j The timeout in milliseconds as passed to {@code Vertx.setTimer}; no timer is
     *          started if the value is less than 1.
     * @throws NullPointerException if any of the object parameters is {@code null}.
     */
    public void add(Integer num, Handler<Integer> handler, Handler<Void> handler2, long j) {
        Objects.requireNonNull(num);
        Objects.requireNonNull(handler);
        Objects.requireNonNull(handler2);
        final Long timerId = startTimerIfNeeded(num, j);
        final PendingPubAck replaced = this.pendingAcks.put(num,
                new PendingPubAck(num.intValue(), handler, handler2, timerId));
        if (replaced != null) {
            LOG.error("error registering ack handler; already waiting for ack of message id [{}]", num);
            // The timer callback removes entries by key only, so the replaced entry's timer
            // would otherwise expire the new entry ahead of its own timeout.
            replaced.cancelTimer();
        }
    }

    /**
     * Starts a one-shot timer which, on firing, removes the entry registered for the given
     * packet identifier (if any) and invokes that entry's timeout handler.
     *
     * @param packetId The packet identifier whose entry the timer guards.
     * @param timeoutMillis The delay passed to {@code Vertx.setTimer}.
     * @return The id of the started timer, or {@code null} if the delay is less than 1 and
     *         therefore no timer has been started.
     */
    private Long startTimerIfNeeded(Integer packetId, long timeoutMillis) {
        if (timeoutMillis < 1) {
            return null;
        }
        return this.vertx.setTimer(timeoutMillis, firedTimerId -> {
            final PendingPubAck expired = this.pendingAcks.remove(packetId);
            if (expired != null) {
                expired.onPubAckTimeout();
            }
        });
    }
}
