/*
 * Decompiled with CFR 0.152.
 */
package risesoft.data.transfer.core.job;

import risesoft.data.transfer.core.Engine;
import risesoft.data.transfer.core.context.JobContext;
import risesoft.data.transfer.core.context.StreamContext;
import risesoft.data.transfer.core.executor.ExecutorListenerAdapter;
import risesoft.data.transfer.core.executor.in.JobInputExecutorFactory;
import risesoft.data.transfer.core.executor.out.JobOutputExecutorFactory;
import risesoft.data.transfer.core.job.Job;
import risesoft.data.transfer.core.job.JobRunningController;
import risesoft.data.transfer.core.job.JobStartHandle;
import risesoft.data.transfer.core.statistics.Communication;
import risesoft.data.transfer.core.statistics.State;

public class JobEngine {
    public static JobContext start(JobContext jobContext) {
        jobContext.getCommunication().reset();
        JobEngine.startJob(jobContext);
        return jobContext;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void startJob(final JobContext jobContext) {
        try {
            final Communication communication = jobContext.getCommunication();
            if (jobContext.hasJob()) {
                jobContext.doHandle(JobStartHandle.class, h -> h.onJobStart(jobContext));
                Job job = jobContext.nextJob();
                final JobRunningController jobRunningController = new JobRunningController(jobContext);
                StreamContext streamContext = job.getStreamContext();
                streamContext.getDataInputStreamFactory().init();
                streamContext.getDataOutputStreamFactory().init();
                jobContext.getInExecutorTaskQueue().setExecutorFacoty(new JobInputExecutorFactory(jobContext, streamContext.getDataInputStreamFactory()));
                jobContext.getOutExecutorTaskQueue().setExecutorFacoty(new JobOutputExecutorFactory(streamContext.getDataOutputStreamFactory(), jobContext));
                jobContext.getInExecutorTaskQueue().setExecutorListener(new ExecutorListenerAdapter(){

                    @Override
                    public void onError(Throwable e) {
                        jobRunningController.onError(e);
                    }

                    @Override
                    public void taskEnd(Object task) {
                        jobContext.getCoreExchange().flush();
                        jobRunningController.inEnd();
                        communication.setLongCounter("readJobEnd", System.currentTimeMillis());
                    }

                    @Override
                    public void taskStart(Object task) {
                        if (communication.getLongCounter("readJobStart") == 0L) {
                            communication.setLongCounter("readJobStart", System.currentTimeMillis());
                        }
                    }
                });
                jobContext.getOutExecutorTaskQueue().setExecutorListener(new ExecutorListenerAdapter(){

                    @Override
                    public void onError(Throwable e) {
                        jobRunningController.onError(e);
                    }

                    @Override
                    public void taskEnd(Object task) {
                        if (jobRunningController.isEnd()) {
                            communication.setLongCounter("writerJobEnd", System.currentTimeMillis());
                            jobRunningController.outEnd();
                        }
                    }

                    @Override
                    public void taskStart(Object task) {
                        if (communication.getLongCounter("writerJobStart") == 0L) {
                            communication.setLongCounter("writerJobStart", System.currentTimeMillis());
                        }
                    }
                });
                jobContext.getInExecutorTaskQueue().addBatch(streamContext.getDataInputStreamFactory().splitToData(jobContext.getInExecutorTaskQueue().getExecutorSize()));
                jobContext.getOutExecutorTaskQueue().start();
                jobContext.getInExecutorTaskQueue().start();
            } else {
                jobContext.getCommunication().setState(State.SUCCEEDED);
            }
        }
        catch (Throwable e) {
            jobContext.getCommunication().setThrowable(e, true);
            jobContext.getCommunication().setState(State.FAILED);
        }
        finally {
            Engine.onJobFlush(jobContext);
        }
    }
}

