/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.runtime.executor;

import io.mantisrx.common.MantisGroup;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.rx.MonitorOperator;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.GroupToGroup;
import io.mantisrx.runtime.GroupToScalar;
import io.mantisrx.runtime.Groups;
import io.mantisrx.runtime.KeyToKey;
import io.mantisrx.runtime.KeyToScalar;
import io.mantisrx.runtime.ScalarToGroup;
import io.mantisrx.runtime.ScalarToKey;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.SinkHolder;
import io.mantisrx.runtime.SourceHolder;
import io.mantisrx.runtime.StageConfig;
import io.mantisrx.runtime.computation.Computation;
import io.mantisrx.runtime.executor.PortSelector;
import io.mantisrx.runtime.executor.SinkPublisher;
import io.mantisrx.runtime.executor.WorkerConsumer;
import io.mantisrx.runtime.executor.WorkerPublisher;
import io.mantisrx.runtime.markers.MantisMarker;
import io.mantisrx.runtime.scheduler.SingleThreadScheduler;
import io.mantisrx.runtime.source.Index;
import io.mantisrx.server.core.ServiceRegistry;
import io.reactivex.mantis.remote.observable.RxMetrics;
import io.reactivx.mantis.operators.GroupedObservableUtils;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func2;
import rx.internal.util.RxThreadFactory;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;

public class StageExecutors {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(StageExecutors.class);
    /**
     * Counter named "groupsExpiredCounter", obtained from the metrics registry by the static
     * initializer. It is incremented (when non-null) each time a keyed group's input is
     * unsubscribed in {@code executeGroupsInParallel}.
     */
    private static Counter groupsExpiredCounter;
    /**
     * Value of the {@code mantis.stage.buffer.intervalMs} property (default 100), assigned by the
     * static initializer. It is not read anywhere in the visible part of this class.
     */
    private static long stageBufferIntervalMs;
    /**
     * Value of the {@code mantis.stage.buffer.maxSize} property (default 100), assigned by the
     * static initializer. It is not read anywhere in the visible part of this class.
     */
    private static int maxItemsInBuffer;

    /** Not instantiable: every member of this class is static. */
    private StageExecutors() {
    }

    /**
     * Wires a job consisting of one stage: the source feeds the stage directly and the stage
     * output is handed to a {@link SinkPublisher}.
     *
     * <p>The source function is initialised and invoked lazily, when the stage calls
     * {@code start} on the consumer created here. With a {@code CONCURRENT} input strategy the
     * source's observable is passed through as-is; otherwise it is merged and wrapped in a single
     * outer emission.
     *
     * @param source holder of the source function to initialise and call
     * @param stage the stage to execute; also decides how the source output is shaped
     * @param sink sink handed to the {@link SinkPublisher}
     * @param portSelector handed to the {@link SinkPublisher}
     * @param rxMetrics not used by this method
     * @param context runtime context passed to the source function and the stage
     * @param sinkObservableTerminatedCallback handed to the {@link SinkPublisher}
     * @param workerIndex index of this worker, used to build the source {@link Index}
     * @param totalWorkerAtStageObservable worker-count stream, used to build the source {@link Index}
     * @param onSinkSubscribe handed to the {@link SinkPublisher}
     * @param onSinkUnsubscribe handed to the {@link SinkPublisher}
     * @param observableOnCompleteCallback handed to the {@link SinkPublisher}
     * @param observableOnErrorCallback handed to the {@link SinkPublisher}
     */
    public static void executeSingleStageJob(final SourceHolder source, final StageConfig stage, SinkHolder sink, PortSelector portSelector, RxMetrics rxMetrics, final Context context, Action0 sinkObservableTerminatedCallback, final int workerIndex, final Observable<Integer> totalWorkerAtStageObservable, Action0 onSinkSubscribe, Action0 onSinkUnsubscribe, Action0 observableOnCompleteCallback, Action1<Throwable> observableOnErrorCallback) {
        WorkerConsumer sourceAsConsumer = new WorkerConsumer(){

            // The argument is ignored; the enclosing method's stage decides the output shape.
            public Observable start(StageConfig previousStage) {
                Index workerPosition = new Index(workerIndex, (Observable<Integer>)totalWorkerAtStageObservable);
                source.getSourceFunction().init(context, workerPosition);
                Observable emitted = (Observable)source.getSourceFunction().call(context, workerPosition);
                if (stage.getInputStrategy() != StageConfig.INPUT_STRATEGY.CONCURRENT) {
                    // Collapse every inner stream into one and re-wrap it as a single emission.
                    return Observable.just((Object)Observable.merge((Observable)emitted));
                }
                return emitted;
            }

            @Override
            public void stop() {
            }
        };
        SinkPublisher toSink = new SinkPublisher(sink, portSelector, context, sinkObservableTerminatedCallback, onSinkSubscribe, onSinkUnsubscribe, observableOnCompleteCallback, observableOnErrorCallback);
        StageExecutors.executeIntermediate(sourceAsConsumer, stage, toSink, context);
    }

    /**
     * Executes a source stage: wraps the source function in a {@link WorkerConsumer} and runs the
     * given stage over it, publishing through {@code publisher}.
     *
     * <p>The source function is initialised and called with the same {@link Index} instance,
     * matching what {@code executeSingleStageJob} does, rather than building a second identical
     * one for the call.
     *
     * @param workerIndex index of this worker, used to build the source {@link Index}
     * @param source holder of the source function to initialise and call
     * @param stage the stage to execute on the source output
     * @param publisher receives the stage output
     * @param context runtime context passed to the source function and the stage
     * @param totalWorkerAtStageObservable worker-count stream, used to build the source {@link Index}
     */
    public static void executeSource(final int workerIndex, final SourceHolder source, StageConfig stage, WorkerPublisher publisher, final Context context, final Observable<Integer> totalWorkerAtStageObservable) {
        WorkerConsumer sourceConsumer = new WorkerConsumer(){

            // The stage argument shadows the outer parameter and is not used here.
            public Observable start(StageConfig stage) {
                Index index = new Index(workerIndex, (Observable<Integer>)totalWorkerAtStageObservable);
                source.getSourceFunction().init(context, index);
                Observable sourceObservable = (Observable)source.getSourceFunction().call(context, index);
                return MantisMarker.sourceOut(sourceObservable);
            }

            @Override
            public void stop() {
            }
        };
        StageExecutors.executeIntermediate(sourceConsumer, stage, publisher, context);
    }

    /**
     * Applies a keyed computation to each incoming group.
     *
     * <p>The computation is initialised once with {@code context} and then invoked (as a
     * {@link Func2}) once per group emitted by {@code go}. Each group is re-wrapped under the same
     * key after the following operators are applied to its items, in this order: an unsubscribe
     * hook that increments {@code groupsExpiredCounter}, a {@code timeout} that switches to
     * {@code Observable.empty()}, {@code subscribeOn(Schedulers.computation())}, and an input
     * monitor. The outer stream and each computation result are also wrapped in
     * {@link MonitorOperator}s.
     *
     * @param go stream of grouped observables to process
     * @param computation stage computation; must also implement {@link Func2} (it is cast)
     * @param context runtime context passed to {@code init} and to every call
     * @param groupTakeUntil timeout applied to each group, in seconds
     * @return one result observable per input group
     */
    private static <K, T, R> Observable<Observable<R>> executeGroupsInParallel(Observable<GroupedObservable<K, T>> go, Computation computation, Context context, long groupTakeUntil) {
        logger.info("initializing {}", (Object)computation.getClass().getCanonicalName());
        computation.init(context);
        Func2 c = (Func2)computation;
        return go.lift((Observable.Operator)new MonitorOperator("worker_stage_outer")).map(group -> ((Observable)c.call((Object)context, (Object)GroupedObservableUtils.createGroupedObservable((Object)group.getKey(), (Observable)group.doOnUnsubscribe(() -> {
            // NOTE(review): this fires on every unsubscribe of the group, not only on the timeout
            // path; confirm that the counter is meant to count all of them.
            if (groupsExpiredCounter != null) {
                groupsExpiredCounter.increment();
            }
        }).timeout(groupTakeUntil, TimeUnit.SECONDS, Observable.empty()).subscribeOn(Schedulers.computation()).lift((Observable.Operator)new MonitorOperator("worker_stage_inner_input"))))).lift((Observable.Operator)new MonitorOperator("worker_stage_inner_output")));
    }

    /**
     * Applies a computation to each inner stream of {@link MantisGroup} items without adding any
     * scheduler hop.
     *
     * <p>The computation is initialised once with {@code context} and then invoked (as a
     * {@link Func2}) once per inner observable. The outer stream, each inner input and each
     * computation result are wrapped in {@link MonitorOperator}s.
     *
     * @param go stream of inner observables carrying {@link MantisGroup} items
     * @param computation stage computation; must also implement {@link Func2} (it is cast)
     * @param context runtime context passed to {@code init} and to every call
     * @param groupTakeUntil not used by this method
     * @return one result observable per inner observable
     */
    private static <K, T, R> Observable<Observable<R>> executeMantisGroups(Observable<Observable<MantisGroup<K, T>>> go, Computation computation, Context context, long groupTakeUntil) {
        logger.info("initializing {}", (Object)computation.getClass().getCanonicalName());
        computation.init(context);
        Func2 c = (Func2)computation;
        return go.lift((Observable.Operator)new MonitorOperator("worker_stage_outer")).map(group -> ((Observable)c.call((Object)context, (Object)group.lift((Observable.Operator)new MonitorOperator("worker_stage_inner_input")))).lift((Observable.Operator)new MonitorOperator("worker_stage_inner_output")));
    }

    /**
     * Applies a computation to each inner stream of {@link MantisGroup} items, subscribing to
     * every inner stream on {@code Schedulers.computation()}.
     *
     * <p>The computation is initialised once with {@code context} and then invoked (as a
     * {@link Func2}) once per inner observable. The outer stream, each inner input and each
     * computation result are wrapped in {@link MonitorOperator}s.
     *
     * @param go stream of inner observables carrying {@link MantisGroup} items
     * @param computation stage computation; must also implement {@link Func2} (it is cast)
     * @param context runtime context passed to {@code init} and to every call
     * @param applyTimeoutToInners not used by this method
     * @param timeout not used by this method
     * @return one result observable per inner observable
     */
    private static <K, T, R> Observable<Observable<R>> executeMantisGroupsInParallel(Observable<Observable<MantisGroup<K, T>>> go, Computation computation, Context context, boolean applyTimeoutToInners, long timeout) {
        logger.info("initializing {}", (Object)computation.getClass().getCanonicalName());
        computation.init(context);
        Func2 c = (Func2)computation;
        return go.lift((Observable.Operator)new MonitorOperator("worker_stage_outer")).map(observable -> ((Observable)c.call((Object)context, (Object)observable.subscribeOn(Schedulers.computation()).lift((Observable.Operator)new MonitorOperator("worker_stage_inner_input")))).lift((Observable.Operator)new MonitorOperator("worker_stage_inner_output")));
    }

    /**
     * Applies a computation to each inner observable without adding any scheduler hop.
     *
     * <p>The computation is initialised once with {@code context} and then invoked (as a
     * {@link Func2}) once per inner observable. The outer stream, each inner input and each
     * computation result are wrapped in {@link MonitorOperator}s.
     *
     * @param oo stream of inner observables to process
     * @param computation stage computation; must also implement {@link Func2} (it is cast)
     * @param context runtime context passed to {@code init} and to every call
     * @param applyTimeoutToInners not used by this method
     * @param timeout not used by this method
     * @return one result observable per inner observable
     */
    private static <T, R> Observable<Observable<R>> executeInners(Observable<Observable<T>> oo, Computation computation, Context context, boolean applyTimeoutToInners, long timeout) {
        logger.info("initializing {}", (Object)computation.getClass().getCanonicalName());
        computation.init(context);
        Func2 c = (Func2)computation;
        return oo.lift((Observable.Operator)new MonitorOperator("worker_stage_outer")).map(observable -> ((Observable)c.call((Object)context, (Object)observable.lift((Observable.Operator)new MonitorOperator("worker_stage_inner_input")))).lift((Observable.Operator)new MonitorOperator("worker_stage_inner_output")));
    }

    /**
     * Applies a computation to each inner observable with its items delivered on another thread.
     *
     * <p>When {@code concurrency} is {@code -1}, each inner observable is observed on
     * {@code Schedulers.computation()} and the computation is invoked once per inner observable.
     * Otherwise {@code concurrency} {@link SingleThreadScheduler}s are created up front, each
     * inner observable is split into sub-streams keyed {@code 0 .. concurrency-1}, and the
     * computation is invoked once per sub-stream, observed on the scheduler with the same index.
     *
     * <p>The sub-stream key is derived from {@link System#nanoTime()}. That value may be negative,
     * so {@link Math#floorMod(long, long)} is used instead of {@code %} to keep the key a valid
     * array index.
     *
     * @param oo stream of inner observables to process
     * @param computation stage computation; must also implement {@link Func2} (it is cast)
     * @param context runtime context passed to {@code init} and to every call
     * @param applyTimeoutToInners not used by this method
     * @param timeout not used by this method
     * @param concurrency {@code -1} for the computation scheduler, otherwise the number of
     *     single-thread schedulers to spread items across (expected to be positive)
     * @return result observables produced by the computation
     */
    private static <T, R> Observable<Observable<R>> executeInnersInParallel(Observable<Observable<T>> oo, Computation computation, Context context, boolean applyTimeoutToInners, long timeout, int concurrency) {
        logger.info("initializing {}", (Object)computation.getClass().getCanonicalName());
        computation.init(context);
        Func2 c = (Func2)computation;
        if (concurrency == -1) {
            return oo.lift((Observable.Operator)new MonitorOperator("worker_stage_outer")).map(observable -> ((Observable)c.call((Object)context, (Object)observable.observeOn(Schedulers.computation()).lift((Observable.Operator)new MonitorOperator("worker_stage_inner_input")))).lift((Observable.Operator)new MonitorOperator("worker_stage_inner_output")));
        }
        SingleThreadScheduler[] singleThreadSchedulers = new SingleThreadScheduler[concurrency];
        RxThreadFactory rxThreadFactory = new RxThreadFactory("MantisSingleThreadScheduler-");
        logger.info("creating {} Mantis threads", (Object)concurrency);
        for (int i = 0; i < concurrency; ++i) {
            singleThreadSchedulers[i] = new SingleThreadScheduler((ThreadFactory)rxThreadFactory);
        }
        // floorMod keeps the key in [0, concurrency) even when nanoTime() is negative; a plain %
        // would yield a negative key and an out-of-bounds scheduler lookup.
        return oo.lift((Observable.Operator)new MonitorOperator("worker_stage_outer")).map(observable -> observable.groupBy(e -> Math.floorMod(System.nanoTime(), (long)concurrency)).flatMap(go -> ((Observable)c.call((Object)context, (Object)go.observeOn((Scheduler)singleThreadSchedulers[((Long)go.getKey()).intValue()]).lift((Observable.Operator)new MonitorOperator("worker_stage_inner_input")))).lift((Observable.Operator)new MonitorOperator("worker_stage_inner_output"))));
    }

    /**
     * Resolves the concurrency to use for a stage.
     *
     * <p>A value other than {@code -1} is returned unchanged. For {@code -1} the environment
     * variable {@code JOB_PARAM_mantis.stageConcurrency} is consulted: a positive integer there
     * overrides the default, anything else (unset, empty, non-numeric, zero or negative) leaves
     * {@code -1} in place. Rejected values are logged at WARN instead of being dropped silently.
     *
     * @param givenStageConcurrency concurrency configured on the stage, {@code -1} meaning "not set"
     * @return the configured value, the positive override from the environment, or {@code -1}
     */
    private static int resolveStageConcurrency(int givenStageConcurrency) {
        if (givenStageConcurrency != -1) {
            return givenStageConcurrency;
        }
        String jobParamPrefix = "JOB_PARAM_";
        String stageConcurrencyParam = jobParamPrefix + "mantis.stageConcurrency";
        String concurrency = System.getenv(stageConcurrencyParam);
        logger.info("Job param: {} value: {}", stageConcurrencyParam, concurrency);
        if (concurrency == null || concurrency.isEmpty()) {
            return givenStageConcurrency;
        }
        try {
            int jobParamConcurrency = Integer.parseInt(concurrency);
            if (jobParamConcurrency > 0) {
                return jobParamConcurrency;
            }
            logger.warn("Ignoring non-positive value {} for job param {}", jobParamConcurrency, stageConcurrencyParam);
        }
        catch (NumberFormatException e) {
            // Best effort: a malformed override must not fail the stage, so fall back to the default.
            logger.warn("Ignoring non-numeric value '{}' for job param {}", concurrency, stageConcurrencyParam, e);
        }
        return givenStageConcurrency;
    }

    /**
     * Builds the output of a scalar-to-scalar stage according to the stage's input strategy.
     *
     * <p>{@code SERIAL} merges all inner observables into one before running the computation;
     * {@code CONCURRENT} runs the inner observables in parallel with the resolved stage
     * concurrency.
     *
     * @param stage stage configuration supplying the strategy, computation and concurrency
     * @param source stream of inner observables feeding the stage
     * @param context runtime context passed to the computation
     * @return the stage output
     * @throws RuntimeException if the input strategy is neither {@code SERIAL} nor {@code CONCURRENT}
     */
    private static <T, R> Observable<Observable<R>> setupScalarToScalarStage(ScalarToScalar<T, R> stage, Observable<Observable<T>> source, Context context) {
        final StageConfig.INPUT_STRATEGY strategy = stage.getInputStrategy();
        logger.info("Setting up ScalarToScalar stage with input type: " + strategy);
        if (strategy == StageConfig.INPUT_STRATEGY.SERIAL) {
            Observable singleInner = Observable.just((Object)Observable.merge(source));
            return StageExecutors.executeInners(singleInner, stage.getComputation(), context, false, Integer.MAX_VALUE);
        }
        if (strategy == StageConfig.INPUT_STRATEGY.CONCURRENT) {
            return StageExecutors.executeInnersInParallel(source, stage.getComputation(), context, false, Integer.MAX_VALUE, StageExecutors.resolveStageConcurrency(stage.getConcurrency()));
        }
        throw new RuntimeException("Unsupported input type: " + strategy.name());
    }

    /**
     * Builds the output of a scalar-to-key stage according to the stage's input strategy.
     *
     * <p>{@code SERIAL} merges all inner observables into one before running the computation;
     * {@code CONCURRENT} runs the inner observables in parallel with the resolved stage
     * concurrency. The stage's key-expiry seconds are forwarded as the timeout argument.
     *
     * @param stage stage configuration supplying the strategy, computation, expiry and concurrency
     * @param source stream of inner observables feeding the stage
     * @param context runtime context passed to the computation
     * @return the stage output
     * @throws RuntimeException if the input strategy is neither {@code SERIAL} nor {@code CONCURRENT}
     */
    private static <K, T, R> Observable<Observable<GroupedObservable<String, R>>> setupScalarToKeyStage(ScalarToKey<K, T, R> stage, Observable<Observable<T>> source, Context context) {
        final StageConfig.INPUT_STRATEGY strategy = stage.getInputStrategy();
        logger.info("Setting up ScalarToKey stage with input type: " + strategy);
        if (strategy == StageConfig.INPUT_STRATEGY.SERIAL) {
            Observable singleInner = Observable.just((Object)Observable.merge(source));
            return StageExecutors.executeInners(singleInner, stage.getComputation(), context, true, stage.getKeyExpireTimeSeconds());
        }
        if (strategy == StageConfig.INPUT_STRATEGY.CONCURRENT) {
            return StageExecutors.executeInnersInParallel(source, stage.getComputation(), context, true, stage.getKeyExpireTimeSeconds(), StageExecutors.resolveStageConcurrency(stage.getConcurrency()));
        }
        throw new RuntimeException("Unsupported input type: " + strategy.name());
    }

    /**
     * Builds the output of a scalar-to-group stage according to the stage's input strategy.
     *
     * <p>Both supported strategies run the computation through {@code executeInners}; they differ
     * only in the input: {@code CONCURRENT} passes the source through unchanged, {@code SERIAL}
     * first merges all inner observables into a single one.
     *
     * @param stage stage configuration supplying the strategy, computation and key expiry
     * @param source stream of inner observables feeding the stage
     * @param context runtime context passed to the computation
     * @return the stage output
     * @throws RuntimeException if the input strategy is neither {@code SERIAL} nor {@code CONCURRENT}
     */
    private static <K, T, R> Observable<Observable<MantisGroup<String, R>>> setupScalarToGroupStage(ScalarToGroup<K, T, R> stage, Observable<Observable<T>> source, Context context) {
        final StageConfig.INPUT_STRATEGY strategy = stage.getInputStrategy();
        logger.info("Setting up ScalarToGroup stage with input type: " + strategy);
        Observable stageInput;
        if (strategy == StageConfig.INPUT_STRATEGY.CONCURRENT) {
            stageInput = source;
        } else if (strategy == StageConfig.INPUT_STRATEGY.SERIAL) {
            stageInput = Observable.just((Object)Observable.merge(source));
        } else {
            throw new RuntimeException("Unsupported input type: " + strategy.name());
        }
        return StageExecutors.executeInners(stageInput, stage.getComputation(), context, true, stage.getKeyExpireTimeSeconds());
    }

    /**
     * Builds the output of a key-to-key stage. Only the {@code SERIAL} input strategy is
     * supported: the nested source is flattened into a stream of groups and each group is run
     * through the computation with the stage's key-expiry seconds as timeout.
     *
     * @param stage stage configuration supplying the strategy, computation and key expiry
     * @param source nested stream of grouped observables feeding the stage
     * @param context runtime context passed to the computation
     * @return the stage output
     * @throws RuntimeException if the input strategy is {@code CONCURRENT} or any value other than
     *     {@code SERIAL}
     */
    private static <K1, T, K2, R> Observable<Observable<GroupedObservable<String, R>>> setupKeyToKeyStage(KeyToKey<K1, T, K2, R> stage, Observable<Observable<GroupedObservable<String, T>>> source, Context context) {
        final StageConfig.INPUT_STRATEGY strategy = stage.getInputStrategy();
        logger.info("Setting up KeyToKey stage with input type: " + strategy);
        if (strategy == StageConfig.INPUT_STRATEGY.SERIAL) {
            Observable<GroupedObservable<String, T>> flattenedGroups = Groups.flatten(source);
            return StageExecutors.executeGroupsInParallel(flattenedGroups, stage.getComputation(), context, stage.getKeyExpireTimeSeconds());
        }
        if (strategy == StageConfig.INPUT_STRATEGY.CONCURRENT) {
            throw new RuntimeException("Concurrency is not a supported input strategy for KeyComputation");
        }
        throw new RuntimeException("Unsupported input type: " + strategy.name());
    }

    /**
     * Builds the output of a group-to-group stage. Only the {@code SERIAL} input strategy is
     * supported: all inner observables are merged into one and run through the computation.
     *
     * @param stage stage configuration supplying the strategy, computation and key expiry
     * @param source stream of inner observables carrying {@link MantisGroup} items
     * @param context runtime context passed to the computation
     * @return the stage output
     * @throws RuntimeException if the input strategy is {@code CONCURRENT} or any value other than
     *     {@code SERIAL}
     */
    private static <K1, T, K2, R> Observable<Observable<MantisGroup<String, R>>> setupGroupToGroupStage(GroupToGroup<K1, T, K2, R> stage, Observable<Observable<MantisGroup<String, T>>> source, Context context) {
        final StageConfig.INPUT_STRATEGY strategy = stage.getInputStrategy();
        logger.info("Setting up GroupToGroup stage with input type: " + strategy);
        if (strategy == StageConfig.INPUT_STRATEGY.SERIAL) {
            Observable singleInner = Observable.just((Object)Observable.merge(source));
            return StageExecutors.executeMantisGroups(singleInner, stage.getComputation(), context, stage.getKeyExpireTimeSeconds());
        }
        if (strategy == StageConfig.INPUT_STRATEGY.CONCURRENT) {
            throw new RuntimeException("Concurrency is not a supported input strategy for KeyComputation");
        }
        throw new RuntimeException("Unsupported input type: " + strategy.name());
    }

    /**
     * Builds the output of a key-to-scalar stage. The input strategy is only logged; regardless
     * of its value the source is regrouped into grouped observables and each group is run through
     * the computation with the stage's key-expiry seconds as timeout.
     *
     * @param stage stage configuration supplying the computation and key expiry
     * @param source stream of inner observables carrying {@link MantisGroup} items
     * @param context runtime context passed to the computation
     * @return the stage output
     */
    private static <K, T, R> Observable<Observable<R>> setupKeyToScalarStage(KeyToScalar<K, T, R> stage, Observable<Observable<MantisGroup<String, T>>> source, Context context) {
        logger.info("Setting up KeyToScalar stage with input type: " + stage.getInputStrategy());
        Observable<GroupedObservable<String, T>> regrouped = Groups.flattenMantisGroupsToGroupedObservables(source);
        return StageExecutors.executeGroupsInParallel(regrouped, stage.getComputation(), context, stage.getKeyExpireTimeSeconds());
    }

    /**
     * Builds the output of a group-to-scalar stage according to the stage's input strategy.
     *
     * <p>{@code SERIAL} merges all inner observables into one before running the computation;
     * {@code CONCURRENT} runs each inner observable through the parallel group executor.
     *
     * @param stage stage configuration supplying the strategy, computation and key expiry
     * @param source stream of inner observables carrying {@link MantisGroup} items
     * @param context runtime context passed to the computation
     * @return the stage output
     * @throws RuntimeException if the input strategy is neither {@code SERIAL} nor {@code CONCURRENT}
     */
    private static <K, T, R> Observable<Observable<R>> setupGroupToScalarStage(GroupToScalar<K, T, R> stage, Observable<Observable<MantisGroup<K, T>>> source, Context context) {
        final StageConfig.INPUT_STRATEGY strategy = stage.getInputStrategy();
        logger.info("Setting up GroupToScalar stage with input type: " + strategy);
        if (strategy == StageConfig.INPUT_STRATEGY.SERIAL) {
            Observable singleInner = Observable.just((Object)Observable.merge(source));
            return StageExecutors.executeMantisGroups(singleInner, stage.getComputation(), context, stage.getKeyExpireTimeSeconds());
        }
        if (strategy == StageConfig.INPUT_STRATEGY.CONCURRENT) {
            logger.info("Execute Groups in PARALLEL!!!!");
            return StageExecutors.executeMantisGroupsInParallel(source, stage.getComputation(), context, true, stage.getKeyExpireTimeSeconds());
        }
        throw new RuntimeException("Unsupported input type: " + strategy.name());
    }

    /**
     * Runs one stage: starts the consumer to obtain the stage input, builds the stage output with
     * the setup method matching the concrete stage type, and hands that output to the publisher.
     *
     * <p>The stage type is matched in this order: {@link ScalarToScalar}, {@link ScalarToKey},
     * {@link ScalarToGroup}, {@link KeyToKey}, {@link GroupToGroup}, {@link KeyToScalar},
     * {@link GroupToScalar}. If none matches, the consumer is never started and the publisher is
     * still started with a {@code null} output; a warning is logged so that case is visible.
     *
     * @param consumer supplies the stage input when started
     * @param stage the stage to execute
     * @param publisher receives the stage and its output
     * @param context runtime context passed to the stage computation
     * @throws IllegalArgumentException if {@code consumer}, {@code stage} or {@code publisher} is null
     */
    public static <T, R> void executeIntermediate(WorkerConsumer consumer, StageConfig<T, R> stage, WorkerPublisher publisher, Context context) {
        if (consumer == null) {
            throw new IllegalArgumentException("consumer cannot be null");
        }
        if (stage == null) {
            throw new IllegalArgumentException("stage cannot be null");
        }
        if (publisher == null) {
            // Message names the actual parameter (it previously said "producer").
            throw new IllegalArgumentException("publisher cannot be null");
        }
        Object toSink = null;
        if (stage instanceof ScalarToScalar) {
            ScalarToScalar scalarStage = (ScalarToScalar)stage;
            Observable source = consumer.start(scalarStage);
            toSink = StageExecutors.setupScalarToScalarStage(scalarStage, source, context);
        } else if (stage instanceof ScalarToKey) {
            ScalarToKey scalarStage = (ScalarToKey)stage;
            Observable source = consumer.start(scalarStage);
            toSink = StageExecutors.setupScalarToKeyStage(scalarStage, source, context);
        } else if (stage instanceof ScalarToGroup) {
            ScalarToGroup scalarStage = (ScalarToGroup)stage;
            Observable source = consumer.start(scalarStage);
            toSink = StageExecutors.setupScalarToGroupStage(scalarStage, source, context);
        } else if (stage instanceof KeyToKey) {
            KeyToKey keyToKey = (KeyToKey)stage;
            Observable source = consumer.start(keyToKey);
            toSink = StageExecutors.setupKeyToKeyStage(keyToKey, source, context);
        } else if (stage instanceof GroupToGroup) {
            GroupToGroup groupToGroup = (GroupToGroup)stage;
            Observable source = consumer.start(groupToGroup);
            toSink = StageExecutors.setupGroupToGroupStage(groupToGroup, source, context);
        } else if (stage instanceof KeyToScalar) {
            KeyToScalar keyToScalar = (KeyToScalar)stage;
            Observable source = consumer.start(keyToScalar);
            toSink = StageExecutors.setupKeyToScalarStage(keyToScalar, source, context);
        } else if (stage instanceof GroupToScalar) {
            GroupToScalar groupToScalar = (GroupToScalar)stage;
            Observable source = consumer.start(groupToScalar);
            toSink = StageExecutors.setupGroupToScalarStage(groupToScalar, source, context);
        } else {
            // Behaviour is unchanged (the publisher still receives null); the log only makes it visible.
            logger.warn("Unrecognized stage type {}; publishing a null observable", (Object)stage.getClass().getName());
        }
        publisher.start(stage, toSink);
    }

    /**
     * Executes a sink stage: runs {@code stage} over the consumer's input and publishes the
     * result through a {@link SinkPublisher} built from the sink-related arguments.
     *
     * @param consumer supplies the stage input
     * @param stage the stage to execute
     * @param sink handed to the {@link SinkPublisher}
     * @param portSelector handed to the {@link SinkPublisher}
     * @param rxMetrics not used by this method
     * @param context runtime context passed to the stage and to the {@link SinkPublisher}
     * @param sinkObservableCompletedCallback handed to the {@link SinkPublisher}
     * @param onSinkSubscribe handed to the {@link SinkPublisher}
     * @param onSinkUnsubscribe handed to the {@link SinkPublisher}
     * @param observableOnCompleteCallback handed to the {@link SinkPublisher}
     * @param observableOnErrorCallback handed to the {@link SinkPublisher}
     */
    public static void executeSink(WorkerConsumer consumer, StageConfig stage, SinkHolder sink, PortSelector portSelector, RxMetrics rxMetrics, Context context, Action0 sinkObservableCompletedCallback, Action0 onSinkSubscribe, Action0 onSinkUnsubscribe, Action0 observableOnCompleteCallback, Action1<Throwable> observableOnErrorCallback) {
        StageExecutors.executeIntermediate(
                consumer,
                stage,
                new SinkPublisher(sink, portSelector, context, sinkObservableCompletedCallback, onSinkSubscribe, onSinkUnsubscribe, observableOnCompleteCallback, observableOnErrorCallback),
                context);
    }

    /*
     * Registers the "groupsExpiredCounter" metric and loads the stage-buffer settings from the
     * properties service. A missing or malformed property falls back to its default (100) with a
     * warning, so bad configuration cannot make class initialization fail.
     */
    static {
        stageBufferIntervalMs = 100L;
        maxItemsInBuffer = 100;
        Metrics m = new Metrics.Builder().name("StageExecutors").addCounter("groupsExpiredCounter").build();
        m = MetricsRegistry.getInstance().registerAndGet(m);
        groupsExpiredCounter = m.getCounter("groupsExpiredCounter");
        stageBufferIntervalMs = StageExecutors.readIntProperty("mantis.stage.buffer.intervalMs", 100);
        maxItemsInBuffer = StageExecutors.readIntProperty("mantis.stage.buffer.maxSize", 100);
    }

    /**
     * Reads an integer property from the properties service.
     *
     * @param key property name
     * @param defaultValue value used when the property is absent or is not a valid integer
     * @return the parsed property value, or {@code defaultValue}
     */
    private static int readIntProperty(String key, int defaultValue) {
        String raw = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue(key, String.valueOf(defaultValue));
        try {
            return Integer.parseInt(raw);
        }
        catch (NumberFormatException e) {
            logger.warn("Invalid integer value '{}' for property {}; using default {}", raw, key, defaultValue, e);
            return defaultValue;
        }
    }
}

