package org.eclipse.ditto.services.base;

import akka.Done;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.ActorSystem;
import akka.actor.CoordinatedShutdown;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.pubsub.DistributedPubSub;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.management.cluster.bootstrap.ClusterBootstrap;
import akka.management.javadsl.AkkaManagement;
import akka.stream.ActorMaterializer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigList;
import com.typesafe.config.ConfigObject;
import com.typesafe.config.ConfigRenderOptions;
import com.typesafe.config.ConfigValueFactory;
import java.lang.management.ManagementFactory;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.NotThreadSafe;
import kamon.Kamon;
import kamon.prometheus.PrometheusReporter;
import kamon.system.SystemMetrics;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.services.base.config.ServiceSpecificConfig;
import org.eclipse.ditto.services.base.config.limits.LimitsConfig;
import org.eclipse.ditto.services.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.services.utils.config.DittoConfigError;
import org.eclipse.ditto.services.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.services.utils.config.ScopedConfig;
import org.eclipse.ditto.services.utils.config.raw.RawConfigSupplier;
import org.eclipse.ditto.services.utils.devops.DevOpsCommandsActor;
import org.eclipse.ditto.services.utils.devops.LogbackLoggingFacade;
import org.eclipse.ditto.services.utils.health.status.StatusSupplierActor;
import org.eclipse.ditto.services.utils.metrics.config.MetricsConfig;
import org.eclipse.ditto.services.utils.metrics.prometheus.PrometheusReporterRoute;
import org.eclipse.ditto.services.utils.persistence.mongo.config.WithMongoDbConfig;
import org.eclipse.ditto.signals.commands.messages.MessageCommandSizeValidator;
import org.eclipse.ditto.signals.commands.policies.PolicyCommandSizeValidator;
import org.eclipse.ditto.signals.commands.things.ThingCommandSizeValidator;
import org.slf4j.Logger;

@NotThreadSafe
/* loaded from: input_file:org/eclipse/ditto/services/base/DittoService.class */
public abstract class DittoService<C extends ServiceSpecificConfig> {
    public static final String CLUSTER_NAME = "ditto-cluster";
    public static final String DITTO_CONFIG_PATH = "ditto";
    private final Logger logger;
    private final String serviceName;
    private final String rootActorName;
    private final Config rawConfig = determineRawConfig();
    private final C serviceSpecificConfig = getServiceSpecificConfig(tryToGetDittoConfigOrEmpty(this.rawConfig));

    @Nullable
    private PrometheusReporter prometheusReporter;

    @Immutable
    /* loaded from: input_file:org/eclipse/ditto/services/base/DittoService$RootActorInformation.class */
    public static final class RootActorInformation {
        private final Props props;
        private final String name;

        private RootActorInformation(Props props, String str) {
            this.props = props;
            this.name = str;
        }

        public static RootActorInformation getInstance(Props props, String str) {
            ConditionChecker.checkNotNull(props, "root actor props");
            ConditionChecker.argumentNotEmpty(str, "root actor name");
            return new RootActorInformation(props, str);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public DittoService(Logger logger, String str, String str2) {
        this.logger = (Logger) ConditionChecker.checkNotNull(logger, "logger");
        this.serviceName = (String) ConditionChecker.argumentNotEmpty(str, "service name");
        this.rootActorName = (String) ConditionChecker.argumentNotEmpty(str2, "root actor name");
        if (null == this.serviceSpecificConfig) {
            throw new DittoConfigError("The service specific config must not be null!");
        }
        logger.debug("Using service specific config: <{}>.", this.serviceSpecificConfig);
    }

    protected Config determineRawConfig() {
        Config config = RawConfigSupplier.of(this.serviceName).get();
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Using config <{}>", config.root().render(ConfigRenderOptions.concise()));
        }
        return config;
    }

    private Config appendDittoInfo(Config config) {
        ConfigObject root = ConfigFactory.empty().withValue("name", ConfigValueFactory.fromAnyRef(this.serviceName)).withValue("instance-id", ConfigValueFactory.fromAnyRef(InstanceIdentifierSupplier.getInstance().get())).root();
        ConfigList fromIterable = ConfigValueFactory.fromIterable(ManagementFactory.getRuntimeMXBean().getInputArguments());
        return config.withValue("ditto.info", ConfigFactory.empty().withValue("service", root).withValue("vm-args", fromIterable).withValue("env", ConfigValueFactory.fromMap(System.getenv())).root());
    }

    private static ScopedConfig tryToGetDittoConfigOrEmpty(Config config) {
        try {
            return getDittoConfigOrEmpty(config);
        } catch (ConfigException.WrongType e) {
            throw new DittoConfigError(MessageFormat.format("Value at <{0}> was not of type Config!", "ditto"), e);
        }
    }

    private static ScopedConfig getDittoConfigOrEmpty(Config config) {
        return config.hasPath("ditto") ? DefaultScopedConfig.dittoScoped(config) : DefaultScopedConfig.empty("ditto");
    }

    protected abstract C getServiceSpecificConfig(ScopedConfig scopedConfig);

    public ActorSystem start() {
        return MainMethodExceptionHandler.getInstance(this.logger).call(this::doStart);
    }

    protected ActorSystem doStart() {
        logRuntimeParameters();
        Config appendDittoInfo = appendDittoInfo(appendAkkaPersistenceMongoUriToRawConfig());
        startKamon();
        ActorSystem createActorSystem = createActorSystem(appendDittoInfo);
        initializeActorSystem(createActorSystem);
        startKamonPrometheusHttpEndpoint(createActorSystem);
        return createActorSystem;
    }

    private Config appendAkkaPersistenceMongoUriToRawConfig() {
        if (!isServiceWithMongoDbConfig()) {
            return this.rawConfig;
        }
        return this.rawConfig.withValue("akka.contrib.persistence.mongodb.mongo.mongouri", ConfigValueFactory.fromAnyRef(((WithMongoDbConfig) this.serviceSpecificConfig).getMongoDbConfig().getMongoDbUri()));
    }

    private boolean isServiceWithMongoDbConfig() {
        return this.serviceSpecificConfig instanceof WithMongoDbConfig;
    }

    private void logRuntimeParameters() {
        this.logger.info("Running with following runtime parameters: <{}>.", ManagementFactory.getRuntimeMXBean().getInputArguments());
        this.logger.info("Available processors: <{}>.", Integer.valueOf(Runtime.getRuntime().availableProcessors()));
    }

    private void startKamon() {
        Kamon.reconfigure(ConfigFactory.load("kamon"));
        MetricsConfig metricsConfig = this.serviceSpecificConfig.getMetricsConfig();
        if (metricsConfig.isSystemMetricsEnabled()) {
            SystemMetrics.startCollecting();
        }
        if (metricsConfig.isPrometheusEnabled()) {
            startPrometheusReporter();
        }
    }

    private void startPrometheusReporter() {
        try {
            this.prometheusReporter = new PrometheusReporter();
            Kamon.addReporter(this.prometheusReporter);
            this.logger.info("Successfully added Prometheus reporter to Kamon.");
        } catch (Exception e) {
            this.logger.error("Error while adding Prometheus reporter to Kamon.", (Throwable) e);
        }
    }

    protected void initializeActorSystem(ActorSystem actorSystem) {
        startAkkaManagement(actorSystem);
        startClusterBootstrap(actorSystem);
        startStatusSupplierActor(actorSystem);
        startDevOpsCommandsActor(actorSystem);
        startServiceRootActors(actorSystem, this.serviceSpecificConfig);
        setUpCoordinatedShutdown(actorSystem);
    }

    private void startKamonPrometheusHttpEndpoint(ActorSystem actorSystem) {
        MetricsConfig metricsConfig = this.serviceSpecificConfig.getMetricsConfig();
        if (!metricsConfig.isPrometheusEnabled() || null == this.prometheusReporter) {
            return;
        }
        String prometheusHostname = metricsConfig.getPrometheusHostname();
        int prometheusPort = metricsConfig.getPrometheusPort();
        ActorMaterializer createActorMaterializer = createActorMaterializer(actorSystem);
        Http.get(actorSystem).bindAndHandle(PrometheusReporterRoute.buildPrometheusReporterRoute(this.prometheusReporter).flow(actorSystem, createActorMaterializer), ConnectHttp.toHost(prometheusHostname, prometheusPort), createActorMaterializer).thenAccept(serverBinding -> {
            CoordinatedShutdown.get(actorSystem).addTask(CoordinatedShutdown.PhaseServiceUnbind(), "shutdown_prometheus_http_endpoint", () -> {
                this.logger.info("Gracefully shutting down Prometheus HTTP endpoint ...");
                return serverBinding.terminate(Duration.ofSeconds(1L)).handle((httpTerminated, th) -> {
                    return Done.getInstance();
                });
            });
        }).exceptionally(th -> {
            this.logger.error("Kamon Prometheus HTTP endpoint could not be started: {}", th.getMessage(), th);
            this.logger.error("Terminating ActorSystem!");
            actorSystem.terminate();
            return null;
        });
    }

    protected ActorSystem createActorSystem(Config config) {
        return ActorSystem.create(CLUSTER_NAME, config);
    }

    private void startAkkaManagement(ActorSystem actorSystem) {
        this.logger.info("Starting AkkaManagement ...");
        AkkaManagement.get(actorSystem).start().whenComplete((uri, th) -> {
            if (null != th) {
                this.logger.error("Error during start of AkkaManagement: <{}>!", th.getMessage(), th);
            } else {
                this.logger.info("Started AkkaManagement on URI <{}>.", uri);
            }
        });
    }

    private void startClusterBootstrap(ActorSystem actorSystem) {
        this.logger.info("Starting ClusterBootstrap ...");
        ClusterBootstrap.get(actorSystem).start();
    }

    protected void startStatusSupplierActor(ActorSystem actorSystem) {
        startActor(actorSystem, StatusSupplierActor.props(this.rootActorName), StatusSupplierActor.ACTOR_NAME);
    }

    private ActorRef startActor(ActorSystem actorSystem, Props props, String str) {
        logStartingActor(str);
        return actorSystem.actorOf(props, str);
    }

    private void logStartingActor(String str) {
        this.logger.info("Starting actor <{}>.", str);
    }

    protected void startDevOpsCommandsActor(ActorSystem actorSystem) {
        startActor(actorSystem, DevOpsCommandsActor.props(LogbackLoggingFacade.newInstance(), this.serviceName, InstanceIdentifierSupplier.getInstance().get()), DevOpsCommandsActor.ACTOR_NAME);
    }

    protected void startServiceRootActors(ActorSystem actorSystem, C c) {
        this.logger.info("Waiting for member to be up before proceeding with further initialisation.");
        Cluster.get(actorSystem).registerOnMemberUp(() -> {
            this.logger.info("Member successfully joined the cluster, instantiating remaining actors.");
            ActorRef distributedPubSubMediatorActor = getDistributedPubSubMediatorActor(actorSystem);
            ActorMaterializer createActorMaterializer = createActorMaterializer(actorSystem);
            injectSystemPropertiesLimits(c);
            startMainRootActor(actorSystem, getMainRootActorProps(c, distributedPubSubMediatorActor, createActorMaterializer));
            startAdditionalRootActors(actorSystem, getAdditionalRootActorsInformation(c, distributedPubSubMediatorActor, createActorMaterializer));
        });
    }

    protected void injectSystemPropertiesLimits(C c) {
        LimitsConfig limitsConfig = c.getLimitsConfig();
        System.setProperty(ThingCommandSizeValidator.DITTO_LIMITS_THINGS_MAX_SIZE_BYTES, Long.toString(limitsConfig.getThingsMaxSize()));
        System.setProperty(PolicyCommandSizeValidator.DITTO_LIMITS_POLICIES_MAX_SIZE_BYTES, Long.toString(limitsConfig.getPoliciesMaxSize()));
        System.setProperty(MessageCommandSizeValidator.DITTO_LIMITS_MESSAGES_MAX_SIZE_BYTES, Long.toString(limitsConfig.getMessagesMaxSize()));
    }

    private static ActorRef getDistributedPubSubMediatorActor(ActorSystem actorSystem) {
        return DistributedPubSub.get(actorSystem).mediator();
    }

    private static ActorMaterializer createActorMaterializer(ActorRefFactory actorRefFactory) {
        return ActorMaterializer.create(actorRefFactory);
    }

    protected abstract Props getMainRootActorProps(C c, ActorRef actorRef, ActorMaterializer actorMaterializer);

    protected ActorRef startMainRootActor(ActorSystem actorSystem, Props props) {
        return startActor(actorSystem, props, this.rootActorName);
    }

    protected Collection<RootActorInformation> getAdditionalRootActorsInformation(C c, ActorRef actorRef, ActorMaterializer actorMaterializer) {
        return Collections.emptyList();
    }

    protected void startAdditionalRootActors(ActorSystem actorSystem, Iterable<RootActorInformation> iterable) {
        for (RootActorInformation rootActorInformation : iterable) {
            startActor(actorSystem, rootActorInformation.props, rootActorInformation.name);
        }
    }

    private void setUpCoordinatedShutdown(ActorSystem actorSystem) {
        CoordinatedShutdown coordinatedShutdown = CoordinatedShutdown.get(actorSystem);
        coordinatedShutdown.addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind(), "log_shutdown_initiation", () -> {
            this.logger.info("Initiated coordinated shutdown; gracefully shutting down ...");
            return CompletableFuture.completedFuture(Done.getInstance());
        });
        coordinatedShutdown.addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate(), "log_successful_graceful_shutdown", () -> {
            this.logger.info("Graceful shutdown completed.");
            return CompletableFuture.completedFuture(Done.getInstance());
        });
    }
}
