package org.apache.kylin.rest.config.initialize;

import io.kyligence.kap.guava20.shaded.common.eventbus.Subscribe;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.common.scheduler.EpochStartedNotifier;
import org.apache.kylin.common.scheduler.ProjectControlledNotifier;
import org.apache.kylin.common.scheduler.ProjectEscapedNotifier;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
import org.apache.kylin.metadata.epoch.EpochManager;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.rest.service.UserAclService;
import org.apache.kylin.rest.service.UserService;
import org.apache.kylin.rest.service.task.QueryHistoryTaskScheduler;
import org.apache.kylin.rest.service.task.RecommendationTopNUpdateScheduler;
import org.apache.kylin.rest.util.CreateAdminUserUtils;
import org.apache.kylin.rest.util.InitResourceGroupUtils;
import org.apache.kylin.rest.util.InitUserGroupUtils;
import org.apache.kylin.streaming.jobs.scheduler.StreamingScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/kylin/rest/config/initialize/EpochChangedListener.class */
public class EpochChangedListener {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(EpochChangedListener.class);
    private static final String GLOBAL = "_global";

    @Autowired
    Environment env;

    @Autowired
    @Qualifier("userService")
    UserService userService;

    @Autowired
    @Qualifier("userAclService")
    UserAclService userAclService;

    @Autowired
    @Qualifier("recommendationUpdateScheduler")
    RecommendationTopNUpdateScheduler recommendationUpdateScheduler;

    @Subscribe
    public void onProjectControlled(ProjectControlledNotifier projectControlledNotifier) throws IOException {
        String project = projectControlledNotifier.getProject();
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        EpochManager epochManager = EpochManager.getInstance();
        if (GLOBAL.equals(project)) {
            CreateAdminUserUtils.createAllAdmins(this.userService, this.env);
            InitUserGroupUtils.initUserGroups(this.env);
            UnitOfWork.doInTransactionWithRetry(() -> {
                ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv()).createMetaStoreUuidIfNotExist();
                return null;
            }, "", 1);
            InitResourceGroupUtils.initResourceGroup();
            this.userAclService.syncAdminUserAcl();
            return;
        }
        if (!EpochManager.getInstance().checkEpochValid(project)) {
            log.warn("epoch:{} is invalid in project controlled", project);
            return;
        }
        NDefaultScheduler nDefaultScheduler = NDefaultScheduler.getInstance(project);
        if (nDefaultScheduler.hasStarted() && epochManager.checkEpochId(nDefaultScheduler.getContext().getEpochId(), project)) {
            return;
        }
        if (nDefaultScheduler.hasStarted()) {
            nDefaultScheduler.forceShutdown();
        }
        log.info("start thread of project: {}", project);
        NDefaultScheduler nDefaultScheduler2 = NDefaultScheduler.getInstance(project);
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            nDefaultScheduler2.init(new JobEngineConfig(instanceFromEnv));
            if (!nDefaultScheduler2.hasStarted()) {
                throw new RuntimeException("Scheduler for " + project + " has not been started");
            }
            StreamingScheduler streamingScheduler = StreamingScheduler.getInstance(project);
            streamingScheduler.init();
            if (!streamingScheduler.getHasStarted().get()) {
                throw new RuntimeException("Streaming Scheduler for " + project + " has not been started");
            }
            QueryHistoryTaskScheduler queryHistoryTaskScheduler = QueryHistoryTaskScheduler.getInstance(project);
            queryHistoryTaskScheduler.init();
            if (!queryHistoryTaskScheduler.hasStarted()) {
                throw new RuntimeException("Query history accelerate scheduler for " + project + " has not been started");
            }
            this.recommendationUpdateScheduler.addProject(project);
            return 0;
        }, project, 1);
        nDefaultScheduler2.setHasFinishedTransactions(new AtomicBoolean(true));
    }

    @Subscribe
    public void onProjectEscaped(ProjectEscapedNotifier projectEscapedNotifier) {
        String project = projectEscapedNotifier.getProject();
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        if (GLOBAL.equals(project)) {
            return;
        }
        log.info("Shutdown related thread: {}", project);
        try {
            NExecutableManager.getInstance(instanceFromEnv, project).destoryAllProcess();
            QueryHistoryTaskScheduler.shutdownByProject(project);
            NDefaultScheduler.shutdownByProject(project);
            StreamingScheduler.shutdownByProject(project);
            this.recommendationUpdateScheduler.removeProject(project);
        } catch (Exception e) {
            log.warn("error when shutdown " + project + " thread", e);
        }
    }

    @Subscribe
    public void onEpochStarted(EpochStartedNotifier epochStartedNotifier) {
        ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv()).leaderCatchup();
    }
}
