package io.zeebe.broker.engine.impl;

import io.atomix.core.Atomix;
import io.zeebe.broker.Loggers;
import io.zeebe.broker.PartitionListener;
import io.zeebe.broker.system.monitoring.DiskSpaceUsageListener;
import io.zeebe.engine.processing.message.command.SubscriptionCommandMessageHandler;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamRecordWriter;
import io.zeebe.protocol.impl.encoding.BrokerInfo;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.ActorControl;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.agrona.collections.Int2ObjectHashMap;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/engine/impl/SubscriptionApiCommandMessageHandlerService.class */
/**
 * Actor that registers a {@link SubscriptionCommandMessageHandler} for the {@code "subscription"}
 * topic on the Atomix communication service and maintains the map of log stream record writers,
 * keyed by partition id, that the handler looks writers up in.
 *
 * <p>A writer is added when {@link #onBecomingLeader(int, long, LogStream)} is invoked for a
 * partition and removed again on {@link #onBecomingFollower(int, long)} or
 * {@link #onBecomingInactive(int, long)}. All accesses to the map made by this class are
 * dispatched through the {@code actor} control (NOTE(review): presumably this confines them to the
 * actor's execution context; confirm against the {@code Actor} contract).
 *
 * <p>As a {@link DiskSpaceUsageListener} the service swaps the topic handler for a no-op while
 * disk space is unavailable and restores the real handler once it is available again.
 */
public final class SubscriptionApiCommandMessageHandlerService extends Actor implements PartitionListener, DiskSpaceUsageListener {
    private static final String SUBSCRIPTION_TOPIC = "subscription";
    private static final Logger LOG = Loggers.SYSTEM_LOGGER;

    /** Record writers of the partitions registered via {@code onBecomingLeader}, keyed by partition id. */
    private final Int2ObjectHashMap<LogStreamRecordWriter> leaderPartitions = new Int2ObjectHashMap<>();
    private final Atomix atomix;
    private final String actorName;

    /** Created in {@link #onActorStarting()}; {@code null} before that. */
    private SubscriptionCommandMessageHandler messageHandler;

    /**
     * @param brokerInfo supplies the node id that becomes part of the actor name
     * @param atomix provides the communication service the handler is subscribed on
     */
    public SubscriptionApiCommandMessageHandlerService(final BrokerInfo brokerInfo, final Atomix atomix) {
        this.atomix = atomix;
        this.actorName = buildActorName(brokerInfo.getNodeId(), "SubscriptionApi");
    }

    /** @return the actor name built from the broker's node id in the constructor */
    @Override
    public String getName() {
        return actorName;
    }

    /**
     * Creates the message handler, wiring it to this actor's {@code call} method and to the
     * partition-to-writer lookup, and subscribes it to the subscription topic.
     */
    @Override
    protected void onActorStarting() {
        // Passing the method references directly lets the compiler check them against the
        // handler's declared parameter types instead of going through raw-typed temporaries.
        messageHandler = new SubscriptionCommandMessageHandler(actor::call, leaderPartitions::get);
        atomix.getCommunicationService().subscribe(SUBSCRIPTION_TOPIC, messageHandler);
    }

    /**
     * Drops the record writer registered for the given partition, if any.
     *
     * @param partitionId id of the partition
     * @param j not used by this implementation
     * @return future completed once the removal has run on the actor
     */
    @Override // io.zeebe.broker.PartitionListener
    public ActorFuture<Void> onBecomingFollower(final int partitionId, final long j) {
        return removeLeaderPartition(partitionId);
    }

    /**
     * Requests a new record writer from the log stream and, once it is available, registers it
     * for the given partition.
     *
     * @param partitionId id of the partition
     * @param j not used by this implementation
     * @param logStream log stream the record writer is created from
     * @return future completed with {@code null} after the writer was registered, or completed
     *     exceptionally with the error reported while creating the writer
     */
    @Override // io.zeebe.broker.PartitionListener
    public ActorFuture<Void> onBecomingLeader(final int partitionId, final long j, final LogStream logStream) {
        final CompletableActorFuture<Void> registered = new CompletableActorFuture<>();
        actor.submit(() ->
            logStream.newLogStreamRecordWriter().onComplete((recordWriter, error) -> {
                if (error == null) {
                    leaderPartitions.put(partitionId, recordWriter);
                    registered.complete(null);
                } else {
                    LOG.error("Unexpected error on retrieving write buffer for partition {}", partitionId, error);
                    registered.completeExceptionally(error);
                }
            }));
        return registered;
    }

    /**
     * Drops the record writer registered for the given partition, if any.
     *
     * @param partitionId id of the partition
     * @param j not used by this implementation
     * @return future completed once the removal has run on the actor
     */
    @Override // io.zeebe.broker.PartitionListener
    public ActorFuture<Void> onBecomingInactive(final int partitionId, final long j) {
        return removeLeaderPartition(partitionId);
    }

    /**
     * Replaces the topic handler with one that ignores the payload and immediately returns a
     * future completed with {@code null}.
     */
    @Override // io.zeebe.broker.system.monitoring.DiskSpaceUsageListener
    public void onDiskSpaceNotAvailable() {
        actor.call(() -> {
            LOG.debug("Broker is out of disk space. All requests with topic {} will be rejected.", SUBSCRIPTION_TOPIC);
            atomix.getCommunicationService().unsubscribe(SUBSCRIPTION_TOPIC);
            atomix.getCommunicationService().subscribe(SUBSCRIPTION_TOPIC, request -> CompletableFuture.completedFuture(null));
        });
    }

    /** Re-subscribes the real message handler to the subscription topic. */
    @Override // io.zeebe.broker.system.monitoring.DiskSpaceUsageListener
    public void onDiskSpaceAvailable() {
        actor.call(() -> {
            LOG.debug("Broker has disk space available again. All requests with topic {} will be accepted.", SUBSCRIPTION_TOPIC);
            atomix.getCommunicationService().unsubscribe(SUBSCRIPTION_TOPIC);
            atomix.getCommunicationService().subscribe(SUBSCRIPTION_TOPIC, messageHandler);
        });
    }

    /** Removes the writer of the given partition on the actor; shared by the follower and inactive transitions. */
    private ActorFuture<Void> removeLeaderPartition(final int partitionId) {
        return actor.call(() -> {
            leaderPartitions.remove(partitionId);
            return null;
        });
    }
}
