/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.worker;

import com.mantisrx.common.utils.Closeables;
import com.netflix.spectator.api.Registry;
import io.mantisrx.common.WorkerPorts;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.SpectatorRegistryFactory;
import io.mantisrx.common.network.Endpoint;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.MantisJobDurationType;
import io.mantisrx.runtime.MantisJobState;
import io.mantisrx.runtime.SinkHolder;
import io.mantisrx.runtime.SourceHolder;
import io.mantisrx.runtime.StageConfig;
import io.mantisrx.runtime.WorkerInfo;
import io.mantisrx.runtime.WorkerMap;
import io.mantisrx.runtime.descriptor.StageSchedulingInfo;
import io.mantisrx.runtime.executor.PortSelector;
import io.mantisrx.runtime.executor.StageExecutors;
import io.mantisrx.runtime.executor.WorkerConsumer;
import io.mantisrx.runtime.executor.WorkerConsumerRemoteObservable;
import io.mantisrx.runtime.executor.WorkerPublisher;
import io.mantisrx.runtime.executor.WorkerPublisherRemoteObservable;
import io.mantisrx.runtime.lifecycle.Lifecycle;
import io.mantisrx.runtime.lifecycle.ServiceLocator;
import io.mantisrx.runtime.loader.SinkSubscriptionStateHandler;
import io.mantisrx.runtime.loader.config.WorkerConfiguration;
import io.mantisrx.runtime.parameter.ParameterUtils;
import io.mantisrx.runtime.parameter.Parameters;
import io.mantisrx.server.core.ExecuteStageRequest;
import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.core.ServiceRegistry;
import io.mantisrx.server.core.Status;
import io.mantisrx.server.core.StatusPayloads;
import io.mantisrx.server.core.WorkerAssignments;
import io.mantisrx.server.core.WorkerHost;
import io.mantisrx.server.master.client.MantisMasterGateway;
import io.mantisrx.server.worker.DataDroppedPayloadSetter;
import io.mantisrx.server.worker.ExecutionDetails;
import io.mantisrx.server.worker.Heartbeat;
import io.mantisrx.server.worker.ResourceUsagePayloadSetter;
import io.mantisrx.server.worker.RunningWorker;
import io.mantisrx.server.worker.WorkerExecutionOperations;
import io.mantisrx.server.worker.client.WorkerMetricsClient;
import io.mantisrx.server.worker.jobmaster.AutoScaleMetricsConfig;
import io.mantisrx.server.worker.jobmaster.JobMasterService;
import io.mantisrx.server.worker.jobmaster.JobMasterStageConfig;
import io.mantisrx.shaded.com.google.common.base.Splitter;
import io.mantisrx.shaded.com.google.common.base.Strings;
import io.reactivex.mantis.remote.observable.EndpointInjector;
import io.reactivex.mantis.remote.observable.RemoteRxServer;
import io.reactivex.mantis.remote.observable.RxMetrics;
import io.reactivex.mantis.remote.observable.ToDeltaEndpointInjector;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

/**
 * {@link WorkerExecutionOperations} implementation that runs a single worker of a job.
 *
 * <p>Depending on the stage number in the execute request, {@code executeStage} runs the job
 * master (stage 0), a whole single-stage job, the source stage of a multi-stage job, or an
 * intermediate/sink stage that connects to the previous stage's workers. Resources created
 * while executing are collected in {@code closeables} and released by {@code shutdownStage}.
 */
public class WorkerExecutionOperationsNetworkStage
implements WorkerExecutionOperations {
    private static final Logger logger = LoggerFactory.getLogger(WorkerExecutionOperationsNetworkStage.class);
    private final WorkerConfiguration config;
    private final WorkerMetricsClient workerMetricsClient;
    // Heartbeat of the worker being executed; empty until executeStage has built the running worker.
    private final AtomicReference<Heartbeat> heartbeatRef = new AtomicReference();
    private final SinkSubscriptionStateHandler.Factory sinkSubscriptionStateHandlerFactory;
    private final MantisMasterGateway mantisMasterApi;
    // Number of endpoints created per upstream host; the constructor may override this from the
    // "mantis.worker.connectionsPerEndpoint" property.
    private int connectionsPerEndpoint = 2;
    // Whether executeStage asks the job's ServiceLocator for a spectator Registry; the constructor
    // sets this from the "mantis.worker.locate.spectator.registry" property.
    private boolean lookupSpectatorRegistry = true;
    // Only set when this worker runs the last stage of the job (see setupSubscriptionStateHandler).
    private SinkSubscriptionStateHandler subscriptionStateHandler;
    // Sink subscribe/unsubscribe callbacks handed to the stage executors; remain null unless
    // setupSubscriptionStateHandler has run.
    private Action0 onSinkSubscribe = null;
    private Action0 onSinkUnsubscribe = null;
    // Everything that must be closed when the stage shuts down.
    private final List<Closeable> closeables = new ArrayList<Closeable>();
    // Single-threaded scheduler used to publish heartbeats.
    private final ScheduledExecutorService scheduledExecutorService;
    private final ClassLoader classLoader;
    // Status sink of the current execution; assigned at the start of executeStage.
    private Observer<Status> jobStatusObserver;

    /**
     * Stores the collaborators and reads the worker tunables from the property service.
     *
     * <p>{@code mantis.worker.connectionsPerEndpoint} overrides the default of 2 connections per
     * endpoint, and {@code mantis.worker.locate.spectator.registry} controls whether the
     * spectator registry is looked up through the job's service locator.
     *
     * @param mantisMasterApi gateway used to observe scheduling changes
     * @param config worker configuration
     * @param workerMetricsClient metrics client handed to the job master service
     * @param sinkSubscriptionStateHandlerFactory factory for the sink subscription state handler
     * @param classLoader class loader passed into the job {@link Context}
     * @throws NumberFormatException if the connections-per-endpoint property is not an integer
     */
    public WorkerExecutionOperationsNetworkStage(MantisMasterGateway mantisMasterApi, WorkerConfiguration config, WorkerMetricsClient workerMetricsClient, SinkSubscriptionStateHandler.Factory sinkSubscriptionStateHandlerFactory, ClassLoader classLoader) {
        this.classLoader = classLoader;
        this.sinkSubscriptionStateHandlerFactory = sinkSubscriptionStateHandlerFactory;
        this.workerMetricsClient = workerMetricsClient;
        this.config = config;
        this.mantisMasterApi = mantisMasterApi;

        final String endpointConnections = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.worker.connectionsPerEndpoint", "2");
        final boolean usesDefault = endpointConnections == null || "2".equals(endpointConnections);
        if (!usesDefault) {
            this.connectionsPerEndpoint = Integer.parseInt(endpointConnections);
        }

        final String registryLookupFlag = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.worker.locate.spectator.registry", "true");
        this.lookupSpectatorRegistry = Boolean.parseBoolean(registryLookupFlag);

        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    }

    /**
     * Builds a {@link WorkerMap} (stage number to worker infos) from a scheduling update.
     *
     * <p>Stages whose host map is {@code null} are skipped. Invalid arguments never throw: a
     * warning is logged and a {@code WorkerMap} built from an empty map is returned.
     *
     * @param jobName name of the job; must be non-empty
     * @param jobId id of the job; must be non-empty
     * @param durationType duration type copied into every {@link WorkerInfo}
     * @param js scheduling info to convert; it and its worker assignments must be non-null
     * @return the converted map, or the map created before conversion started if conversion failed
     */
    static WorkerMap convertJobSchedulingInfoToWorkerMap(String jobName, String jobId, MantisJobDurationType durationType, JobSchedulingInfo js) {
        Map<Integer, List<WorkerInfo>> stageToWorkerInfoMap = new HashMap<>();
        WorkerMap workerMap = new WorkerMap(stageToWorkerInfoMap);
        if (jobName == null || jobName.isEmpty() || jobId == null || jobId.isEmpty()) {
            logger.warn("Job name/jobId cannot be null in convertJobSchedulingInfoToWorkerMap");
            return workerMap;
        }
        if (js == null || js.getWorkerAssignments() == null) {
            logger.warn("JobSchedulingInfo or workerAssignments cannot be null in convertJobSchedulingInfoToWorkerMap");
            return workerMap;
        }
        try {
            Map<Integer, WorkerAssignments> workerAssignments = js.getWorkerAssignments();
            for (Map.Entry<Integer, WorkerAssignments> entry : workerAssignments.entrySet()) {
                int stageNo = entry.getKey();
                Map<Integer, WorkerHost> hosts = entry.getValue().getHosts();
                if (hosts == null) {
                    continue;
                }
                List<WorkerInfo> workerInfoList = hosts.values().stream()
                    .map(workerHost -> WorkerExecutionOperationsNetworkStage.generateWorkerInfo(jobName, jobId, stageNo, workerHost.getWorkerIndex(), workerHost.getWorkerNumber(), durationType, workerHost.getHost(), workerHost))
                    .collect(Collectors.toList());
                stageToWorkerInfoMap.put(stageNo, workerInfoList);
            }
            // Rebuild only after every stage converted, so a failure returns the map created up front.
            workerMap = new WorkerMap(stageToWorkerInfoMap);
        }
        catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            logger.warn("Exception converting JobSchedulingInfo {} to worker Map", js, e);
        }
        return workerMap;
    }

    /**
     * Creates a {@link WorkerInfo} for a worker described by a {@link WorkerHost}.
     *
     * <p>The sink port is the first entry of the host's port list, or -1 when the list is
     * {@code null}, empty, or its first entry is {@code null}. The two fixed port arguments
     * (65534 and 65535) are passed to {@link WorkerPorts} unchanged.
     */
    private static WorkerInfo generateWorkerInfo(String jobName, String jobId, int stageNumber, int workerIndex, int workerNumber, MantisJobDurationType durationType, String host, WorkerHost workerHost) {
        int sinkPort = -1;
        final List<Integer> hostPorts = workerHost.getPort();
        if (hostPorts != null && !hostPorts.isEmpty()) {
            final Integer firstPort = hostPorts.get(0);
            if (firstPort != null) {
                sinkPort = firstPort;
            }
        }
        final WorkerPorts workerPorts = new WorkerPorts(workerHost.getMetricsPort(), 65534, 65535, workerHost.getCustomPort(), sinkPort);
        return WorkerExecutionOperationsNetworkStage.generateWorkerInfo(jobName, jobId, stageNumber, workerIndex, workerNumber, durationType, host, workerPorts);
    }

    /**
     * Creates a {@link WorkerInfo} from already-resolved {@link WorkerPorts}; all arguments are
     * forwarded to the {@code WorkerInfo} constructor as given.
     */
    private static WorkerInfo generateWorkerInfo(String jobName, String jobId, int stageNumber, int workerIndex, int workerNumber, MantisJobDurationType durationType, String host, WorkerPorts workerPorts) {
        final WorkerInfo info = new WorkerInfo(jobName, jobId, stageNumber, workerIndex, workerNumber, durationType, host, workerPorts);
        return info;
    }

    /**
     * Creates the job {@link Context}; all arguments are forwarded to the {@code Context}
     * constructor as given.
     */
    private static Context generateContext(Parameters parameters, ServiceLocator serviceLocator, WorkerInfo workerInfo, MetricsRegistry metricsRegistry, Action0 completeAndExitAction, Observable<WorkerMap> workerMapObservable, ClassLoader classLoader) {
        final Context context = new Context(parameters, serviceLocator, workerInfo, metricsRegistry, completeAndExitAction, workerMapObservable, classLoader);
        return context;
    }

    /**
     * Starts periodic heartbeat publication and the payload setters that enrich the heartbeat.
     *
     * <p>The subscription-state payload is initialised to {@code "false"}; the current heartbeat
     * status is then pushed to {@code jobStatusObserver} every {@code heartbeatIntervalSecs}
     * seconds, with the first push after one full interval.
     *
     * @param jobStatusObserver receiver of the heartbeat statuses
     * @param networkMbps network bandwidth handed to the resource usage payload setter
     * @param heartbeatIntervalSecs interval, in seconds, for the heartbeat and both payload setters
     * @return a closeable that cancels the heartbeat task and closes both payload setters
     */
    private Closeable startSendingHeartbeats(Observer<Status> jobStatusObserver, double networkMbps, long heartbeatIntervalSecs) {
        final Heartbeat currentHeartbeat = this.heartbeatRef.get();
        currentHeartbeat.setPayload(String.valueOf(StatusPayloads.Type.SubscriptionState), "false");

        // The task re-reads the reference on every run rather than capturing the heartbeat.
        final Runnable publishHeartbeat = () -> jobStatusObserver.onNext(this.heartbeatRef.get().getCurrentHeartbeatStatus());
        final ScheduledFuture<?> heartbeatTask = this.scheduledExecutorService.scheduleWithFixedDelay(publishHeartbeat, heartbeatIntervalSecs, heartbeatIntervalSecs, TimeUnit.SECONDS);

        final DataDroppedPayloadSetter dataDroppedSetter = new DataDroppedPayloadSetter(currentHeartbeat);
        dataDroppedSetter.start(heartbeatIntervalSecs);

        final ResourceUsagePayloadSetter resourceUsageSetter = new ResourceUsagePayloadSetter(currentHeartbeat, this.config, networkMbps);
        resourceUsageSetter.start(heartbeatIntervalSecs);

        final Closeable cancelHeartbeatTask = () -> heartbeatTask.cancel(false);
        return Closeables.combine(new Closeable[]{cancelHeartbeatTask, dataDroppedSetter, resourceUsageSetter});
    }

    /**
     * Maps scheduling updates to {@link WorkerMap}s, dropping updates that are {@code null} or
     * carry no worker assignments.
     */
    private Observable<WorkerMap> createWorkerMapObservable(Observable<JobSchedulingInfo> selfSchedulingInfo, String jobName, String jobId, MantisJobDurationType durationType) {
        final Observable<JobSchedulingInfo> updatesWithAssignments = selfSchedulingInfo.filter(update -> {
            if (update == null || update.getWorkerAssignments() == null) {
                return false;
            }
            return !update.getWorkerAssignments().isEmpty();
        });
        return updatesWithAssignments.map(update -> WorkerExecutionOperationsNetworkStage.convertJobSchedulingInfoToWorkerMap(jobName, jobId, durationType, update));
    }

    /**
     * Emits the number of workers assigned to the source stage (stage 1) for each scheduling
     * update.
     *
     * <p>Updates that are {@code null}, have no worker assignments, or have no assignment for
     * stage 1 are skipped. Skipping the last case avoids a {@link NullPointerException} inside the
     * pipeline, which would terminate the stream with an error.
     *
     * @param selfSchedulingInfo stream of scheduling updates for this job
     * @return stream of the source stage's worker count
     */
    private Observable<Integer> createSourceStageTotalWorkersObservable(Observable<JobSchedulingInfo> selfSchedulingInfo) {
        final int sourceStageNum = 1;
        return selfSchedulingInfo
            .filter(jobSchedulingInfo -> jobSchedulingInfo != null && jobSchedulingInfo.getWorkerAssignments() != null && !jobSchedulingInfo.getWorkerAssignments().isEmpty())
            .map(schedulingInfo -> {
                Map<Integer, WorkerAssignments> workerAssignmentsMap = schedulingInfo.getWorkerAssignments();
                return workerAssignmentsMap.get(sourceStageNum);
            })
            // An update may not (yet) contain the source stage; ignore it instead of failing.
            .filter(sourceStageAssignments -> sourceStageAssignments != null)
            .map(sourceStageAssignments -> sourceStageAssignments.getNumWorkers());
    }

    /**
     * Signals that the given worker has started by delegating to
     * {@link RunningWorker#signalStarted()}.
     */
    private void signalStarted(RunningWorker rw) {
        rw.signalStarted();
    }

    /**
     * Executes the stage described by {@code setup} and blocks until that stage terminates.
     *
     * <p>The stage number of the request selects what is run:
     * <ul>
     *   <li>stage 0: the job master service;</li>
     *   <li>stage 1 of a one-stage job: the entire job (source, stage and sink);</li>
     *   <li>stage 1 of a multi-stage job: the source, published as a remote observable;</li>
     *   <li>any later stage: see {@code executeNonSourceStage}.</li>
     * </ul>
     * Before running, the job lifecycle is started, the job {@link Context} is created and
     * heartbeats are scheduled. Any {@link Throwable} raised after the worker has been built is
     * logged, reported through {@link RunningWorker#signalFailed(Throwable)} and followed by
     * {@link #shutdownStage()}.
     *
     * @param setup execution details: the execute-stage request, the job, its parameters and the
     *     status observer
     * @throws IOException if {@link #shutdownStage()} fails while handling an execution error
     */
    @Override
    public void executeStage(ExecutionDetails setup) throws IOException {
        ExecuteStageRequest executionRequest = setup.getExecuteStageRequest().getRequest();
        this.jobStatusObserver = setup.getStatus();
        // replay(1).refCount() shares one upstream subscription among all consumers below and
        // hands late subscribers the latest scheduling update.
        Observable<JobSchedulingInfo> selfSchedulingInfo = this.mantisMasterApi.schedulingChanges(executionRequest.getJobId())
            .subscribeOn(Schedulers.io())
            .replay(1)
            .refCount()
            .doOnSubscribe(() -> logger.info("mantisApi schedulingChanges subscribe"))
            .doOnUnsubscribe(() -> logger.info("mantisApi schedulingChanges stream unsub."))
            .doOnError(e -> logger.warn("mantisApi schedulingChanges stream error:", e))
            .doOnCompleted(() -> logger.info("mantisApi schedulingChanges stream completed."));
        WorkerInfo workerInfo = WorkerExecutionOperationsNetworkStage.generateWorkerInfo(executionRequest.getJobName(), executionRequest.getJobId(), executionRequest.getStage(), executionRequest.getWorkerIndex(), executionRequest.getWorkerNumber(), executionRequest.getDurationType(), "host", executionRequest.getWorkerPorts());
        Observable<Integer> sourceStageTotalWorkersObs = this.createSourceStageTotalWorkersObservable(selfSchedulingInfo);
        RunningWorker.Builder rwBuilder = new RunningWorker.Builder().job(setup.getMantisJob()).schedulingInfo(executionRequest.getSchedulingInfo()).stageTotalWorkersObservable(sourceStageTotalWorkersObs).jobName(executionRequest.getJobName()).stageNum(executionRequest.getStage()).workerIndex(executionRequest.getWorkerIndex()).workerNum(executionRequest.getWorkerNumber()).totalStages(executionRequest.getTotalNumStages()).metricsPort(executionRequest.getMetricsPort()).ports(executionRequest.getPorts().iterator()).jobStatusObserver(setup.getStatus()).requestSubject((PublishSubject<Boolean>)setup.getExecuteStageRequest().getRequestSubject()).workerInfo(workerInfo).hasJobMaster(executionRequest.getHasJobMaster()).jobId(executionRequest.getJobId());
        if (executionRequest.getStage() == 0) {
            // Stage 0 is the job master; it has no user-defined stage config.
            rwBuilder = rwBuilder.stage(new JobMasterStageConfig("jobmasterconfig"));
        } else {
            // Stage numbers are 1-based, the job's stage list is 0-based.
            rwBuilder = rwBuilder.stage((StageConfig)setup.getMantisJob().getStages().get(executionRequest.getStage() - 1));
        }
        final RunningWorker rw = rwBuilder.build();
        if (rw.getStageNum() == rw.getTotalStagesNet()) {
            // Only the last stage hosts the sink, so only it tracks sink subscriptions.
            this.setupSubscriptionStateHandler(setup.getExecuteStageRequest().getRequest());
        }
        logger.info("Running worker info: " + rw);
        rw.signalStartedInitiated();
        try {
            logger.info(">>>>>>>>>>>>>>>>Calling lifecycle.startup()");
            Lifecycle lifecycle = rw.getJob().getLifecycle();
            lifecycle.startup();
            ServiceLocator serviceLocator = lifecycle.getServiceLocator();
            if (this.lookupSpectatorRegistry) {
                try {
                    Registry spectatorRegistry = (Registry)serviceLocator.service(Registry.class);
                    SpectatorRegistryFactory.setRegistry(spectatorRegistry);
                }
                catch (Throwable t) {
                    // Best effort: keep the current registry, but record why the lookup failed.
                    logger.error("failed to init spectator registry using service locator, falling back to {}", SpectatorRegistryFactory.getRegistry().getClass().getCanonicalName(), t);
                }
            }
            Parameters parameters = ParameterUtils.createContextParameters((Map)rw.getJob().getParameterDefinitions(), setup.getParameters());
            Context context = WorkerExecutionOperationsNetworkStage.generateContext(parameters, serviceLocator, workerInfo, MetricsRegistry.getInstance(), () -> {
                rw.signalCompleted();
                // Wait a minute before terminating the JVM.
                try {
                    Thread.sleep(60000L);
                }
                catch (InterruptedException ie) {
                    logger.warn("Unexpected exception sleeping: " + ie.getMessage());
                }
                System.exit(0);
            }, this.createWorkerMapObservable(selfSchedulingInfo, executionRequest.getJobName(), executionRequest.getJobId(), executionRequest.getDurationType()), this.classLoader);
            rw.setContext(context);
            this.heartbeatRef.set(new Heartbeat(rw.getJobId(), rw.getStageNum(), rw.getWorkerIndex(), rw.getWorkerNum(), this.config.getTaskExecutorHostName()));
            double networkMbps = executionRequest.getSchedulingInfo().forStage(rw.getStageNum()).getMachineDefinition().getNetworkMbps();
            Closeable heartbeatCloseable = this.startSendingHeartbeats(rw.getJobStatus(), networkMbps, executionRequest.getHeartbeatIntervalSecs());
            this.closeables.add(heartbeatCloseable);
            if (rw.getStageNum() == 0) {
                logger.info("JobId: " + rw.getJobId() + ", executing Job Master");
                AutoScaleMetricsConfig autoScaleMetricsConfig = WorkerExecutionOperationsNetworkStage.createAutoScaleMetricsConfig(parameters);
                JobMasterService jobMasterService = new JobMasterService(rw.getJobId(), rw.getSchedulingInfo(), this.workerMetricsClient, autoScaleMetricsConfig, this.mantisMasterApi, rw.getContext(), rw.getOnCompleteCallback(), rw.getOnErrorCallback(), rw.getOnTerminateCallback());
                jobMasterService.start();
                this.closeables.add(jobMasterService::shutdown);
                this.signalStarted(rw);
                rw.waitUntilTerminate();
            } else if (rw.getStageNum() == 1 && rw.getTotalStagesNet() == 1) {
                logger.info("JobId: " + rw.getJobId() + ", single stage job, executing entire job");
                PortSelector portSelector = new PortSelector(){

                    public int acquirePort() {
                        return rw.getPorts().next();
                    }
                };
                RxMetrics rxMetrics = new RxMetrics();
                this.closeables.add(StageExecutors.executeSingleStageJob((SourceHolder)rw.getJob().getSource(), (StageConfig)rw.getStage(), (SinkHolder)rw.getJob().getSink(), (PortSelector)portSelector, (RxMetrics)rxMetrics, (Context)rw.getContext(), (Action0)rw.getOnTerminateCallback(), (int)rw.getWorkerIndex(), rw.getSourceStageTotalWorkersObservable(), (Action0)this.onSinkSubscribe, (Action0)this.onSinkUnsubscribe, (Action0)rw.getOnCompleteCallback(), rw.getOnErrorCallback()));
                this.signalStarted(rw);
                rw.waitUntilTerminate();
            } else {
                logger.info("JobId: " + rw.getJobId() + ", executing a multi-stage job, stage: " + rw.getStageNum());
                if (rw.getStageNum() == 1) {
                    String remoteObservableName = rw.getJobId() + "_" + rw.getStageNum();
                    WorkerPublisherRemoteObservable publisher = new WorkerPublisherRemoteObservable(rw.getPorts().next().intValue(), remoteObservableName, this.numWorkersAtStage(selfSchedulingInfo, rw.getJobId(), rw.getStageNum() + 1), rw.getJobName());
                    this.closeables.add(StageExecutors.executeSource((int)rw.getWorkerIndex(), (SourceHolder)rw.getJob().getSource(), (StageConfig)rw.getStage(), (WorkerPublisher)publisher, (Context)rw.getContext(), rw.getSourceStageTotalWorkersObservable()));
                    logger.info("JobId: " + rw.getJobId() + " stage: " + rw.getStageNum() + ", serving remote observable for source with name: " + remoteObservableName);
                    RemoteRxServer server = publisher.getServer();
                    RxMetrics rxMetrics = server.getMetrics();
                    MetricsRegistry.getInstance().registerAndGet(rxMetrics.getCountersAndGauges());
                    this.signalStarted(rw);
                    logger.info("JobId: " + rw.getJobId() + " stage: " + rw.getStageNum() + ", blocking until source observable completes");
                    server.blockUntilServerShutdown();
                } else {
                    this.executeNonSourceStage(selfSchedulingInfo, rw);
                }
            }
            logger.info("Calling lifecycle.shutdown()");
            lifecycle.shutdown();
        }
        catch (Throwable t) {
            logger.warn("Error during executing stage; shutting down.", t);
            rw.signalFailed(t);
            this.shutdownStage();
        }
    }

    /**
     * Builds the job master's auto-scale metric configuration from the
     * {@code mantis.jobmaster.autoscale.metric} parameter.
     *
     * <p>The parameter, when set, must have the form {@code group::name::algo}, where
     * {@code algo} is the name of an {@link AutoScaleMetricsConfig.AggregationAlgo} constant.
     *
     * @param parameters job parameters to read the setting from
     * @return a config holding the user-defined metric, or a default config if the parameter is
     *     null or empty
     * @throws RuntimeException if the parameter does not have exactly three tokens or names an
     *     unknown aggregation algorithm
     */
    private static AutoScaleMetricsConfig createAutoScaleMetricsConfig(Parameters parameters) {
        final String paramName = "mantis.jobmaster.autoscale.metric";
        AutoScaleMetricsConfig autoScaleMetricsConfig = new AutoScaleMetricsConfig();
        String autoScaleMetricString = (String)parameters.get(paramName, (Object)"");
        if (Strings.isNullOrEmpty(autoScaleMetricString)) {
            logger.info("param {} is null or empty", paramName);
            return autoScaleMetricsConfig;
        }
        List<String> tokens = Splitter.on("::").omitEmptyStrings().trimResults().splitToList(autoScaleMetricString);
        if (tokens.size() != 3) {
            String errorMsg = String.format("ERROR: Invalid value %s for param %s", autoScaleMetricString, paramName);
            logger.error(errorMsg);
            throw new RuntimeException(errorMsg);
        }
        String metricGroup = tokens.get(0);
        String metricName = tokens.get(1);
        String algo = tokens.get(2);
        try {
            AutoScaleMetricsConfig.AggregationAlgo aggregationAlgo = AutoScaleMetricsConfig.AggregationAlgo.valueOf(algo);
            logger.info("registered UserDefined auto scale metric {}:{} algo {}", metricGroup, metricName, aggregationAlgo);
            autoScaleMetricsConfig.addUserDefinedMetric(metricGroup, metricName, aggregationAlgo);
        }
        catch (IllegalArgumentException e) {
            // Report the offending token (not the config object) and keep the cause.
            String errorMsg = String.format("ERROR: Invalid algorithm value %s for param %s (algo should be one of %s)", algo, paramName, Arrays.stream(AutoScaleMetricsConfig.AggregationAlgo.values()).map(a -> a.name()).collect(Collectors.toList()));
            logger.error(errorMsg);
            throw new RuntimeException(errorMsg, e);
        }
        return autoScaleMetricsConfig;
    }

    /**
     * Creates the sink subscription state handler for the request, installs the sink
     * subscribe/unsubscribe callbacks and starts the handler.
     *
     * <p>Each callback first records the new subscription state in the heartbeat payload and then
     * notifies the handler.
     *
     * @param executeStageRequest request passed to the handler factory
     * @throws RuntimeException if the handler is not running within 5 seconds
     */
    private void setupSubscriptionStateHandler(ExecuteStageRequest executeStageRequest) {
        final SinkSubscriptionStateHandler handler = (SinkSubscriptionStateHandler)this.sinkSubscriptionStateHandlerFactory.apply((Object)executeStageRequest);
        this.subscriptionStateHandler = handler;

        this.onSinkUnsubscribe = () -> {
            this.heartbeatRef.get().setPayload(StatusPayloads.Type.SubscriptionState.toString(), String.valueOf(false));
            handler.onSinkUnsubscribed();
        };
        this.onSinkSubscribe = () -> {
            this.heartbeatRef.get().setPayload(StatusPayloads.Type.SubscriptionState.toString(), String.valueOf(true));
            handler.onSinkSubscribed();
        };

        final Duration startupTimeout = Duration.of(5L, ChronoUnit.SECONDS);
        try {
            this.subscriptionStateHandler.startAsync().awaitRunning(startupTimeout);
        }
        catch (TimeoutException timeout) {
            logger.error("Failed to start subscriptionStateHandler: ", (Throwable)timeout);
            throw new RuntimeException(timeout);
        }
    }

    /**
     * Runs a stage that consumes from the previous stage: either the sink (last) stage or an
     * intermediate stage, and blocks until it completes.
     *
     * <p>The sink stage reports {@code Started} directly on the job status observer and waits on
     * a latch released by the sink executor; an intermediate stage publishes a remote observable
     * and waits for its server to shut down. Once the stage is done, further scheduling changes
     * are no longer accepted.
     *
     * @param selfSchedulingInfo stream of scheduling updates for this job
     * @param rw the running worker to execute
     */
    private void executeNonSourceStage(Observable<JobSchedulingInfo> selfSchedulingInfo, RunningWorker rw) {
        // Stage numbers are 1-based and the stage list is 0-based, hence "- 2" for the previous stage.
        final StageConfig previousStage = (StageConfig)rw.getJob().getStages().get(rw.getStageNum() - 2);
        final int previousStageInstances = rw.getSchedulingInfo().forStage(rw.getStageNum() - 1).getNumberOfInstances();
        final AtomicBoolean acceptSchedulingChanges = new AtomicBoolean(true);
        final WorkerConsumer consumer = this.connectToObservableAtPreviousStages(selfSchedulingInfo, rw.getJobId(), rw.getStageNum() - 1, previousStageInstances, previousStage, acceptSchedulingChanges, rw.getJobStatus(), rw.getStageNum(), rw.getWorkerIndex(), rw.getWorkerNum());
        final int workerPort = rw.getPorts().next();

        final boolean isSinkStage = rw.getStageNum() == rw.getTotalStagesNet();
        if (!isSinkStage) {
            logger.info("JobId: " + rw.getJobId() + ", executing intermediate stage: " + rw.getStageNum());
            final int stageNumToExecute = rw.getStageNum();
            final String jobId = rw.getJobId();
            final String remoteObservableName = jobId + "_" + stageNumToExecute;
            final Observable<Integer> nextStageConnections = this.numWorkersAtStage(selfSchedulingInfo, rw.getJobId(), rw.getStageNum() + 1);
            WorkerPublisherRemoteObservable publisher = new WorkerPublisherRemoteObservable(workerPort, remoteObservableName, nextStageConnections, rw.getJobName());
            this.closeables.add(StageExecutors.executeIntermediate((WorkerConsumer)consumer, (StageConfig)rw.getStage(), (WorkerPublisher)publisher, (Context)rw.getContext()));
            final RemoteRxServer server = publisher.getServer();
            logger.info("JobId: " + jobId + " stage: " + stageNumToExecute + ", serving intermediate remote observable with name: " + remoteObservableName);
            MetricsRegistry.getInstance().registerAndGet(server.getMetrics().getCountersAndGauges());
            this.signalStarted(rw);
            logger.info("JobId: " + jobId + " stage: " + stageNumToExecute + ", blocking until intermediate observable completes");
            server.blockUntilServerShutdown();
        } else {
            logger.info("JobId: {}, executing sink stage: {}, signaling started", (Object)rw.getJobId(), (Object)rw.getStageNum());
            final String startedMessage = String.format("stage %s worker index=%s number=%s %s", rw.getStageNum(), rw.getWorkerIndex(), rw.getWorkerNum(), "running");
            rw.getJobStatus().onNext(new Status(rw.getJobId(), rw.getStageNum(), rw.getWorkerIndex(), rw.getWorkerNum(), Status.TYPE.INFO, startedMessage, MantisJobState.Started));
            final PortSelector fixedPortSelector = new PortSelector(){

                public int acquirePort() {
                    return workerPort;
                }
            };
            final RxMetrics sinkMetrics = new RxMetrics();
            MetricsRegistry.getInstance().registerAndGet(sinkMetrics.getCountersAndGauges());
            final CountDownLatch sinkDone = new CountDownLatch(1);
            final Action0 releaseLatch = sinkDone::countDown;
            this.closeables.add(StageExecutors.executeSink((WorkerConsumer)consumer, (StageConfig)rw.getStage(), (SinkHolder)rw.getJob().getSink(), (PortSelector)fixedPortSelector, (RxMetrics)sinkMetrics, (Context)rw.getContext(), releaseLatch, (Action0)this.onSinkSubscribe, (Action0)this.onSinkUnsubscribe, (Action0)rw.getOnCompleteCallback(), rw.getOnErrorCallback()));
            try {
                sinkDone.await();
            }
            catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        // Both branches end the same way: stop reacting to scheduling updates.
        acceptSchedulingChanges.set(false);
    }

    /**
     * Emits, for every distinct scheduling update, the number of workers at {@code stageNum}
     * multiplied by {@code connectionsPerEndpoint}. The resulting observable is shared among
     * subscribers.
     *
     * @param selfSchedulingInfo stream of scheduling updates for this job
     * @param jobId id of the job (not used by this method)
     * @param stageNum stage whose worker count is tracked
     */
    private Observable<Integer> numWorkersAtStage(Observable<JobSchedulingInfo> selfSchedulingInfo, String jobId, int stageNum) {
        final Observable<JobSchedulingInfo> distinctUpdates = selfSchedulingInfo.distinctUntilChanged((previous, current) -> previous.equals(current));
        final Observable<WorkerAssignments> allStageAssignments = distinctUpdates.flatMap(update -> {
            final Map<Integer, WorkerAssignments> byStage = update.getWorkerAssignments();
            return (byStage == null || byStage.isEmpty())
                ? Observable.<WorkerAssignments>empty()
                : Observable.from(byStage.values());
        });
        final Observable<Integer> expectedConnections = allStageAssignments
            .filter(stageAssignments -> stageAssignments.getStage() == stageNum)
            .map(stageAssignments -> stageAssignments.getNumWorkers() * this.connectionsPerEndpoint);
        return expectedConnections.share();
    }

    /**
     * Creates the consumer that connects this worker to the workers of the previous stage.
     *
     * <p>Each scheduling update for {@code previousStageNum} (while {@code acceptSchedulingChanges}
     * is true) is turned into a list of endpoints: {@code connectionsPerEndpoint} endpoints for
     * every host in state {@code Started}, each pointing at the host's first port. Empty lists are
     * dropped. The lists feed a {@link ToDeltaEndpointInjector}.
     *
     * @return a remote-observable consumer named {@code <jobId>_<previousStageNum>}
     */
    private WorkerConsumer connectToObservableAtPreviousStages(Observable<JobSchedulingInfo> selfSchedulingInfo, String jobId, int previousStageNum, int numInstanceAtPreviousStage, StageConfig previousStage, AtomicBoolean acceptSchedulingChanges, Observer<Status> jobStatusObserver, int stageNumToExecute, int workerIndex, int workerNumber) {
        logger.info("Watching for scheduling changes");

        final Observable<WorkerAssignments> previousStageAssignments = selfSchedulingInfo
            .flatMap(update -> {
                final Map<Integer, WorkerAssignments> byStage = update.getWorkerAssignments();
                return (byStage == null || byStage.isEmpty())
                    ? Observable.<WorkerAssignments>empty()
                    : Observable.from(byStage.values());
            })
            .filter(stageAssignments -> stageAssignments.getStage() == previousStageNum && acceptSchedulingChanges.get());

        final Observable<List<Endpoint>> endpointBatches = previousStageAssignments.map(stageAssignments -> {
            final List<Endpoint> batch = new LinkedList<Endpoint>();
            for (WorkerHost upstream : stageAssignments.getHosts().values()) {
                if (upstream.getState() == MantisJobState.Started) {
                    logger.info("Received scheduling update from master, connect request for host: " + upstream.getHost() + " port: " + upstream.getPort() + " state: " + upstream.getState() + " adding: " + this.connectionsPerEndpoint + " connections to host");
                    int partition = 1;
                    while (partition <= this.connectionsPerEndpoint) {
                        final String endpointId = "stage_" + stageNumToExecute + "_index_" + Integer.toString(workerIndex) + "_partition_" + partition;
                        logger.info("Adding endpoint to endpoint injector to be considered for add, with id: " + endpointId);
                        batch.add(new Endpoint(upstream.getHost(), ((Integer)upstream.getPort().get(0)).intValue(), endpointId));
                        ++partition;
                    }
                }
            }
            return batch;
        });

        final Observable<List<Endpoint>> schedulingUpdates = endpointBatches.filter(batch -> !batch.isEmpty());
        final String consumerName = jobId + "_" + previousStageNum;
        return new WorkerConsumerRemoteObservable(consumerName, (EndpointInjector)new ToDeltaEndpointInjector(schedulingUpdates));
    }

    /**
     * Shuts the stage down: reports a final {@code Failed} status, stops the subscription state
     * handler, closes every registered closeable and stops the heartbeat scheduler.
     *
     * <p>The final status is only sent when both the status observer and the heartbeat exist.
     * {@code executeStage} assigns the observer before it creates the heartbeat, so a failure in
     * between reaches this method with no heartbeat yet. The scheduler is stopped even if closing
     * the registered closeables fails.
     *
     * @throws IOException if closing the registered closeables fails
     */
    @Override
    public void shutdownStage() throws IOException {
        Heartbeat heartbeat = this.heartbeatRef.get();
        if (this.jobStatusObserver != null) {
            if (heartbeat != null) {
                Status status = new Status(heartbeat.getJobId(), heartbeat.getStageNumber(), heartbeat.getWorkerIndex(), heartbeat.getWorkerNumber(), Status.TYPE.INFO, String.format("stage %s worker index=%s number=%s %s", heartbeat.getStageNumber(), heartbeat.getWorkerIndex(), heartbeat.getWorkerNumber(), "shutdown"), MantisJobState.Failed);
                this.jobStatusObserver.onNext(status);
            } else {
                // Without a heartbeat there is no worker identity to report; continue with cleanup.
                logger.warn("No heartbeat available during shutdown; skipping final status update");
            }
        }
        if (this.subscriptionStateHandler != null) {
            try {
                this.subscriptionStateHandler.stopAsync();
            }
            catch (Exception e) {
                logger.error("Failed to stop subscription state handler successfully", (Throwable)e);
            }
            finally {
                this.subscriptionStateHandler = null;
            }
        }
        try {
            Closeables.combine(this.closeables).close();
        }
        finally {
            // Always stop the heartbeat thread, even when a closeable failed to close.
            this.scheduledExecutorService.shutdownNow();
        }
        logger.info("Shutdown completed");
    }
}

