package org.eclipse.hono.client.command;

import io.opentracing.SpanContext;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.amqp.connection.ConnectionLifecycle;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.util.CommandConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/command/CommandRouterCommandConsumerFactory.class */
public class CommandRouterCommandConsumerFactory implements CommandConsumerFactory, ServiceClient {
    private static final Logger LOG = LoggerFactory.getLogger(CommandRouterCommandConsumerFactory.class);
    private static final AtomicInteger ADAPTER_INSTANCE_ID_COUNTER = new AtomicInteger();
    private final String adapterInstanceId;
    private final CommandRouterClient commandRouterClient;
    private final CommandHandlers commandHandlers = new CommandHandlers();
    private final List<InternalCommandConsumer> internalCommandConsumers = new ArrayList();
    private int maxTenantIdsPerRequest = 100;

    public CommandRouterCommandConsumerFactory(CommandRouterClient commandRouterClient, String str) {
        this.commandRouterClient = (CommandRouterClient) Objects.requireNonNull(commandRouterClient);
        Objects.requireNonNull(str);
        this.adapterInstanceId = CommandConstants.getNewAdapterInstanceId(str, ADAPTER_INSTANCE_ID_COUNTER.getAndIncrement());
        if (commandRouterClient instanceof ConnectionLifecycle) {
            ((ConnectionLifecycle) commandRouterClient).addReconnectListener(obj -> {
                reenableCommandRouting();
            });
        }
    }

    void setMaxTenantIdsPerRequest(int i) {
        this.maxTenantIdsPerRequest = i;
    }

    private void reenableCommandRouting() {
        List list = (List) this.commandHandlers.getCommandHandlers().stream().map((v0) -> {
            return v0.getTenantId();
        }).distinct().collect(Collectors.toList());
        int i = 0;
        while (true) {
            int i2 = i;
            if (i2 >= list.size()) {
                return;
            }
            int min = i2 + Math.min(this.maxTenantIdsPerRequest, list.size() - i2);
            this.commandRouterClient.enableCommandRouting(list.subList(i2, min), null);
            i = min;
        }
    }

    public void registerInternalCommandConsumer(BiFunction<String, CommandHandlers, InternalCommandConsumer> biFunction) {
        InternalCommandConsumer apply = biFunction.apply(this.adapterInstanceId, this.commandHandlers);
        LOG.info("register internal command consumer {}", apply.getClass().getSimpleName());
        this.internalCommandConsumers.add(apply);
    }

    public Future<Void> start() {
        List list = (List) this.internalCommandConsumers.stream().map((v0) -> {
            return v0.start();
        }).collect(Collectors.toList());
        return list.isEmpty() ? Future.failedFuture("no command consumer registered") : CompositeFuture.all(list).mapEmpty();
    }

    public Future<Void> stop() {
        return CompositeFuture.all((List) this.internalCommandConsumers.stream().map((v0) -> {
            return v0.stop();
        }).collect(Collectors.toList())).mapEmpty();
    }

    public void registerReadinessChecks(HealthCheckHandler healthCheckHandler) {
        this.internalCommandConsumers.forEach(internalCommandConsumer -> {
            internalCommandConsumer.registerReadinessChecks(healthCheckHandler);
        });
    }

    public void registerLivenessChecks(HealthCheckHandler healthCheckHandler) {
    }

    @Override // org.eclipse.hono.client.command.CommandConsumerFactory
    public final Future<CommandConsumer> createCommandConsumer(String str, String str2, Function<CommandContext, Future<Void>> function, Duration duration, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(function);
        return doCreateCommandConsumer(str, str2, null, function, duration, spanContext);
    }

    @Override // org.eclipse.hono.client.command.CommandConsumerFactory
    public final Future<CommandConsumer> createCommandConsumer(String str, String str2, String str3, Function<CommandContext, Future<Void>> function, Duration duration, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        Objects.requireNonNull(function);
        return doCreateCommandConsumer(str, str2, str3, function, duration, spanContext);
    }

    private Future<CommandConsumer> doCreateCommandConsumer(String str, String str2, String str3, Function<CommandContext, Future<Void>> function, Duration duration, SpanContext spanContext) {
        Duration ofSeconds = (duration == null || duration.isNegative() || duration.getSeconds() > 9223372036L) ? Duration.ofSeconds(-1L) : duration;
        LOG.trace("create command consumer [tenant-id: {}, device-id: {}, gateway-id: {}]", new Object[]{str, str2, str3});
        CommandHandlerWrapper commandHandlerWrapper = new CommandHandlerWrapper(str, str2, str3, function, Vertx.currentContext(), (ofSeconds.isNegative() || ofSeconds.toSeconds() > 60) ? null : spanContext);
        this.commandHandlers.putCommandHandler(commandHandlerWrapper);
        Instant now = Instant.now();
        return this.commandRouterClient.registerCommandConsumer(str, str2, this.adapterInstanceId, ofSeconds, spanContext).onFailure(th -> {
            LOG.info("error registering consumer with the command router service [tenant: {}, device: {}]", new Object[]{str, str2, th});
            this.commandHandlers.removeCommandHandler(str, str2);
        }).map(r11 -> {
            return new CommandConsumer() { // from class: org.eclipse.hono.client.command.CommandRouterCommandConsumerFactory.1
                @Override // org.eclipse.hono.client.command.CommandConsumer
                public Future<Void> close(SpanContext spanContext2) {
                    return CommandRouterCommandConsumerFactory.this.removeCommandConsumer(commandHandlerWrapper, ofSeconds, now, spanContext2);
                }
            };
        });
    }

    private Future<Void> removeCommandConsumer(CommandHandlerWrapper commandHandlerWrapper, Duration duration, Instant instant, SpanContext spanContext) {
        String tenantId = commandHandlerWrapper.getTenantId();
        String deviceId = commandHandlerWrapper.getDeviceId();
        LOG.trace("remove command consumer [tenant-id: {}, device-id: {}]", tenantId, deviceId);
        if (this.commandHandlers.removeCommandHandler(commandHandlerWrapper)) {
            return this.commandRouterClient.unregisterCommandConsumer(tenantId, deviceId, this.adapterInstanceId, spanContext).recover(th -> {
                if (ServiceInvocationException.extractStatusCode(th) != 412) {
                    LOG.info("error unregistering consumer with the command router service [tenant: {}, device: {}]", new Object[]{tenantId, deviceId, th});
                    return Future.failedFuture(th);
                }
                if (!duration.isNegative() && Instant.now().isAfter(instant.plus((TemporalAmount) duration))) {
                    LOG.trace("ignoring 412 error when unregistering consumer with the command router service; entry may have already expired [tenant: {}, device: {}]", tenantId, deviceId);
                    return Future.succeededFuture();
                }
                LOG.debug("consumer not unregistered - not matched or already removed [tenant: {}, device: {}]", tenantId, deviceId);
                return Future.failedFuture(new ClientErrorException(412, "no matching command consumer mapping found to be removed"));
            });
        }
        LOG.debug("command consumer not removed - handler already replaced or removed [tenant: {}, device: {}]", tenantId, deviceId);
        return Future.failedFuture(new ClientErrorException(412, "local command handler already replaced or removed"));
    }
}
