package tech.powerjob.remote.akka;

import akka.actor.ActorSystem;
import akka.actor.DeadLetter;
import akka.actor.Props;
import akka.routing.RoundRobinPool;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.remote.akka.AkkaMappingService;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.cs.CSInitializer;
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
import tech.powerjob.remote.framework.transporter.Transporter;

/* loaded from: input_file:BOOT-INF/lib/powerjob-remote-impl-akka-4.3.8.jar:tech/powerjob/remote/akka/AkkaCSInitializer.class */
/**
 * {@link CSInitializer} implementation backed by an Akka {@link ActorSystem}.
 *
 * <p>Expected call order: {@link #init(CSInitializerConfig)} first, which creates the actor
 * system; then {@link #buildTransporter()} and {@link #bindHandlers(List)}, which both use that
 * actor system; finally {@link #close()}.
 */
public class AkkaCSInitializer implements CSInitializer {
    private static final Logger log = LoggerFactory.getLogger(AkkaCSInitializer.class);

    /** Created by {@link #init(CSInitializerConfig)}; {@code null} until then. */
    private ActorSystem actorSystem;

    /** Configuration handed to {@link #init(CSInitializerConfig)}; stored but not read in this class. */
    private CSInitializerConfig config;

    /**
     * Returns the protocol identifier served by this initializer.
     *
     * @return the name of {@link Protocol#AKKA}
     */
    @Override // tech.powerjob.remote.framework.cs.CSInitializer
    public String type() {
        return Protocol.AKKA.name();
    }

    /**
     * Creates the actor system and registers a dead-letter listener on it.
     *
     * <p>The bind host and port from the given configuration override the corresponding
     * {@code akka.remote.artery.canonical.*} entries; every other setting falls back to the
     * configuration loaded via {@link AkkaConstant#AKKA_CONFIG}.
     *
     * @param cSInitializerConfig supplies the bind address and the server type used to derive the
     *                            actor system name
     */
    @Override // tech.powerjob.remote.framework.cs.CSInitializer
    public void init(CSInitializerConfig cSInitializerConfig) {
        this.config = cSInitializerConfig;
        Address bindAddress = cSInitializerConfig.getBindAddress();
        log.info("[PowerJob-AKKA] bindAddress: {}", bindAddress);

        // Overrides take precedence; anything not listed here comes from the fallback config.
        HashMap<String, Object> overrides = Maps.newHashMap();
        overrides.put("akka.remote.artery.canonical.hostname", bindAddress.getHost());
        overrides.put("akka.remote.artery.canonical.port", bindAddress.getPort());
        Config baseConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG);
        Config finalConfig = ConfigFactory.parseMap(overrides).withFallback(baseConfig);

        log.info("[PowerJob-AKKA] try to start AKKA System.");
        String actorSystemName = AkkaConstant.fetchActorSystemName(cSInitializerConfig.getServerType());
        this.actorSystem = ActorSystem.create(actorSystemName, finalConfig);

        // Route DeadLetter events to a dedicated actor.
        this.actorSystem.eventStream().subscribe(
                this.actorSystem.actorOf(Props.create(AkkaTroubleshootingActor.class), "troubleshooting"),
                DeadLetter.class);

        log.info("[PowerJob-AKKA] initialize actorSystem[{}] successfully!", this.actorSystem.name());
    }

    /**
     * Builds a transporter on top of the actor system created by {@link #init(CSInitializerConfig)}.
     *
     * @return a new {@link AkkaTransporter} wrapping the current actor system
     */
    @Override // tech.powerjob.remote.framework.cs.CSInitializer
    public Transporter buildTransporter() {
        return new AkkaTransporter(this.actorSystem);
    }

    /**
     * Starts one proxy actor per handler description.
     *
     * <p>Each actor is named and assigned a dispatcher according to what
     * {@link AkkaMappingService#parseActorName(String)} returns for the handler path, and is
     * deployed behind a round-robin pool sized to the number of available processors.
     *
     * @param list handler descriptions to expose as actors
     */
    @Override // tech.powerjob.remote.framework.cs.CSInitializer
    public void bindHandlers(List<ActorInfo> list) {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        for (ActorInfo actorInfo : list) {
            String path = actorInfo.getAnno().path();
            AkkaMappingService.ActorConfig actorConfig = AkkaMappingService.parseActorName(path);
            log.info("[PowerJob-AKKA] start to process actor[path={},config={}]", path, JsonUtils.toJSONString(actorConfig));

            // Dispatcher names are looked up under the "akka." prefix.
            Props props = AkkaProxyActor.props(actorInfo)
                    .withDispatcher("akka.".concat(actorConfig.getDispatcherName()))
                    .withRouter(new RoundRobinPool(availableProcessors));
            this.actorSystem.actorOf(props, actorConfig.getActorName());
        }
    }

    /**
     * Requests termination of the actor system.
     *
     * <p>The result of {@code terminate()} is not awaited. If {@link #init(CSInitializerConfig)}
     * never assigned an actor system, this method does nothing instead of failing, so cleanup
     * paths can call it unconditionally.
     *
     * @throws IOException declared by the overridden method; not thrown by this implementation
     */
    @Override // tech.powerjob.remote.framework.cs.CSInitializer
    public void close() throws IOException {
        if (this.actorSystem == null) {
            return;
        }
        this.actorSystem.terminate();
    }
}
