package org.eclipse.ditto.base.service;

import ch.qos.logback.classic.LoggerContext;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigList;
import com.typesafe.config.ConfigObject;
import com.typesafe.config.ConfigRenderOptions;
import com.typesafe.config.ConfigValueFactory;
import java.lang.management.ManagementFactory;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import kamon.Kamon;
import kamon.prometheus.PrometheusReporter;
import net.logstash.logback.fieldnames.ShortenedFieldNames;
import org.apache.pekko.Done;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.CoordinatedShutdown;
import org.apache.pekko.actor.Props;
import org.apache.pekko.cluster.Cluster;
import org.apache.pekko.cluster.pubsub.DistributedPubSub;
import org.apache.pekko.http.javadsl.Http;
import org.apache.pekko.management.cluster.bootstrap.ClusterBootstrap;
import org.apache.pekko.management.javadsl.PekkoManagement;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.common.DittoSystemProperties;
import org.eclipse.ditto.base.model.signals.FeatureToggle;
import org.eclipse.ditto.base.service.config.ServiceSpecificConfig;
import org.eclipse.ditto.base.service.config.limits.LimitsConfig;
import org.eclipse.ditto.base.service.devops.DevOpsCommandsActor;
import org.eclipse.ditto.base.service.devops.LogbackLoggingFacade;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.config.DittoConfigError;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.config.raw.RawConfigSupplier;
import org.eclipse.ditto.internal.utils.health.status.StatusSupplierActor;
import org.eclipse.ditto.internal.utils.metrics.config.MetricsConfig;
import org.eclipse.ditto.internal.utils.metrics.prometheus.PrometheusReporterRoute;
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.eclipse.ditto.internal.utils.tracing.config.TracingConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Abstract base class for bootstrapping a Ditto service.
 * <p>
 * On construction the raw config is determined for the service name and the service specific config is derived
 * from it via {@link #getServiceSpecificConfig(ScopedConfig)}. Calling {@link #start()} then logs the runtime
 * parameters, configures Kamon, creates the {@link ActorSystem} named {@value #CLUSTER_NAME}, starts management,
 * cluster bootstrap and the supporting actors, registers coordinated shutdown tasks and finally (if enabled)
 * binds the Prometheus HTTP endpoint.
 * </p>
 *
 * @param <C> the type of the service specific config.
 */
@NotThreadSafe
public abstract class DittoService<C extends ServiceSpecificConfig> {

    /**
     * Name of the ActorSystem created by {@link #start()}.
     */
    public static final String CLUSTER_NAME = "ditto-cluster";

    /**
     * The config path under which the Ditto specific settings are expected.
     */
    public static final String DITTO_CONFIG_PATH = "ditto";

    /**
     * Config path of the MongoDB URI; not used by this class itself but available to subclasses.
     */
    protected static final String MONGO_URI_CONFIG_PATH = "pekko.contrib.persistence.mongodb.mongo.mongouri";

    // Both configs are assigned in the constructor (NOT via field initializers): determining them requires the
    // logger and the service name, which are only available after the constructor arguments were validated.
    protected final Config rawConfig;
    protected final C serviceSpecificConfig;

    private final Logger logger;
    private final String serviceName;
    private final String rootActorName;

    // Only set if the Prometheus reporter could be created in startPrometheusReporter().
    @Nullable
    private PrometheusReporter prometheusReporter;

    /**
     * Constructs a new service object and eagerly determines its raw and service specific configuration.
     * <p>
     * Note that {@link #getServiceSpecificConfig(ScopedConfig)} is invoked from within this constructor, i.e.
     * implementations of that method must not rely on state of the subclass which is initialised later.
     * </p>
     *
     * @param logger the logger to be used by this service; checked via {@code ConditionChecker.checkNotNull}.
     * @param serviceName the name of this service; checked via {@code ConditionChecker.argumentNotEmpty}.
     * @param rootActorName the name of the service's root actor; checked via
     * {@code ConditionChecker.argumentNotEmpty}.
     * @throws DittoConfigError if the "ditto" config value has the wrong type or if
     * {@link #getServiceSpecificConfig(ScopedConfig)} returned {@code null}.
     */
    public DittoService(Logger logger, String serviceName, String rootActorName) {
        this.logger = ConditionChecker.checkNotNull(logger, "logger");
        this.serviceName = ConditionChecker.argumentNotEmpty(serviceName, "service name");
        this.rootActorName = ConditionChecker.argumentNotEmpty(rootActorName, "root actor name");

        // Order matters: determineRawConfig() reads this.logger and this.serviceName.
        this.rawConfig = determineRawConfig();
        this.serviceSpecificConfig = getServiceSpecificConfig(tryToGetDittoConfigOrEmpty(this.rawConfig));
        if (null == this.serviceSpecificConfig) {
            throw new DittoConfigError("The service specific config must not be null!");
        }
        logger.debug("Using service specific config: <{}>.", this.serviceSpecificConfig);
    }

    /**
     * Starts this service by running the start-up sequence inside the {@code MainMethodExceptionHandler}
     * obtained for this service's logger.
     *
     * @return the ActorSystem returned by the exception handler's {@code call}.
     */
    public ActorSystem start() {
        return MainMethodExceptionHandler.getInstance(this.logger).call(this::doStart);
    }

    /**
     * Obtains the raw config for this service's name and logs it in concise rendering on debug level.
     */
    private Config determineRawConfig() {
        Config loadedConfig = RawConfigSupplier.of(this.serviceName).get();
        if (this.logger.isDebugEnabled()) {
            // Rendering the whole config is costly, hence the explicit level check.
            this.logger.debug("Using config <{}>", loadedConfig.root().render(ConfigRenderOptions.concise()));
        }
        return loadedConfig;
    }

    /**
     * Returns a copy of the given config with an additional object at path {@code ditto.info} which contains
     * the service name and instance ID, the JVM input arguments and the environment variables.
     */
    private Config appendDittoInfo(Config config) {
        String instanceId = InstanceIdentifierSupplier.getInstance().get();
        ConfigObject serviceInfo = ConfigFactory.empty()
                .withValue("name", ConfigValueFactory.fromAnyRef(this.serviceName))
                .withValue("instance-id", ConfigValueFactory.fromAnyRef(instanceId))
                .root();
        ConfigList vmArgs = ConfigValueFactory.fromIterable(ManagementFactory.getRuntimeMXBean().getInputArguments());
        ConfigObject env = ConfigValueFactory.fromMap(System.getenv());

        ConfigObject dittoInfo = ConfigFactory.empty()
                .withValue("service", serviceInfo)
                .withValue("vm-args", vmArgs)
                .withValue("env", env)
                .root();
        return config.withValue("ditto.info", dittoInfo);
    }

    /**
     * Like {@link #getDittoConfigOrEmpty(Config)} but translates a {@code ConfigException.WrongType} into a
     * {@link DittoConfigError}.
     */
    private static ScopedConfig tryToGetDittoConfigOrEmpty(Config config) {
        try {
            return getDittoConfigOrEmpty(config);
        } catch (ConfigException.WrongType e) {
            String message = MessageFormat.format("Value at <{0}> was not of type Config!", DITTO_CONFIG_PATH);
            throw new DittoConfigError(message, e);
        }
    }

    /**
     * Returns the Ditto scoped view of the given config or an empty scoped config if the given config has no
     * value at {@link #DITTO_CONFIG_PATH}.
     */
    private static ScopedConfig getDittoConfigOrEmpty(Config config) {
        if (config.hasPath(DITTO_CONFIG_PATH)) {
            return DefaultScopedConfig.dittoScoped(config);
        }
        return DefaultScopedConfig.empty(DITTO_CONFIG_PATH);
    }

    /**
     * Returns the service specific config based on the given Ditto config.
     * This method is called from the constructor of this class.
     *
     * @param scopedConfig the Ditto scoped config (empty if the raw config has no "ditto" path).
     * @return the service specific config; must not be {@code null}.
     */
    protected abstract C getServiceSpecificConfig(ScopedConfig scopedConfig);

    /**
     * Runs the actual start-up sequence; the order of the steps is significant.
     */
    private ActorSystem doStart() {
        logRuntimeParameters();
        Config actorSystemConfig = appendDittoInfo(appendPekkoPersistenceMongoUriToRawConfig());
        startKamon();
        ActorSystem actorSystem = createActorSystem(actorSystemConfig);
        initializeActorSystem(actorSystem);
        startKamonPrometheusHttpEndpoint(actorSystem);
        return actorSystem;
    }

    /**
     * Hook for subclasses to extend the raw config before the ActorSystem is created from it.
     * This default implementation returns the raw config unchanged.
     *
     * @return the config to create the ActorSystem with (before the "ditto.info" section is appended).
     */
    protected Config appendPekkoPersistenceMongoUriToRawConfig() {
        return this.rawConfig;
    }

    /**
     * Logs the JVM input arguments and the number of available processors on info level.
     */
    private void logRuntimeParameters() {
        this.logger.info("Running with following runtime parameters: <{}>.",
                ManagementFactory.getRuntimeMXBean().getInputArguments());
        this.logger.info("Available processors: <{}>.", Runtime.getRuntime().availableProcessors());
    }

    /**
     * Reconfigures Kamon with the "kamon" config, initialises it if system metrics or tracing are enabled,
     * starts the Prometheus reporter if enabled and initialises Ditto tracing.
     */
    private void startKamon() {
        Kamon.reconfigure(ConfigFactory.load("kamon"));

        MetricsConfig metricsConfig = this.serviceSpecificConfig.getMetricsConfig();
        TracingConfig tracingConfig = this.serviceSpecificConfig.getTracingConfig();

        boolean kamonInitRequired = metricsConfig.isSystemMetricsEnabled() || tracingConfig.isTracingEnabled();
        if (kamonInitRequired) {
            Kamon.init();
        }
        if (metricsConfig.isPrometheusEnabled()) {
            startPrometheusReporter();
        }
        DittoTracing.init(tracingConfig);
    }

    /**
     * Creates the Prometheus reporter and adds it to Kamon. Failures are logged but deliberately not
     * propagated, i.e. the service continues to start without a reporter.
     */
    private void startPrometheusReporter() {
        try {
            this.prometheusReporter = PrometheusReporter.create();
            Kamon.addReporter("prometheus reporter", this.prometheusReporter);
            this.logger.info("Successfully added Prometheus reporter to Kamon.");
        } catch (Exception e) {
            this.logger.error("Error while adding Prometheus reporter to Kamon.", e);
        }
    }

    /**
     * Starts management, cluster bootstrap and all actors of this service and registers the coordinated
     * shutdown tasks.
     */
    private void initializeActorSystem(ActorSystem actorSystem) {
        startPekkoManagement(actorSystem);
        startClusterBootstrap(actorSystem);
        startStatusSupplierActor(actorSystem);
        startDevOpsCommandsActor(actorSystem);
        startServiceRootActors(actorSystem, this.serviceSpecificConfig);
        setUpCoordinatedShutdown(actorSystem);
    }

    /**
     * Binds an HTTP server serving the Prometheus reporter route, provided that Prometheus is enabled and the
     * reporter was created successfully. If binding fails, the ActorSystem is terminated.
     */
    private void startKamonPrometheusHttpEndpoint(ActorSystem actorSystem) {
        MetricsConfig metricsConfig = this.serviceSpecificConfig.getMetricsConfig();
        // Read the nullable field once so that check and usage refer to the same instance.
        PrometheusReporter reporter = this.prometheusReporter;
        if (!metricsConfig.isPrometheusEnabled() || null == reporter) {
            return;
        }

        Http.get(actorSystem)
                .newServerAt(metricsConfig.getPrometheusHostname(), metricsConfig.getPrometheusPort())
                .bindFlow(PrometheusReporterRoute.buildPrometheusReporterRoute(reporter).flow(actorSystem))
                .thenAccept(serverBinding -> {
                    serverBinding.addToCoordinatedShutdown(Duration.ofSeconds(1L), actorSystem);
                    this.logger.info("Created new server binding for Kamon Prometheus HTTP endpoint.");
                })
                .exceptionally(failure -> {
                    this.logger.error("Kamon Prometheus HTTP endpoint could not be started: {}",
                            failure.getMessage(), failure);
                    this.logger.error("Terminating ActorSystem!");
                    actorSystem.terminate();
                    return null;
                });
    }

    /**
     * Creates the ActorSystem named {@link #CLUSTER_NAME} with the given config.
     */
    private ActorSystem createActorSystem(Config config) {
        return ActorSystem.create(CLUSTER_NAME, config);
    }

    /**
     * Starts PekkoManagement and logs the outcome once the start has completed.
     */
    private void startPekkoManagement(ActorSystem actorSystem) {
        this.logger.info("Starting PekkoManagement ...");
        PekkoManagement.get(actorSystem).start().whenComplete((uri, failure) -> {
            if (null == failure) {
                this.logger.info("Started PekkoManagement on URI <{}>.", uri);
            } else {
                this.logger.error("Error during start of PekkoManagement: <{}>!", failure.getMessage(), failure);
            }
        });
    }

    /**
     * Starts the ClusterBootstrap extension of the given ActorSystem.
     */
    private void startClusterBootstrap(ActorSystem actorSystem) {
        this.logger.info("Starting ClusterBootstrap ...");
        ClusterBootstrap.get(actorSystem).start();
    }

    /**
     * Starts the StatusSupplierActor which is created with the name of the root actor.
     */
    private void startStatusSupplierActor(ActorSystem actorSystem) {
        startActor(actorSystem, StatusSupplierActor.props(this.rootActorName), StatusSupplierActor.ACTOR_NAME);
    }

    /**
     * Logs and creates a top-level actor with the given props and name.
     */
    private ActorRef startActor(ActorSystem actorSystem, Props props, String actorName) {
        logStartingActor(actorName);
        return actorSystem.actorOf(props, actorName);
    }

    private void logStartingActor(String actorName) {
        this.logger.info("Starting actor <{}>.", actorName);
    }

    /**
     * Starts the DevOpsCommandsActor with a Logback logging facade, the service name and the instance ID.
     */
    private void startDevOpsCommandsActor(ActorSystem actorSystem) {
        Props devOpsProps = DevOpsCommandsActor.props(LogbackLoggingFacade.newInstance(), this.serviceName,
                InstanceIdentifierSupplier.getInstance().get());
        startActor(actorSystem, devOpsProps, DevOpsCommandsActor.ACTOR_NAME);
    }

    /**
     * Registers a callback via {@code Cluster#registerOnMemberUp} which injects the limits as system properties,
     * starts the main root actor and executes the {@code RootActorStarter}.
     */
    private void startServiceRootActors(ActorSystem actorSystem, C config) {
        this.logger.info("Waiting for member to be up before proceeding with further initialisation.");
        Cluster.get(actorSystem).registerOnMemberUp(() -> {
            this.logger.info("Member successfully joined the cluster, instantiating remaining actors.");
            ActorRef pubSubMediator = getDistributedPubSubMediatorActor(actorSystem);
            // The system properties have to be set before the root actor (and its children) are created.
            injectSystemPropertiesLimits(config);
            startMainRootActor(actorSystem, getMainRootActorProps(config, pubSubMediator));
            RootActorStarter.get(actorSystem, ScopedConfig.dittoExtension(actorSystem.settings().config()))
                    .execute();
        });
    }

    /**
     * Publishes the configured limits and the merge-things feature toggle as JVM system properties.
     */
    private void injectSystemPropertiesLimits(C config) {
        LimitsConfig limitsConfig = config.getLimitsConfig();
        System.setProperty(DittoSystemProperties.DITTO_LIMITS_THINGS_MAX_SIZE_BYTES,
                Long.toString(limitsConfig.getThingsMaxSize()));
        System.setProperty(DittoSystemProperties.DITTO_LIMITS_POLICIES_MAX_SIZE_BYTES,
                Long.toString(limitsConfig.getPoliciesMaxSize()));
        System.setProperty(DittoSystemProperties.DITTO_LIMITS_MESSAGES_MAX_SIZE_BYTES,
                Long.toString(limitsConfig.getMessagesMaxSize()));
        // The feature toggle is read from the raw config using the property name itself as config path.
        System.setProperty(FeatureToggle.MERGE_THINGS_ENABLED,
                Boolean.toString(this.rawConfig.getBoolean(FeatureToggle.MERGE_THINGS_ENABLED)));
        System.setProperty(DittoSystemProperties.DITTO_LIMITS_POLICY_IMPORTS_LIMIT,
                Integer.toString(limitsConfig.getPolicyImportsLimit()));
    }

    private static ActorRef getDistributedPubSubMediatorActor(ActorSystem actorSystem) {
        return DistributedPubSub.get(actorSystem).mediator();
    }

    /**
     * Returns the Props of the service's main root actor.
     *
     * @param c the service specific config.
     * @param actorRef the distributed pub-sub mediator of the ActorSystem.
     * @return the Props used to create the root actor.
     */
    protected abstract Props getMainRootActorProps(C c, ActorRef actorRef);

    /**
     * Starts the main root actor under the root actor name given to the constructor.
     */
    private ActorRef startMainRootActor(ActorSystem actorSystem, Props props) {
        return startActor(actorSystem, props, this.rootActorName);
    }

    /**
     * Registers two coordinated shutdown tasks: one which logs the initiation of the shutdown and stops
     * PekkoManagement, and one which logs the completion of the shutdown and stops the Logback logger context.
     */
    private void setUpCoordinatedShutdown(ActorSystem actorSystem) {
        CoordinatedShutdown coordinatedShutdown = CoordinatedShutdown.get(actorSystem);

        coordinatedShutdown.addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind(), "log_shutdown_initiation", () -> {
            this.logger.info("Initiated coordinated shutdown; gracefully shutting down ...");
            coordinatedShutdown.getShutdownReason()
                    .ifPresent(reason -> this.logger.info("Shutdown reason was - <{}>", reason));
            return PekkoManagement.get(actorSystem).stop().thenApply(done -> {
                this.logger.info("PekkoManagement stopped!");
                return done;
            });
        });

        coordinatedShutdown.addTask(CoordinatedShutdown.PhaseActorSystemTerminate(),
                "log_successful_graceful_shutdown", () -> {
                    this.logger.info("Graceful shutdown completed.");
                    // Only a Logback context can be stopped; guard against other SLF4J bindings instead of
                    // failing the shutdown task with a ClassCastException.
                    org.slf4j.ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory();
                    if (loggerFactory instanceof LoggerContext) {
                        ((LoggerContext) loggerFactory).stop();
                    }
                    return CompletableFuture.completedFuture(Done.getInstance());
                });
    }
}
