/*
 * Decompiled with CFR 0.152.
 */
package risesoft.data.transfer.core;

import java.util.ArrayList;
import java.util.List;
import risesoft.data.transfer.core.channel.JoinOutExecutorChannel;
import risesoft.data.transfer.core.channel.OutChannel;
import risesoft.data.transfer.core.config.ConfigLoadManager;
import risesoft.data.transfer.core.context.JobContext;
import risesoft.data.transfer.core.context.StreamContext;
import risesoft.data.transfer.core.exchange.CoreExchange;
import risesoft.data.transfer.core.exchange.Exchange;
import risesoft.data.transfer.core.executor.ExecutorTaskQueue;
import risesoft.data.transfer.core.factory.FactoryManager;
import risesoft.data.transfer.core.handle.HandleManager;
import risesoft.data.transfer.core.handle.InitApplicationConfigHandle;
import risesoft.data.transfer.core.job.Job;
import risesoft.data.transfer.core.job.JobEndHandle;
import risesoft.data.transfer.core.job.JobEngine;
import risesoft.data.transfer.core.listener.JobListener;
import risesoft.data.transfer.core.listener.impl.ResultJobListener;
import risesoft.data.transfer.core.log.HandledLoggerFactory;
import risesoft.data.transfer.core.log.LoggerFactory;
import risesoft.data.transfer.core.plug.PlugManager;
import risesoft.data.transfer.core.statistics.Communication;
import risesoft.data.transfer.core.statistics.State;
import risesoft.data.transfer.core.stream.in.DataInputStreamFactory;
import risesoft.data.transfer.core.stream.out.DataOutputStreamFactory;
import risesoft.data.transfer.core.util.CloseUtils;
import risesoft.data.transfer.core.util.Configuration;
import risesoft.data.transfer.core.util.ValueUtils;

/**
 * Static entry point that assembles a {@link JobContext} from a {@link Configuration},
 * hands it to {@link JobEngine} and tears the job down once it reaches a terminal state.
 *
 * <p>All members are static; the class keeps no state of its own.
 */
public class Engine {
    /** Configuration key holding the job's display name. */
    private static final String JOB_NAME_KEY = "core.name";

    /**
     * Starts a job using a freshly created {@link ResultJobListener} and the default logger factory.
     *
     * @param jobId         identifier passed to the new {@link JobContext}
     * @param configuration job configuration
     * @return the listener that was registered for this job
     */
    public static ResultJobListener start(String jobId, Configuration configuration) {
        ResultJobListener resultJobListener = new ResultJobListener();
        Engine.start(jobId, configuration, resultJobListener, null);
        return resultJobListener;
    }

    /**
     * Starts a job with the given listener and the default logger factory.
     *
     * @param jobId         identifier passed to the new {@link JobContext}
     * @param configuration job configuration
     * @param jobListener   listener registered on the job context
     * @return the job context (see {@link #start(String, Configuration, JobListener, LoggerFactory)})
     */
    public static JobContext start(String jobId, Configuration configuration, JobListener jobListener) {
        return Engine.start(jobId, configuration, jobListener, null);
    }

    /**
     * Builds the job context, loads configuration and plug-ins, wires the executors,
     * exchange and channels, and starts the job.
     *
     * <p>Any {@link Throwable} raised during assembly is recorded on the context's
     * {@link Communication}, the state is set to {@link State#FAILED}, the job is flushed
     * through {@link #onJobFlush(JobContext)} and the (failed) context is returned instead
     * of propagating the failure.
     *
     * @param jobId         identifier passed to the new {@link JobContext}
     * @param configuration job configuration
     * @param jobListener   listener registered on the job context
     * @param loggerFactory logger factory to use; when {@code null} a {@link HandledLoggerFactory}
     *                      bound to the context's handles is created and registered as a handle
     * @return the value returned by {@link JobEngine#start(JobContext)}, or the failed context
     */
    public static JobContext start(String jobId, Configuration configuration, JobListener jobListener, LoggerFactory loggerFactory) {
        JobContext jobContext = new JobContext(new Communication(), jobId, new HandleManager(), jobListener);
        try {
            if (loggerFactory == null) {
                HandledLoggerFactory handledLoggerFactory = new HandledLoggerFactory(jobContext.getHandles());
                jobContext.getHandles().add(handledLoggerFactory);
                loggerFactory = handledLoggerFactory;
            }
            // The job name falls back to the name of the calling thread.
            jobContext.setName(configuration.getString(JOB_NAME_KEY, Thread.currentThread().getName()));
            jobContext.setLoggerFactory(loggerFactory);

            Configuration loadedConfiguration = ConfigLoadManager.loadConfig(configuration, jobContext);
            jobContext.putInstance(loadedConfiguration);
            PlugManager.loadPlug(loadedConfiguration, jobContext);
            jobContext.doHandle(InitApplicationConfigHandle.class, handle -> handle.initApplicationConfig(loadedConfiguration));

            jobContext.getLogger().info(Engine.class, "\u6b63\u5728\u88c5\u914d\u6838\u5fc3\u7ec4\u4ef6");
            Engine.createJobs(loadedConfiguration, jobContext);

            // Kept as one fluent chain on purpose: each component is installed on the context
            // before the next one is looked up, so a failure part-way through leaves the
            // already-created components reachable for shutdown().
            jobContext
                .setInExecutorTaskQueue(FactoryManager.getInstanceOfConfiguration(
                    ValueUtils.getRequired(loadedConfiguration.getConfiguration("core.executor.input"), "\u7f3a\u5c11\u8f93\u5165\u961f\u5217\u6267\u884c\u5668"),
                    ExecutorTaskQueue.class, jobContext.getInstanceMap()))
                .setInChannelConfiguration(
                    ValueUtils.getRequired(loadedConfiguration.getConfiguration("core.channel.input"), "\u7f3a\u5931\u8f93\u5165\u901a\u9053"))
                .setOutExecutorTaskQueue(FactoryManager.getInstanceOfConfiguration(
                    ValueUtils.getRequired(loadedConfiguration.getConfiguration("core.executor.output"), "\u7f3a\u5c11\u8f93\u51fa\u961f\u5217\u6267\u884c\u5668"),
                    ExecutorTaskQueue.class, jobContext.getInstanceMap()))
                .setCoreExchange(new CoreExchange(
                    FactoryManager.getInstanceOfConfiguration(
                        ValueUtils.getRequired(loadedConfiguration.getConfiguration("core.exchange"), "\u7f3a\u5c11\u6838\u5fc3\u4ea4\u6362\u673a"),
                        Exchange.class, jobContext.getInstanceMap()),
                    jobContext.getCommunication()))
                .getCoreExchange()
                .setOutChannel(FactoryManager.getInstanceOfConfiguration(
                    ValueUtils.getRequired(loadedConfiguration.getConfiguration("core.channel.output"), "\u7f3a\u5931\u8f93\u51fa\u901a\u9053"),
                    OutChannel.class, jobContext.getInstanceMap()));
            // The out channel writes into the output executor queue.
            jobContext.getCoreExchange().getOutChannel().setOutPutStream(new JoinOutExecutorChannel(jobContext.getOutExecutorTaskQueue()));

            jobContext.getLogger().info(Engine.class, "\u7ec4\u4ef6\u88c5\u914d\u5b8c\u6210\u4efb\u52a1\u5f00\u59cb");
            return JobEngine.start(jobContext);
        }
        catch (Throwable e) {
            jobContext.getCommunication().setThrowable(e, true);
            jobContext.getCommunication().setState(State.FAILED);
            Engine.onJobFlush(jobContext);
            // Exceptions such as NullPointerException often carry no message; fall back to
            // toString() so the log line never ends in the literal "null".
            String detail = e.getMessage() != null ? e.getMessage() : e.toString();
            try {
                jobContext.getLogger().error(Engine.class, "\u521d\u59cb\u5316\u4efb\u52a1\u5931\u8d25" + detail);
            }
            catch (Exception ignored) {
                // The failure may have happened before the logger factory was installed.
                // It is already recorded on the Communication above, so a logging problem
                // must not replace it with an unrelated exception.
            }
            return jobContext;
        }
    }

    /**
     * Creates one {@link Job} per entry of the {@code job} configuration list, giving each a
     * {@link StreamContext} with its input and output stream factories, and stores the list
     * on the job context.
     *
     * @param configuration loaded configuration containing the {@code job} list
     * @param jobContext    context that receives the jobs and supplies the instance map
     */
    private static void createJobs(Configuration configuration, JobContext jobContext) {
        List<Configuration> jobConfigs = ValueUtils.getRequired(configuration.getListConfiguration("job"), "\u6ca1\u6709\u4efb\u52a1");
        ArrayList<Job> jobs = new ArrayList<>();
        for (Configuration jobConfig : jobConfigs) {
            StreamContext streamContext = new StreamContext();
            Job job = new Job();
            job.setStreamContext(streamContext);
            job.getStreamContext().setDataInputStreamFactory(FactoryManager.getInstanceOfConfiguration(
                ValueUtils.getRequired(jobConfig.getConfiguration("input"), "\u672a\u627e\u5230\u8f93\u5165\u6d41"),
                DataInputStreamFactory.class, jobContext.getInstanceMap()));
            job.getStreamContext().setDataOutputStreamFactory(FactoryManager.getInstanceOfConfiguration(
                ValueUtils.getRequired(jobConfig.getConfiguration("output"), "\u672a\u627e\u5230\u8f93\u51fa\u6d41"),
                DataOutputStreamFactory.class, jobContext.getInstanceMap()));
            jobs.add(job);
        }
        jobContext.setJobs(jobs);
    }

    /**
     * Finishes the job once its {@link Communication} is in a terminal state
     * ({@link State#FAILED} or {@link State#SUCCEEDED}).
     *
     * <p>Does nothing when the context is already marked as ended or the state is not
     * terminal. Otherwise the components are shut down, the context is marked as ended,
     * every {@link JobEndHandle} is invoked and the job listener is notified. An
     * {@link Exception} thrown by an end handle turns the job into {@code FAILED}. The
     * listener is notified even if an end handle throws an {@link Error}; in that case
     * the error still propagates to the caller after the notification.
     *
     * @param jobContext context of the job to finish
     */
    public static void onJobFlush(JobContext jobContext) {
        if (jobContext.isEnd()) {
            return;
        }
        Communication communication = jobContext.getCommunication();
        if (communication.getState() == State.FAILED || communication.getState() == State.SUCCEEDED) {
            Engine.shutdown(jobContext);
            jobContext.setEnd(true);
            try {
                jobContext.doHandle(JobEndHandle.class, h -> h.onJobEnd(jobContext));
            }
            catch (Exception e) {
                communication.setState(State.FAILED, true);
                communication.setThrowable(e);
            }
            finally {
                // Always notify: the context is already marked as ended, so a skipped
                // notification here could never be delivered by a later call.
                jobContext.getJobListener().end(communication);
            }
        }
    }

    /**
     * Best-effort shutdown of the output queue, input queue and core exchange.
     *
     * <p>Each step is isolated so that one failing (or not yet created) component does not
     * prevent the remaining ones from being shut down.
     *
     * @param jobContext context whose components are shut down
     */
    private static void shutdown(JobContext jobContext) {
        try {
            CloseUtils.close(jobContext.getOutExecutorTaskQueue());
        }
        catch (Exception ignored) {
            // Best effort: continue with the remaining components.
        }
        try {
            jobContext.getOutExecutorTaskQueue().shutdown();
        }
        catch (Exception ignored) {
            // Best effort: the queue may be missing if initialization failed early.
        }
        try {
            jobContext.getInExecutorTaskQueue().shutdown();
        }
        catch (Exception ignored) {
            // Best effort: the queue may be missing if initialization failed early.
        }
        try {
            jobContext.getCoreExchange().shutdown();
        }
        catch (Exception ignored) {
            // Best effort: the exchange may be missing if initialization failed early.
        }
    }
}

