package io.zeebe.broker.system.management;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.exporter.stream.ExporterDirector;
import io.zeebe.broker.exporter.stream.ExporterPhase;
import io.zeebe.broker.system.partitions.ZeebePartition;
import io.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.zeebe.snapshots.broker.impl.FileBasedSnapshotMetadata;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.future.ActorFuture;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/system/management/BrokerAdminServiceImpl.class */
public class BrokerAdminServiceImpl extends Actor implements BrokerAdminService {
    private static final Logger LOG = Loggers.SYSTEM_LOGGER;
    private final List<ZeebePartition> partitions;

    public BrokerAdminServiceImpl(List<ZeebePartition> list) {
        this.partitions = list;
    }

    @Override // io.zeebe.broker.system.management.BrokerAdminService
    public void pauseStreamProcessing() {
        this.actor.call(this::pauseStreamProcessingOnAllPartitions);
    }

    @Override // io.zeebe.broker.system.management.BrokerAdminService
    public void resumeStreamProcessing() {
        this.actor.call(this::resumeStreamProcessingOnAllPartitions);
    }

    @Override // io.zeebe.broker.system.management.BrokerAdminService
    public void pauseExporting() {
        this.actor.call(this::pauseExportingOnAllPartitions);
    }

    @Override // io.zeebe.broker.system.management.BrokerAdminService
    public void resumeExporting() {
        this.actor.call(this::resumeExportingOnAllPartitions);
    }

    @Override // io.zeebe.broker.system.management.BrokerAdminService
    public void takeSnapshot() {
        this.actor.call(() -> {
            takeSnapshotOnAllPartitions(this.partitions);
        });
    }

    @Override // io.zeebe.broker.system.management.BrokerAdminService
    public void prepareForUpgrade() {
        this.actor.call(this::prepareAllPartitionsForSafeUpgrade);
    }

    @Override // io.zeebe.broker.system.management.BrokerAdminService
    public Map<Integer, PartitionStatus> getPartitionStatus() {
        CompletableFuture completableFuture = new CompletableFuture();
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        this.actor.call(() -> {
            CompletableFuture.allOf((CompletableFuture[]) ((List) this.partitions.stream().map(zeebePartition -> {
                return getPartitionStatus(zeebePartition).whenComplete((partitionStatus, th) -> {
                    if (th == null) {
                        concurrentHashMap.put(Integer.valueOf(zeebePartition.getPartitionId()), partitionStatus);
                    }
                });
            }).collect(Collectors.toList())).toArray(i -> {
                return new CompletableFuture[i];
            })).thenAccept(r5 -> {
                completableFuture.complete(concurrentHashMap);
            });
        });
        return (Map) completableFuture.join();
    }

    private CompletableFuture<PartitionStatus> getPartitionStatus(ZeebePartition zeebePartition) {
        CompletableFuture<PartitionStatus> completableFuture = new CompletableFuture<>();
        ActorFuture<Optional<StreamProcessor>> streamProcessor = zeebePartition.getStreamProcessor();
        ActorFuture<Optional<ExporterDirector>> exporterDirector = zeebePartition.getExporterDirector();
        this.actor.runOnCompletion(List.of(streamProcessor, exporterDirector), th -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
                return;
            }
            Optional optional = (Optional) streamProcessor.join();
            Optional optional2 = (Optional) exporterDirector.join();
            if (optional.isPresent() && optional2.isPresent()) {
                getLeaderPartitionStatus(zeebePartition, (StreamProcessor) optional.get(), (ExporterDirector) optional2.get(), completableFuture);
            } else {
                getFollowerPartitionStatus(zeebePartition, completableFuture);
            }
        });
        return completableFuture;
    }

    private void getFollowerPartitionStatus(ZeebePartition zeebePartition, CompletableFuture<PartitionStatus> completableFuture) {
        completableFuture.complete(PartitionStatus.ofFollower(getSnapshotId(zeebePartition).orElse(null)));
    }

    private void getLeaderPartitionStatus(ZeebePartition zeebePartition, StreamProcessor streamProcessor, ExporterDirector exporterDirector, CompletableFuture<PartitionStatus> completableFuture) {
        ActorFuture lastProcessedPositionAsync = streamProcessor.getLastProcessedPositionAsync();
        ActorFuture currentPhase = streamProcessor.getCurrentPhase();
        ActorFuture<ExporterPhase> phase = exporterDirector.getPhase();
        long lowestPosition = exporterDirector.getState().getLowestPosition();
        Optional<String> snapshotId = getSnapshotId(zeebePartition);
        Long l = (Long) snapshotId.flatMap(FileBasedSnapshotMetadata::ofFileName).map((v0) -> {
            return v0.getProcessedPosition();
        }).orElse(null);
        this.actor.runOnCompletion(List.of(lastProcessedPositionAsync, currentPhase, phase), th -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
                return;
            }
            completableFuture.complete(PartitionStatus.ofLeader((Long) lastProcessedPositionAsync.join(), (String) snapshotId.orElse(null), l, (StreamProcessor.Phase) currentPhase.join(), (ExporterPhase) phase.join(), lowestPosition));
        });
    }

    private Optional<String> getSnapshotId(ZeebePartition zeebePartition) {
        return zeebePartition.getSnapshotStore().getLatestSnapshot().map((v0) -> {
            return v0.getId();
        });
    }

    private void prepareAllPartitionsForSafeUpgrade() {
        LOG.info("Preparing for safe upgrade.");
        this.actor.runOnCompletion((List) Stream.of((Object[]) new List[]{pauseStreamProcessingOnAllPartitions(), pauseExportingOnAllPartitions()}).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList()), th -> {
            takeSnapshotOnAllPartitions(this.partitions);
        });
    }

    private List<ActorFuture<Void>> pauseStreamProcessingOnAllPartitions() {
        LOG.info("Pausing StreamProcessor on all partitions.");
        return (List) this.partitions.stream().map((v0) -> {
            return v0.pauseProcessing();
        }).collect(Collectors.toList());
    }

    private void resumeStreamProcessingOnAllPartitions() {
        LOG.info("Resuming paused StreamProcessor on all partitions.");
        this.partitions.forEach((v0) -> {
            v0.resumeProcessing();
        });
    }

    private void takeSnapshotOnAllPartitions(List<ZeebePartition> list) {
        LOG.info("Triggering Snapshots on all partitions.");
        list.forEach((v0) -> {
            v0.triggerSnapshot();
        });
    }

    private List<ActorFuture<Void>> pauseExportingOnAllPartitions() {
        LOG.info("Pausing exporting on all partitions.");
        return (List) this.partitions.stream().map((v0) -> {
            return v0.pauseExporting();
        }).collect(Collectors.toList());
    }

    private void resumeExportingOnAllPartitions() {
        LOG.info("Resuming exporting on all partitions.");
        this.partitions.forEach((v0) -> {
            v0.resumeExporting();
        });
    }
}
