/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.worker.scheduling;

import io.mantisrx.runtime.MantisJobState;
import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.core.WorkerAssignments;
import io.mantisrx.server.core.WorkerHost;
import io.mantisrx.server.worker.scheduling.WorkerIndexChange;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;
import rx.observables.GroupedObservable;

public class JobSchedulingTracker {
    private static final Logger logger = LoggerFactory.getLogger(JobSchedulingTracker.class);
    private Observable<JobSchedulingInfo> schedulingChangesForJobId;

    public JobSchedulingTracker(Observable<JobSchedulingInfo> schedulingChangesForJobId) {
        this.schedulingChangesForJobId = schedulingChangesForJobId;
    }

    public Observable<WorkerIndexChange> startedWorkersPerIndex(int stageNumber) {
        Observable<WorkerIndexChange> workerIndexChanges = this.workerIndexChanges(stageNumber);
        return workerIndexChanges.filter((Func1)new Func1<WorkerIndexChange, Boolean>(){

            public Boolean call(WorkerIndexChange newWorkerChange) {
                return newWorkerChange.getNewState().getState() == MantisJobState.Started;
            }
        });
    }

    public Observable<WorkerIndexChange> workerIndexChanges(int stageNumber) {
        return this.workerChangesForStage(stageNumber, this.schedulingChangesForJobId).flatMap((Func1)new Func1<WorkerAssignments, Observable<WorkerHost>>(){

            public Observable<WorkerHost> call(WorkerAssignments assignments) {
                logger.info("Received scheduling update from master: " + assignments);
                return Observable.from(assignments.getHosts().values());
            }
        }).groupBy((Func1)new Func1<WorkerHost, Integer>(){

            public Integer call(WorkerHost workerHost) {
                return workerHost.getWorkerIndex();
            }
        }).flatMap((Func1)new Func1<GroupedObservable<Integer, WorkerHost>, Observable<WorkerIndexChange>>(){

            public Observable<WorkerIndexChange> call(final GroupedObservable<Integer, WorkerHost> workerIndexGroup) {
                return workerIndexGroup.startWith((Object)new WorkerHost(null, -1, null, null, -1, -1, -1)).buffer(2, 1).filter((Func1)new Func1<List<WorkerHost>, Boolean>(){

                    public Boolean call(List<WorkerHost> currentAndPrevious) {
                        if (currentAndPrevious.size() < 2) {
                            return false;
                        }
                        WorkerHost previous = currentAndPrevious.get(0);
                        WorkerHost current = currentAndPrevious.get(1);
                        return previous.getWorkerNumber() != current.getWorkerNumber();
                    }
                }).map((Func1)new Func1<List<WorkerHost>, WorkerIndexChange>(){

                    public WorkerIndexChange call(List<WorkerHost> list) {
                        return new WorkerIndexChange((Integer)workerIndexGroup.getKey(), list.get(1), list.get(0));
                    }
                });
            }
        });
    }

    private Observable<WorkerAssignments> workerChangesForStage(final int stageNumber, Observable<JobSchedulingInfo> schedulingUpdates) {
        return schedulingUpdates.flatMap((Func1)new Func1<JobSchedulingInfo, Observable<WorkerAssignments>>(){

            public Observable<WorkerAssignments> call(JobSchedulingInfo schedulingChange) {
                Map assignments = schedulingChange.getWorkerAssignments();
                if (assignments != null && !assignments.isEmpty()) {
                    return Observable.from(assignments.values());
                }
                return Observable.empty();
            }
        }).filter((Func1)new Func1<WorkerAssignments, Boolean>(){

            public Boolean call(WorkerAssignments assignments) {
                return assignments.getStage() == stageNumber;
            }
        });
    }
}

