/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.runtime.executor;

import io.mantisrx.common.MantisProperties;
import io.mantisrx.common.WorkerPorts;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.MetricsServer;
import io.mantisrx.common.metrics.netty.MantisNettyEventsListenerFactory;
import io.mantisrx.common.network.Endpoint;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MachineDefinitions;
import io.mantisrx.runtime.MantisJobDurationType;
import io.mantisrx.runtime.SinkHolder;
import io.mantisrx.runtime.SourceHolder;
import io.mantisrx.runtime.StageConfig;
import io.mantisrx.runtime.WorkerInfo;
import io.mantisrx.runtime.WorkerMap;
import io.mantisrx.runtime.command.CommandException;
import io.mantisrx.runtime.command.ValidateJob;
import io.mantisrx.runtime.descriptor.SchedulingInfo;
import io.mantisrx.runtime.descriptor.StageSchedulingInfo;
import io.mantisrx.runtime.executor.IllegalMantisJobException;
import io.mantisrx.runtime.executor.PortSelector;
import io.mantisrx.runtime.executor.PortSelectorInRange;
import io.mantisrx.runtime.executor.StageExecutors;
import io.mantisrx.runtime.executor.WorkerConsumerRemoteObservable;
import io.mantisrx.runtime.executor.WorkerPublisherRemoteObservable;
import io.mantisrx.runtime.lifecycle.Lifecycle;
import io.mantisrx.runtime.lifecycle.ServiceLocator;
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.ParameterUtils;
import io.reactivex.mantis.remote.observable.EndpointChange;
import io.reactivex.mantis.remote.observable.EndpointInjector;
import io.reactivex.mantis.remote.observable.RxMetrics;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import mantis.io.reactivex.netty.RxNetty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.subjects.BehaviorSubject;

public class LocalJobExecutorNetworked {
    // Shared SLF4J logger for every static helper in this class.
    private static final Logger logger = LoggerFactory.getLogger(LocalJobExecutorNetworked.class);
    // Constant 1. staticEndpoints declares a parameter with the same name, which shadows this field there.
    private static final int numPartitions = 1;
    // Despite the name this is not a no-op: invoking it terminates the JVM with exit status 0.
    private static final Action0 nullAction = () -> System.exit(0);

    // Private constructor: the class only exposes static methods and is never instantiated here.
    private LocalJobExecutorNetworked() {
    }

    /**
     * Starts one source-stage worker.
     *
     * <p>Builds a {@link WorkerPublisherRemoteObservable} on {@code port} that is told how many
     * workers the next stage has, then hands everything to {@code StageExecutors.executeSource}.
     *
     * @param index2                 index of this worker within the source stage
     * @param port                   port the publisher is created with
     * @param workersAtNextStage     worker count of the following stage, emitted once to the publisher
     * @param source2                the job's source
     * @param stage                  configuration of the source stage
     * @param context                context handed to the stage executor
     * @param stageWorkersObservable worker-count stream forwarded unchanged to the stage executor
     */
    private static void startSource(int index2, int port, int workersAtNextStage, SourceHolder source2, StageConfig stage, Context context, Observable<Integer> stageWorkersObservable) {
        logger.debug("Creating source publisher on port " + port);
        Observable<Integer> downstreamWorkerCount = Observable.just(workersAtNextStage);
        StageExecutors.executeSource(
                index2,
                source2,
                stage,
                new WorkerPublisherRemoteObservable(port, null, downstreamWorkerCount, null),
                context,
                stageWorkersObservable);
    }

    /**
     * Starts one worker of an intermediate stage: a consumer wired to every port of the previous
     * stage plus a publisher on {@code port}, both passed to
     * {@code StageExecutors.executeIntermediate}.
     *
     * @param previousStagePorts     ports the consumer's static endpoints are built from
     * @param port                   port the publisher is created with
     * @param stage                  configuration of this stage
     * @param context                context handed to the stage executor
     * @param workerIndex            index of this worker; becomes part of each endpoint's slot id
     * @param workersAtNextStage     worker count of the following stage, emitted once to the publisher
     * @param stageNumber            stage number embedded in each endpoint's slot id
     * @param workersAtPreviousStage not used by this method
     */
    private static void startIntermediate(int[] previousStagePorts, int port, StageConfig stage, Context context, int workerIndex, int workersAtNextStage, int stageNumber, int workersAtPreviousStage) {
        if (logger.isDebugEnabled()) {
            // Only pay for rendering the port list when it will actually be logged.
            StringBuilder upstreamPorts = new StringBuilder();
            for (int k = 0; k < previousStagePorts.length; k++) {
                upstreamPorts.append(previousStagePorts[k]).append(" ");
            }
            logger.debug("Creating intermediate consumer connecting to publishers on ports " + upstreamPorts);
        }

        // Consumer side: a fixed endpoint set, one partition per upstream port.
        EndpointInjector upstreamInjector = staticInjector(staticEndpoints(previousStagePorts, stageNumber, workerIndex, 1));
        WorkerConsumerRemoteObservable consumer = new WorkerConsumerRemoteObservable(null, upstreamInjector);

        // Publisher side.
        logger.debug("Creating intermediate publisher on port " + port);
        Observable<Integer> downstreamWorkerCount = Observable.just(workersAtNextStage);
        WorkerPublisherRemoteObservable publisher = new WorkerPublisherRemoteObservable(port, null, downstreamWorkerCount, null);

        StageExecutors.executeIntermediate(consumer, stage, publisher, context);
    }

    /**
     * Starts one worker of the final (sink) stage: a consumer wired to every port of the previous
     * stage, passed together with the sink to {@code StageExecutors.executeSink}.
     *
     * <p>NOTE(review): {@code sinkObservableTerminatedCompletedCallback} is forwarded as the first
     * callback argument of {@code executeSink} and {@code sinkObservableCompletedCallback} as the
     * fourth; confirm against {@code StageExecutors.executeSink} that this ordering is intended.
     *
     * @param previousStage                             not used by this method
     * @param previousStagePorts                        ports the consumer's static endpoints are built from
     * @param stage                                     configuration of the sink stage
     * @param portSelector                              port selector forwarded to the stage executor
     * @param sink                                      the job's sink
     * @param context                                   context handed to the stage executor
     * @param sinkObservableCompletedCallback           forwarded as the fourth callback argument of {@code executeSink}
     * @param sinkObservableTerminatedCompletedCallback forwarded as the first callback argument of {@code executeSink}
     * @param sinkObservableErrorCallback               forwarded as the error callback
     * @param stageNumber                               stage number embedded in each endpoint's slot id
     * @param workerIndex                               index of this worker; becomes part of each endpoint's slot id
     * @param workersAtPreviousStage                    not used by this method
     */
    private static void startSink(StageConfig previousStage, int[] previousStagePorts, StageConfig stage, PortSelector portSelector, SinkHolder sink, Context context, Action0 sinkObservableCompletedCallback, Action0 sinkObservableTerminatedCompletedCallback, Action1<Throwable> sinkObservableErrorCallback, int stageNumber, int workerIndex, int workersAtPreviousStage) {
        if (logger.isDebugEnabled()) {
            // Only pay for rendering the port list when it will actually be logged.
            StringBuilder upstreamPorts = new StringBuilder();
            for (int k = 0; k < previousStagePorts.length; k++) {
                upstreamPorts.append(previousStagePorts[k]).append(" ");
            }
            logger.debug("Creating sink consumer connecting to publishers on ports " + upstreamPorts);
        }

        // Consumer side: a fixed endpoint set, one partition per upstream port.
        EndpointInjector upstreamInjector = staticInjector(staticEndpoints(previousStagePorts, stageNumber, workerIndex, 1));
        WorkerConsumerRemoteObservable consumer = new WorkerConsumerRemoteObservable(null, upstreamInjector);

        StageExecutors.executeSink(
                consumer,
                stage,
                sink,
                portSelector,
                new RxMetrics(),
                context,
                sinkObservableTerminatedCompletedCallback,
                null,
                null,
                sinkObservableCompletedCallback,
                sinkObservableErrorCallback);
    }

    /**
     * Indexes the supplied parameters by name and delegates to
     * {@code ParameterUtils.checkThenCreateState} together with the definitions.
     *
     * <p>If two parameters share a name, the one appearing later in {@code parameters} replaces the
     * earlier one in the index.
     *
     * @param parameterDefinitions definitions passed through to {@code ParameterUtils}
     * @param parameters           parameter values to index by {@code getName()}
     * @return whatever {@code ParameterUtils.checkThenCreateState} returns
     * @throws IllegalArgumentException declared by this method; presumably raised by the
     *                                  {@code ParameterUtils} check — confirm there
     */
    public static Map<String, Object> checkAndGetParameters(Map<String, ParameterDefinition<?>> parameterDefinitions, Parameter ... parameters) throws IllegalArgumentException {
        Map<String, Parameter> byName = new HashMap<>();
        for (int k = 0; k < parameters.length; k++) {
            Parameter current = parameters[k];
            byName.put(current.getName(), current);
        }
        Map<String, Object> state = ParameterUtils.checkThenCreateState(parameterDefinitions, byName);
        return state;
    }

    /**
     * Runs {@code job} locally with a generated scheduling plan: one
     * {@code MachineDefinitions.micro()} single-worker stage per stage of the job.
     *
     * @param job        job to run
     * @param parameters parameter values forwarded to the three-argument overload
     * @throws IllegalMantisJobException propagated from the three-argument overload
     */
    public static void execute(Job job, Parameter ... parameters) throws IllegalMantisJobException {
        List<StageConfig<?, ?>> jobStages = job.getStages();
        SchedulingInfo.Builder plan = new SchedulingInfo.Builder();
        int stageCount = jobStages.size();
        // One single-worker entry per stage; the stage objects themselves are not consulted.
        for (int n = 0; n < stageCount; n++) {
            plan.singleWorkerStage(MachineDefinitions.micro());
        }
        plan.numberOfStages(stageCount);
        SchedulingInfo generated = plan.build();
        execute(job, generated, parameters);
    }

    /**
     * Validates {@code job} and runs all of its stages inside the current JVM, blocking until the
     * final stage's workers have signalled completion.
     *
     * <p>Steps: validate the job, register Netty metric listeners, start a {@link MetricsServer}
     * on a port taken from {@code PortSelectorInRange(8000, 9000)}, start the job's
     * {@link Lifecycle}, wire and run the stages, then shut the lifecycle and the metrics server
     * down. Both shutdowns run in {@code finally} blocks, so they also happen when wiring or
     * waiting fails.
     *
     * @param job            job to run
     * @param schedulingInfo per-stage instance counts, looked up with 1-based stage numbers
     * @param parameters     parameter values used to build each worker's context
     * @throws IllegalMantisJobException if {@code ValidateJob} fails with a {@code CommandException}
     * @throws RuntimeException          wrapping {@link InterruptedException} if the calling thread
     *                                   is interrupted while waiting; the interrupt flag is restored
     */
    public static void execute(Job job, SchedulingInfo schedulingInfo, Parameter ... parameters) throws IllegalMantisJobException {
        try {
            new ValidateJob(job).execute();
        }
        catch (CommandException e) {
            throw new IllegalMantisJobException(e);
        }
        PortSelectorInRange portSelector = new PortSelectorInRange(8000, 9000);
        RxNetty.useMetricListenersFactory(new MantisNettyEventsListenerFactory());
        MetricsServer metricsServer = new MetricsServer(portSelector.acquirePort(), 1L, Collections.EMPTY_MAP);
        metricsServer.start();
        try {
            Lifecycle lifecycle = job.getLifecycle();
            lifecycle.startup();
            try {
                runStages(job, schedulingInfo, portSelector, lifecycle.getServiceLocator(), parameters);
            }
            finally {
                lifecycle.shutdown();
            }
        }
        finally {
            metricsServer.shutdown();
        }
    }

    /**
     * Creates the workers for every stage of {@code job}, publishes the resulting worker map and
     * blocks until the last stage's latch reaches zero.
     *
     * <p>A one-stage job is run through {@code StageExecutors.executeSingleStageJob} and this
     * method then returns. Previously control fell through into the multi-stage wiring, which
     * indexes {@code stages.get(stages.size() - 2)} and therefore cannot work for one stage.
     *
     * <p>Raw {@code StageConfig}/{@code SourceHolder}/{@code SinkHolder} are used because
     * {@code Job} is raw here and the wiring helpers in this class take the raw types.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static void runStages(Job job, SchedulingInfo schedulingInfo, PortSelectorInRange portSelector, ServiceLocator serviceLocator, Parameter[] parameters) {
        List<StageConfig> stages = job.getStages();
        SourceHolder source = job.getSource();
        SinkHolder sink = job.getSink();
        Map<String, ParameterDefinition<?>> parameterDefinitions = job.getParameterDefinitions();
        String user = Optional.ofNullable(MantisProperties.getProperty("USER")).orElse("userUnknown");
        String jobId = String.format("localJob-%s-%d", user, (int)(Math.random() * 10000.0));
        logger.info("jobID {}", jobId);

        int stageOneInstances = schedulingInfo.forStage(1).getNumberOfInstances();
        BehaviorSubject<Integer> workersInStageOneObservable = BehaviorSubject.create(stageOneInstances);
        BehaviorSubject<WorkerMap> workerMapObservable = BehaviorSubject.create();
        Map<Integer, List<WorkerInfo>> workerInfoMap = new HashMap<>();

        // Callbacks that deliberately do nothing; completion is tracked through the latch only.
        Action0 noOpOnCompleted = () -> { };
        Action1<Throwable> noOpOnError = t -> { };

        if (stages.size() == 1) {
            StageConfig stage = stages.get(0);
            CountDownLatch allWorkersDone = new CountDownLatch(stageOneInstances);
            Action0 countDownOnTerminated = allWorkersDone::countDown;
            List<WorkerInfo> workerInfoList = new ArrayList<>();
            for (int i = 0; i < stageOneInstances; ++i) {
                WorkerInfo workerInfo = new WorkerInfo(jobId, jobId, 1, i, i + 1, MantisJobDurationType.Perpetual, "localhost", acquireWorkerPorts(portSelector));
                workerInfoList.add(workerInfo);
                Context context = newWorkerContext(parameterDefinitions, parameters, serviceLocator, workerInfo, workerMapObservable);
                // The map is re-published after every worker, so subscribers see it grow.
                workerInfoMap.put(1, workerInfoList);
                workerMapObservable.onNext(new WorkerMap(workerInfoMap));
                StageExecutors.executeSingleStageJob(source, stage, sink, () -> workerInfo.getWorkerPorts().getSinkPort(), new RxMetrics(), context, countDownOnTerminated, i, workersInStageOneObservable, null, null, noOpOnCompleted, noOpOnError);
            }
            awaitCompletion(allWorkersDone);
            // Single-stage jobs are finished here; do not continue into the multi-stage wiring.
            return;
        }

        // ---- source stage (stage 1) ----
        // NOTE(review): stage-1 workers are numbered i + 1 while later stages draw from this
        // counter starting at 0, so worker numbers can repeat across stages. Kept as-is.
        int workerNumber = 0;
        StageConfig currentStage = stages.get(0);
        StageSchedulingInfo currentStageScalingInfo = schedulingInfo.forStage(1);
        StageSchedulingInfo nextStageScalingInfo = schedulingInfo.forStage(2);
        int[] previousPorts = new int[currentStageScalingInfo.getNumberOfInstances()];
        List<WorkerInfo> workerInfoList = new ArrayList<>();
        for (int i = 0; i < currentStageScalingInfo.getNumberOfInstances(); ++i) {
            WorkerInfo workerInfo = new WorkerInfo(jobId, jobId, 1, i, i + 1, MantisJobDurationType.Perpetual, "localhost", acquireWorkerPorts(portSelector));
            workerInfoList.add(workerInfo);
            int sourcePort = workerInfo.getWorkerPorts().getSinkPort();
            previousPorts[i] = sourcePort;
            Context context = newWorkerContext(parameterDefinitions, parameters, serviceLocator, workerInfo, workerMapObservable);
            startSource(i, sourcePort, nextStageScalingInfo.getNumberOfInstances(), source, currentStage, context, workersInStageOneObservable);
        }
        workerInfoMap.put(1, workerInfoList);
        workerMapObservable.onNext(new WorkerMap(workerInfoMap));

        // ---- intermediate stages (list indices 1 .. size - 2) ----
        for (int i = 1; i < stages.size() - 1; ++i) {
            StageSchedulingInfo upstreamScalingInfo = schedulingInfo.forStage(i);
            currentStageScalingInfo = schedulingInfo.forStage(i + 1);
            currentStage = stages.get(i);
            nextStageScalingInfo = schedulingInfo.forStage(i + 2);
            int[] currentPorts = new int[currentStageScalingInfo.getNumberOfInstances()];
            workerInfoList = new ArrayList<>();
            for (int j = 0; j < currentStageScalingInfo.getNumberOfInstances(); ++j) {
                WorkerInfo workerInfo = new WorkerInfo(jobId, jobId, i + 1, j, workerNumber++, MantisJobDurationType.Perpetual, "localhost", acquireWorkerPorts(portSelector));
                workerInfoList.add(workerInfo);
                int port = workerInfo.getWorkerPorts().getSinkPort();
                currentPorts[j] = port;
                Context context = newWorkerContext(parameterDefinitions, parameters, serviceLocator, workerInfo, workerMapObservable);
                startIntermediate(previousPorts, port, currentStage, context, j, nextStageScalingInfo.getNumberOfInstances(), i, upstreamScalingInfo.getNumberOfInstances());
            }
            workerInfoMap.put(i + 1, workerInfoList);
            workerMapObservable.onNext(new WorkerMap(workerInfoMap));
            // This stage's publisher ports are what the next stage connects to.
            previousPorts = currentPorts;
        }

        // ---- sink stage (last stage) ----
        int lastStageNumber = stages.size();
        StageSchedulingInfo previousStageScalingInfo = schedulingInfo.forStage(lastStageNumber - 1);
        StageConfig previousStage = stages.get(lastStageNumber - 2);
        currentStage = stages.get(lastStageNumber - 1);
        int sinkInstances = schedulingInfo.forStage(lastStageNumber).getNumberOfInstances();
        CountDownLatch allSinksDone = new CountDownLatch(sinkInstances);
        // NOTE(review): the latch callback is passed in startSink's sinkObservableCompletedCallback
        // position and the no-op in its terminated position, exactly as before; confirm intended.
        Action0 countDownOnTerminated = allSinksDone::countDown;
        workerInfoList = new ArrayList<>();
        for (int i = 0; i < sinkInstances; ++i) {
            WorkerInfo workerInfo = new WorkerInfo(jobId, jobId, lastStageNumber, i, workerNumber++, MantisJobDurationType.Perpetual, "localhost", acquireWorkerPorts(portSelector));
            workerInfoList.add(workerInfo);
            Context context = newWorkerContext(parameterDefinitions, parameters, serviceLocator, workerInfo, workerMapObservable);
            startSink(previousStage, previousPorts, currentStage, () -> workerInfo.getWorkerPorts().getSinkPort(), sink, context, countDownOnTerminated, noOpOnCompleted, noOpOnError, lastStageNumber, i, previousStageScalingInfo.getNumberOfInstances());
        }
        workerInfoMap.put(lastStageNumber, workerInfoList);
        workerMapObservable.onNext(new WorkerMap(workerInfoMap));
        awaitCompletion(allSinksDone);
    }

    /** Acquires five ports from {@code portSelector} and bundles them into a {@link WorkerPorts}. */
    private static WorkerPorts acquireWorkerPorts(PortSelectorInRange portSelector) {
        return new WorkerPorts(portSelector.acquirePort(), portSelector.acquirePort(), portSelector.acquirePort(), portSelector.acquirePort(), portSelector.acquirePort());
    }

    /** Builds the per-worker {@link Context}; its completion action is {@code nullAction}, which exits the JVM. */
    private static Context newWorkerContext(Map<String, ParameterDefinition<?>> parameterDefinitions, Parameter[] parameters, ServiceLocator serviceLocator, WorkerInfo workerInfo, BehaviorSubject<WorkerMap> workerMapObservable) {
        return new Context(ParameterUtils.createContextParameters(parameterDefinitions, parameters), serviceLocator, workerInfo, MetricsRegistry.getInstance(), nullAction, workerMapObservable, Thread.currentThread().getContextClassLoader());
    }

    /** Blocks until {@code latch} reaches zero; on interruption restores the interrupt flag and rethrows unchecked. */
    private static void awaitCompletion(CountDownLatch latch) {
        try {
            latch.await();
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack before converting it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns an observable that, on each subscription, emits a single set of {@code localhost}
     * endpoints and then completes.
     *
     * <p>The set holds one endpoint per (port, partition) pair, with partitions numbered from 1 to
     * {@code numPartitions}. Each endpoint's slot id is
     * {@code stage_<stageNum>_index_<workerIndex>_partition_<partition>}.
     *
     * @param ports         ports to create endpoints for
     * @param stageNum      stage number embedded in the slot id
     * @param workerIndex   worker index embedded in the slot id
     * @param numPartitions number of partitions per port (shadows the class field of the same name)
     * @return a cold observable emitting the endpoint set once
     */
    private static Observable<Set<Endpoint>> staticEndpoints(final int[] ports, final int stageNum, final int workerIndex, final int numPartitions) {
        return Observable.create((Observable.OnSubscribe<Set<Endpoint>>) downstream -> {
            Set<Endpoint> collected = new HashSet<>();
            for (int port : ports) {
                int partition = 1;
                while (partition <= numPartitions) {
                    String slotId = "stage_" + stageNum + "_index_" + workerIndex + "_partition_" + partition;
                    Endpoint candidate = new Endpoint("localhost", port, slotId);
                    logger.info("adding static endpoint:" + candidate);
                    collected.add(candidate);
                    partition++;
                }
            }
            downstream.onNext(collected);
            downstream.onCompleted();
        });
    }

    /**
     * Wraps an observable of endpoint sets in an {@link EndpointInjector} whose {@code deltas()}
     * stream contains one {@code EndpointChange.Type.add} change per endpoint.
     *
     * <p>Each change carries a new {@link Endpoint} built from the source endpoint's host, port and
     * slot id rather than the source instance itself.
     *
     * @param endpointsToAdd stream of endpoint sets to announce as additions
     * @return an injector that only ever reports additions
     */
    private static EndpointInjector staticInjector(final Observable<Set<Endpoint>> endpointsToAdd) {
        return new EndpointInjector(){

            @Override
            public Observable<EndpointChange> deltas() {
                return endpointsToAdd.flatMap((Set<Endpoint> endpointSet) ->
                        Observable.from(endpointSet).map((Endpoint original) -> {
                            logger.info("injected endpoint:" + original);
                            Endpoint copy = new Endpoint(original.getHost(), original.getPort(), original.getSlotId());
                            return new EndpointChange(EndpointChange.Type.add, copy);
                        }));
            }
        };
    }
}

