package org.eclipse.ditto.services.base;

import akka.Done;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.ActorSystem;
import akka.actor.CoordinatedShutdown;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.pubsub.DistributedPubSub;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.management.AkkaManagement;
import akka.management.cluster.bootstrap.ClusterBootstrap;
import akka.stream.ActorMaterializer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.NotThreadSafe;
import kamon.Kamon;
import kamon.prometheus.PrometheusReporter;
import kamon.system.SystemMetrics;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.services.base.config.LimitsConfigReader;
import org.eclipse.ditto.services.base.config.ServiceConfigReader;
import org.eclipse.ditto.services.utils.cluster.ClusterMemberAwareActor;
import org.eclipse.ditto.services.utils.config.ConfigUtil;
import org.eclipse.ditto.services.utils.devops.DevOpsCommandsActor;
import org.eclipse.ditto.services.utils.devops.LogbackLoggingFacade;
import org.eclipse.ditto.services.utils.health.status.StatusSupplierActor;
import org.eclipse.ditto.services.utils.metrics.prometheus.PrometheusReporterRoute;
import org.eclipse.ditto.services.utils.persistence.mongo.suffixes.NamespaceSuffixCollectionNames;
import org.eclipse.ditto.signals.commands.messages.MessageCommandSizeValidator;
import org.eclipse.ditto.signals.commands.policies.PolicyCommandSizeValidator;
import org.eclipse.ditto.signals.commands.things.ThingCommandSizeValidator;
import org.slf4j.Logger;

/**
 * Abstract base class for the start-up sequence of a Ditto service.
 * <p>
 * {@link #start()} runs the following steps in order: log the JVM runtime parameters, hand the optional MongoDB
 * collection name suffix configuration to {@link NamespaceSuffixCollectionNames}, start Kamon (optionally with
 * system metrics and a Prometheus reporter), create the {@link ActorSystem} named {@link #CLUSTER_NAME}, initialize
 * that actor system (Akka management, cluster bootstrap, supporting actors, root actors and shutdown logging tasks)
 * and finally bind the Prometheus HTTP endpoint if a reporter is available.
 * </p>
 * <p>
 * Most steps are {@code protected} so that concrete services can replace them. The only abstract method is
 * {@link #getMainRootActorProps(ServiceConfigReader, ActorRef, ActorMaterializer)}.
 * </p>
 *
 * @param <C> the type of the config reader of the concrete service.
 */
@NotThreadSafe
public abstract class DittoService<C extends ServiceConfigReader> {

    /**
     * Name which is used for the created {@link ActorSystem}.
     */
    public static final String CLUSTER_NAME = "ditto-cluster";

    private final Logger logger;
    private final String serviceName;
    private final String rootActorName;
    private final C configReader;

    /**
     * The reporter registered at Kamon. Remains {@code null} if Prometheus is disabled or if the registration failed.
     */
    @Nullable
    private PrometheusReporter prometheusReporter;

    /**
     * Bundles the {@link Props} and the name of a root actor which is started in addition to the main root actor.
     */
    @Immutable
    public static final class RootActorInformation {

        private final Props props;
        private final String name;

        private RootActorInformation(Props props, String str) {
            this.props = props;
            this.name = str;
        }

        /**
         * Returns an instance for the given props and name after validating both via {@link ConditionChecker}.
         *
         * @param props the props of the root actor; checked with {@code checkNotNull}.
         * @param str the name of the root actor; checked with {@code argumentNotEmpty}.
         * @return the instance.
         */
        public static RootActorInformation getInstance(Props props, String str) {
            ConditionChecker.checkNotNull(props, "root actor props");
            ConditionChecker.argumentNotEmpty(str, "root actor name");
            return new RootActorInformation(props, str);
        }
    }

    /**
     * Constructs the service and eagerly creates its config reader from {@link #determineConfig()}.
     * <p>
     * Note that {@link #determineConfig()} is overridable and is invoked from within this constructor; at that point
     * only the logger, service name and root actor name have been assigned.
     * </p>
     *
     * @param logger the logger used for all messages of this class; checked with {@code checkNotNull}.
     * @param str the name of the service; checked with {@code argumentNotEmpty}.
     * @param str2 the name of the main root actor; checked with {@code argumentNotEmpty}.
     * @param function creates the config reader from the determined {@link Config}; checked with
     * {@code checkNotNull}.
     */
    public DittoService(Logger logger, String str, String str2, Function<Config, C> function) {
        this.logger = (Logger) ConditionChecker.checkNotNull(logger, "logger");
        this.serviceName = (String) ConditionChecker.argumentNotEmpty(str, "service name");
        this.rootActorName = (String) ConditionChecker.argumentNotEmpty(str2, "root actor name");
        // Validate first, then apply the typed function directly: this avoids a raw-type cast and keeps the
        // order "null check -> determineConfig() -> apply".
        ConditionChecker.checkNotNull(function, "config reader creator");
        this.configReader = function.apply(determineConfig());
    }

    /**
     * Starts the service by invoking {@link #doStart()} through the {@code MainMethodExceptionHandler} obtained for
     * this service's logger.
     *
     * @return the value returned by the exception handler's {@code call}.
     */
    public ActorSystem start() {
        return MainMethodExceptionHandler.getInstance(this.logger).call(this::doStart);
    }

    /**
     * Performs the actual start-up sequence, see the class documentation for the order of the steps.
     *
     * @return the created actor system.
     */
    protected ActorSystem doStart() {
        logRuntimeParameters();
        configureMongoDbSuffixBuilder();
        startKamon();
        final Config rawConfig = this.configReader.getRawConfig();
        final ActorSystem actorSystem = createActorSystem(rawConfig);
        initializeActorSystem(rawConfig, actorSystem);
        startKamonPrometheusHttpEndpoint(actorSystem);
        return actorSystem;
    }

    /**
     * Logs the input arguments of the running JVM and the number of available processors.
     */
    private void logRuntimeParameters() {
        this.logger.info("Running with following runtime parameters: {}",
                ManagementFactory.getRuntimeMXBean().getInputArguments());
        this.logger.info("Available processors: {}", Runtime.getRuntime().availableProcessors());
    }

    /**
     * Passes the suffix builder config to {@link NamespaceSuffixCollectionNames} if the config reader provides one.
     */
    private void configureMongoDbSuffixBuilder() {
        this.configReader.mongoCollectionNameSuffix()
                .getSuffixBuilderConfig()
                .ifPresent(NamespaceSuffixCollectionNames::setConfig);
    }

    /**
     * Reconfigures Kamon with the config loaded under the name "kamon" and, depending on the metrics settings of
     * the config reader, starts system metrics collection and the Prometheus reporter.
     */
    private void startKamon() {
        Kamon.reconfigure(ConfigFactory.load("kamon"));
        if (this.configReader.metrics().isSystemMetricsEnabled()) {
            SystemMetrics.startCollecting();
        }
        if (this.configReader.metrics().isPrometheusEnabled()) {
            startPrometheusReporter();
        }
    }

    /**
     * Creates a {@link PrometheusReporter} and registers it at Kamon.
     * <p>
     * The reporter is stored in {@link #prometheusReporter} only after the registration succeeded. On any failure
     * the error is logged and the field stays {@code null}, which in turn prevents
     * {@link #startKamonPrometheusHttpEndpoint(ActorSystem)} from binding an endpoint for an unregistered reporter.
     * </p>
     */
    private void startPrometheusReporter() {
        try {
            final PrometheusReporter reporter = new PrometheusReporter();
            Kamon.addReporter(reporter);
            this.prometheusReporter = reporter;
            this.logger.info("Successfully added Prometheus reporter to Kamon.");
        } catch (Throwable failure) {
            // Deliberately broad and best-effort: a failing metrics reporter must not abort the service start.
            this.logger.error("Error while adding Prometheus reporter to Kamon.", failure);
        }
    }

    /**
     * Initializes the given actor system: starts Akka management and cluster bootstrap, the status supplier,
     * DevOps commands and cluster member aware actors, schedules the start of the service root actors and registers
     * two coordinated shutdown tasks which merely log the beginning and the completion of a graceful shutdown.
     *
     * @param config the raw config of the service.
     * @param actorSystem the actor system to initialize.
     */
    protected void initializeActorSystem(Config config, ActorSystem actorSystem) {
        AkkaManagement.get(actorSystem).start();
        ClusterBootstrap.get(actorSystem).start();

        startStatusSupplierActor(actorSystem, config);
        startDevOpsCommandsActor(actorSystem, config);
        startClusterMemberAwareActor(actorSystem, this.configReader);
        startServiceRootActors(actorSystem, this.configReader);

        CoordinatedShutdown.get(actorSystem).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind(),
                "log_shutdown_initiation", () -> {
                    this.logger.info("Initiated coordinated shutdown - gracefully shutting down..");
                    return CompletableFuture.completedFuture(Done.getInstance());
                });
        CoordinatedShutdown.get(actorSystem).addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate(),
                "log_successful_graceful_shutdown", () -> {
                    this.logger.info("Graceful shutdown completed.");
                    return CompletableFuture.completedFuture(Done.getInstance());
                });
    }

    /**
     * Binds the HTTP endpoint serving the Prometheus reporter route to the configured hostname and port.
     * <p>
     * Does nothing if Prometheus is disabled or no reporter is available. After a successful binding a coordinated
     * shutdown task is registered which terminates the binding with a deadline of one second. If binding fails the
     * error is logged and the actor system is terminated.
     * </p>
     *
     * @param actorSystem the actor system used for binding the endpoint.
     */
    private void startKamonPrometheusHttpEndpoint(ActorSystem actorSystem) {
        final PrometheusReporter reporter = this.prometheusReporter;
        if (!this.configReader.metrics().isPrometheusEnabled() || reporter == null) {
            return;
        }

        final ActorMaterializer materializer = createActorMaterializer(actorSystem);
        Http.get(actorSystem)
                .bindAndHandle(
                        PrometheusReporterRoute.buildPrometheusReporterRoute(reporter).flow(actorSystem, materializer),
                        ConnectHttp.toHost(this.configReader.metrics().getPrometheusHostname(),
                                this.configReader.metrics().getPrometheusPort().intValue()),
                        materializer)
                .thenAccept(serverBinding -> {
                    CoordinatedShutdown.get(actorSystem).addTask(CoordinatedShutdown.PhaseServiceUnbind(),
                            "shutdown_prometheus_http_endpoint", () -> {
                                this.logger.info("Gracefully shutting down Prometheus HTTP endpoint..");
                                return serverBinding.terminate(Duration.ofSeconds(1L))
                                        .handle((httpTerminated, failure) -> {
                                            // Shutdown stays best-effort, but a failed termination is no longer
                                            // discarded silently.
                                            if (failure != null) {
                                                this.logger.warn(
                                                        "Prometheus HTTP endpoint did not terminate gracefully.",
                                                        failure);
                                            }
                                            return Done.getInstance();
                                        });
                            });
                })
                .exceptionally(failure -> {
                    this.logger.error("Kamon Prometheus HTTP endpoint could not be started: {}",
                            failure.getMessage(), failure);
                    this.logger.error("Terminating actorSystem!");
                    actorSystem.terminate();
                    return null;
                });
    }

    /**
     * Determines the config of this service by delegating to {@link ConfigUtil#determineConfig(String)} with the
     * service name. This method is called from the constructor.
     *
     * @return the config.
     */
    protected Config determineConfig() {
        return ConfigUtil.determineConfig(this.serviceName);
    }

    /**
     * Creates the actor system named {@link #CLUSTER_NAME} with the given config.
     *
     * @param config the config of the actor system.
     * @return the created actor system.
     */
    protected ActorSystem createActorSystem(Config config) {
        return ActorSystem.create(CLUSTER_NAME, config);
    }

    /**
     * Starts the {@link StatusSupplierActor} for the name of the main root actor.
     *
     * @param actorSystem the actor system in which the actor is started.
     * @param config the raw config of the service; not used by this default implementation.
     */
    protected void startStatusSupplierActor(ActorSystem actorSystem, Config config) {
        startActor(actorSystem, StatusSupplierActor.props(this.rootActorName), StatusSupplierActor.ACTOR_NAME);
    }

    /**
     * Logs that the actor with the given name is about to be started and then creates it as a top-level actor.
     */
    private void startActor(ActorSystem actorSystem, Props props, String str) {
        logStartingActor(str);
        actorSystem.actorOf(props, str);
    }

    /**
     * Logs the name of the actor which is about to be started.
     */
    private void logStartingActor(String str) {
        this.logger.info("Starting actor <{}>.", str);
    }

    /**
     * Starts the {@link DevOpsCommandsActor} with a new {@link LogbackLoggingFacade}, the service name and the
     * instance identifier obtained from {@link ConfigUtil}.
     *
     * @param actorSystem the actor system in which the actor is started.
     * @param config the raw config of the service; not used by this default implementation.
     */
    protected void startDevOpsCommandsActor(ActorSystem actorSystem, Config config) {
        final Props devOpsCommandsActorProps = DevOpsCommandsActor.props(LogbackLoggingFacade.newInstance(),
                this.serviceName, ConfigUtil.instanceIdentifier());
        startActor(actorSystem, devOpsCommandsActorProps, DevOpsCommandsActor.ACTOR_NAME);
    }

    /**
     * Starts the {@link ClusterMemberAwareActor} with the service name and the majority check settings read from
     * the cluster section of the given config reader.
     *
     * @param actorSystem the actor system in which the actor is started.
     * @param c the config reader of this service.
     */
    protected void startClusterMemberAwareActor(ActorSystem actorSystem, C c) {
        final Props clusterMemberAwareActorProps = ClusterMemberAwareActor.props(this.serviceName,
                isMajorityCheckEnabled(c), getMajorityCheckDelay(c));
        startActor(actorSystem, clusterMemberAwareActorProps, ClusterMemberAwareActor.ACTOR_NAME);
    }

    /**
     * Reads the majority check flag from the cluster section of the given config reader.
     */
    private boolean isMajorityCheckEnabled(ServiceConfigReader serviceConfigReader) {
        return serviceConfigReader.cluster().majorityCheckEnabled();
    }

    /**
     * Reads the majority check delay from the cluster section of the given config reader.
     */
    private Duration getMajorityCheckDelay(ServiceConfigReader serviceConfigReader) {
        return serviceConfigReader.cluster().majorityCheckDelay();
    }

    /**
     * Registers a callback at the {@link Cluster} extension which runs once the member is up. The callback adds
     * Dropwizard metric registries, injects the limits as system properties and starts the main root actor
     * followed by the additional root actors.
     *
     * @param actorSystem the actor system in which the root actors are started.
     * @param c the config reader of this service.
     */
    protected void startServiceRootActors(ActorSystem actorSystem, C c) {
        this.logger.info("Waiting for member to be up before proceeding with further initialisation.");
        Cluster.get(actorSystem).registerOnMemberUp(() -> {
            this.logger.info("Member successfully joined the cluster, instantiating remaining actors.");

            addDropwizardMetricRegistries(actorSystem, c);

            final ActorRef pubSubMediator = getDistributedPubSubMediatorActor(actorSystem);
            final ActorMaterializer materializer = createActorMaterializer(actorSystem);

            injectSystemPropertiesLimits(c);

            startMainRootActor(actorSystem, getMainRootActorProps(c, pubSubMediator, materializer));
            startAdditionalRootActors(actorSystem,
                    getAdditionalRootActorsInformation(c, pubSubMediator, materializer));
        });
    }

    /**
     * Hook for adding Dropwizard metric registries. This default implementation does nothing.
     *
     * @param actorSystem the actor system of this service.
     * @param c the config reader of this service.
     */
    protected void addDropwizardMetricRegistries(ActorSystem actorSystem, C c) {
        // intentionally empty; subclasses may override
    }

    /**
     * Sets the maximum sizes of things, policies and messages from the limits section of the given config reader
     * as JVM system properties. The property keys are the constants declared by {@link ThingCommandSizeValidator},
     * {@link PolicyCommandSizeValidator} and {@link MessageCommandSizeValidator}.
     *
     * @param c the config reader of this service.
     */
    protected void injectSystemPropertiesLimits(C c) {
        final LimitsConfigReader limits = c.limits();
        System.setProperty(ThingCommandSizeValidator.DITTO_LIMITS_THINGS_MAX_SIZE_BYTES,
                Long.toString(limits.thingsMaxSize()));
        System.setProperty(PolicyCommandSizeValidator.DITTO_LIMITS_POLICIES_MAX_SIZE_BYTES,
                Long.toString(limits.policiesMaxSize()));
        System.setProperty(MessageCommandSizeValidator.DITTO_LIMITS_MESSAGES_MAX_SIZE_BYTES,
                Long.toString(limits.messagesMaxSize()));
    }

    /**
     * Returns the mediator of the {@link DistributedPubSub} extension of the given actor system.
     */
    private static ActorRef getDistributedPubSubMediatorActor(ActorSystem actorSystem) {
        return DistributedPubSub.get(actorSystem).mediator();
    }

    /**
     * Creates a new {@link ActorMaterializer} for the given actor ref factory.
     */
    private static ActorMaterializer createActorMaterializer(ActorRefFactory actorRefFactory) {
        return ActorMaterializer.create(actorRefFactory);
    }

    /**
     * Returns the props of the main root actor of this service.
     *
     * @param c the config reader of this service.
     * @param actorRef the mediator of the distributed pub/sub extension.
     * @param actorMaterializer the materializer created for the actor system of this service.
     * @return the props.
     */
    protected abstract Props getMainRootActorProps(C c, ActorRef actorRef, ActorMaterializer actorMaterializer);

    /**
     * Starts the main root actor with the given props under the root actor name passed to the constructor.
     *
     * @param actorSystem the actor system in which the actor is started.
     * @param props the props of the main root actor.
     */
    protected void startMainRootActor(ActorSystem actorSystem, Props props) {
        startActor(actorSystem, props, this.rootActorName);
    }

    /**
     * Returns the information about root actors to be started in addition to the main root actor. This default
     * implementation returns an empty collection.
     *
     * @param c the config reader of this service.
     * @param actorRef the mediator of the distributed pub/sub extension.
     * @param actorMaterializer the materializer created for the actor system of this service.
     * @return the information of the additional root actors.
     */
    protected Collection<RootActorInformation> getAdditionalRootActorsInformation(C c, ActorRef actorRef,
            ActorMaterializer actorMaterializer) {

        return Collections.emptyList();
    }

    /**
     * Starts one actor for each of the given root actor information, in iteration order.
     *
     * @param actorSystem the actor system in which the actors are started.
     * @param iterable the information of the root actors to start.
     */
    protected void startAdditionalRootActors(ActorSystem actorSystem, Iterable<RootActorInformation> iterable) {
        for (final RootActorInformation rootActorInformation : iterable) {
            startActor(actorSystem, rootActorInformation.props, rootActorInformation.name);
        }
    }
}
