/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.master.client;

import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.network.Endpoint;
import io.mantisrx.common.network.WorkerEndpoint;
import io.mantisrx.runtime.MantisJobState;
import io.mantisrx.server.core.CoreConfiguration;
import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.core.NamedJobInfo;
import io.mantisrx.server.core.WorkerAssignments;
import io.mantisrx.server.core.WorkerHost;
import io.mantisrx.server.core.master.MasterMonitor;
import io.mantisrx.server.core.zookeeper.CuratorService;
import io.mantisrx.server.master.client.ConditionalRetry;
import io.mantisrx.server.master.client.MantisMasterClientApi;
import io.mantisrx.server.master.client.config.ConfigurationFactory;
import io.mantisrx.server.master.client.config.StaticPropertiesConfigurationFactory;
import io.reactivex.mantis.remote.observable.EndpointChange;
import io.reactivex.mantis.remote.observable.ToDeltaEndpointInjector;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;

/**
 * Client-side wrapper around {@link MantisMasterClientApi}.
 *
 * <p>On construction the wrapper creates a {@link CuratorService} from the supplied configuration,
 * subscribes to its {@link MasterMonitor} and starts the service. Once the first non-null master
 * description is observed, a {@link MantisMasterClientApi} is created and made available through
 * {@link #getMasterClientApi()}. On top of that API the wrapper exposes endpoint-change streams for
 * a job's workers and sink workers, and lookups for named jobs.
 */
public class MasterClientWrapper {
    /** Job id carried by the placeholder {@link NamedJobInfo} used when a named job does not exist. */
    public static final String InvalidNamedJob = "No_such_named_job";
    private static final Logger logger = LoggerFactory.getLogger(MasterClientWrapper.class);
    private static final String MASTER_CONNECT_RETRY_COUNTER = "MasterConnectRetryCount";
    /** Separator placed between the host name and the worker number in a wrapped host string. */
    private static final char WORKER_NUMBER_SEPARATOR = '-';

    // Not referenced anywhere in this class; kept because it is package-visible.
    final CountDownLatch latch = new CountDownLatch(1);
    /** Receives {@code true} once the first master description has been seen. */
    final BehaviorSubject<Boolean> initialMaster = BehaviorSubject.create();
    private final MasterMonitor masterMonitor;
    private final Counter masterConnectRetryCounter;
    ConfigurationFactory configurationFactory;
    /** Assigned when the first master description arrives; read only after {@link #initialMaster} emits. */
    private MantisMasterClientApi masterClientApi;
    private final PublishSubject<JobSinkNumWorkers> numSinkWorkersSubject = PublishSubject.create();
    private final PublishSubject<JobNumWorkers> numWorkersSubject = PublishSubject.create();

    /**
     * Creates a wrapper configured from plain properties.
     *
     * @param properties configuration properties, wrapped in a {@link StaticPropertiesConfigurationFactory}
     */
    public MasterClientWrapper(Properties properties) {
        this(new StaticPropertiesConfigurationFactory(properties));
    }

    /**
     * Creates a wrapper, starts master discovery and registers the retry counter metric.
     *
     * @param configurationFactory source of the {@link CoreConfiguration} used to build the curator service
     */
    public MasterClientWrapper(ConfigurationFactory configurationFactory) {
        this.configurationFactory = configurationFactory;
        this.masterMonitor = this.initializeMasterMonitor();
        Metrics metrics = new Metrics.Builder()
                .name(MasterClientWrapper.class.getCanonicalName())
                .addCounter(MASTER_CONNECT_RETRY_COUNTER)
                .build();
        metrics = MetricsRegistry.getInstance().registerAndGet(metrics);
        this.masterConnectRetryCounter = metrics.getCounter(MASTER_CONNECT_RETRY_COUNTER);
    }

    /**
     * Builds the "wrapped" host string {@code <host>-<workerNumber>}.
     *
     * @param host         host name
     * @param workerNumber worker number appended after the separator
     * @return the host name, a dash, and the worker number
     */
    public static String getWrappedHost(String host, int workerNumber) {
        return host + WORKER_NUMBER_SEPARATOR + workerNumber;
    }

    /**
     * Reverses {@link #getWrappedHost(String, int)} by dropping everything from the last dash onwards.
     *
     * @param wrappedHost a host string, normally produced by {@link #getWrappedHost(String, int)}
     * @return the part before the last dash, or {@code wrappedHost} unchanged when it contains no dash
     */
    public static String getUnwrappedHost(String wrappedHost) {
        int separatorIndex = wrappedHost.lastIndexOf(WORKER_NUMBER_SEPARATOR);
        if (separatorIndex < 0) {
            return wrappedHost;
        }
        return wrappedHost.substring(0, separatorIndex);
    }

    /**
     * Manual smoke test: connects using hard-coded zookeeper settings and prints the sink endpoint
     * changes of a hard-coded job to standard output.
     *
     * @param args ignored
     * @throws InterruptedException if interrupted during the trailing sleep
     */
    public static void main(String[] args) throws InterruptedException {
        Properties zkProps = new Properties();
        zkProps.put("mantis.zookeeper.connectString", "ec2-50-19-255-1.compute-1.amazonaws.com:2181,ec2-54-235-159-245.compute-1.amazonaws.com:2181,ec2-50-19-255-97.compute-1.amazonaws.com:2181,ec2-184-73-152-248.compute-1.amazonaws.com:2181,ec2-50-17-247-179.compute-1.amazonaws.com:2181");
        zkProps.put("mantis.zookeeper.leader.announcement.path", "/leader");
        zkProps.put("mantis.zookeeper.root", "/mantis/master");
        final String jobId = "GroupByIPNJ-12";
        final MasterClientWrapper clientWrapper = new MasterClientWrapper(zkProps);
        clientWrapper.getMasterClientApi()
                .flatMap((MantisMasterClientApi api) -> api.getSinkStageNum(jobId)
                        .take(1)
                        .flatMap((Integer sinkStageNum) -> {
                            logger.info("Getting sink locations for {}", jobId);
                            return clientWrapper.getSinkLocations(jobId, sinkStageNum, 0, 0);
                        }))
                .toBlocking()
                .subscribe(ep -> System.out.println("Endpoint Change -> " + ep));
        Thread.sleep(50000L);
    }

    /**
     * @return the master monitor obtained from the curator service at construction time
     */
    public MasterMonitor getMasterMonitor() {
        return this.masterMonitor;
    }

    /**
     * Subscribes an observer to the sink-worker-count notifications published by
     * {@link #getSinkLocations(String, int, int, int)}.
     *
     * @param numSinkWorkersObserver observer to subscribe; it only sees values published after this call
     */
    public void addNumSinkWorkersObserver(Observer<JobSinkNumWorkers> numSinkWorkersObserver) {
        this.numSinkWorkersSubject.subscribe(numSinkWorkersObserver);
    }

    /**
     * Subscribes an observer to the total-worker-count notifications published by
     * {@link #getAllWorkerMetricLocations(String)}.
     *
     * @param numWorkersObserver observer to subscribe; it only sees values published after this call
     */
    public void addNumWorkersObserver(Observer<JobNumWorkers> numWorkersObserver) {
        this.numWorkersSubject.subscribe(numWorkersObserver);
    }

    /**
     * Returns the master client API once the initial master has been discovered.
     *
     * @return an observable that emits the {@link MantisMasterClientApi} exactly once after the
     *         initial master is known; if {@link #initialMaster} signals an error, the error is
     *         logged and the observable completes without emitting
     */
    public Observable<MantisMasterClientApi> getMasterClientApi() {
        return this.initialMaster
                .onErrorResumeNext((Throwable throwable) -> {
                    logger.warn("Error getting initial master from zookeeper: {}", throwable.getMessage());
                    return Observable.empty();
                })
                .take(1)
                .map(ignored -> this.masterClientApi);
    }

    /**
     * Subscribes to the master observable and, on the first non-null master description, creates
     * the client API and signals {@link #initialMaster}. The curator service is started only after
     * the subscription is in place.
     */
    private void startInitialMasterDescriptionGetter(CuratorService curatorService, MasterMonitor masterMonitor) {
        AtomicBoolean initialMasterGotten = new AtomicBoolean(false);
        masterMonitor.getMasterObservable()
                // Stop listening once the first usable description has been handled.
                .takeWhile(masterDescription -> !initialMasterGotten.get())
                .subscribe(masterDescription -> {
                    if (masterDescription == null) {
                        return;
                    }
                    logger.info("Initialized master description={}", masterDescription);
                    initialMasterGotten.set(true);
                    // Assign the API before emitting so that subscribers of getMasterClientApi() see it.
                    this.masterClientApi = new MantisMasterClientApi(masterMonitor);
                    this.initialMaster.onNext(true);
                });
        curatorService.start();
    }

    /**
     * Builds the curator service from the configured {@link CoreConfiguration}, starts master
     * discovery and returns the service's master monitor.
     */
    private MasterMonitor initializeMasterMonitor() {
        CoreConfiguration config = this.configurationFactory.getConfig();
        CuratorService curatorService = new CuratorService(config, null);
        MasterMonitor monitor = curatorService.getMasterMonitor();
        this.startInitialMasterDescriptionGetter(curatorService, monitor);
        return monitor;
    }

    /**
     * Converts the worker assignments of every stage except stage 0 into endpoints for workers in
     * the {@code Started} state, and publishes the summed worker count of those stages on
     * {@link #numWorkersSubject}. Stage 0 is presumably the job master stage (per the method name).
     *
     * @param jobId             job the assignments belong to; used for logging and the published count
     * @param workerAssignments assignments keyed by stage number; must not be null
     * @return one {@link WorkerEndpoint} per started worker in the non-zero stages
     */
    private List<Endpoint> getAllNonJobMasterEndpoints(final String jobId, Map<Integer, WorkerAssignments> workerAssignments) {
        List<Endpoint> endpoints = new ArrayList<>();
        int totalWorkers = 0;
        for (Map.Entry<Integer, WorkerAssignments> stageEntry : workerAssignments.entrySet()) {
            Integer stageNum = stageEntry.getKey();
            if (stageNum == 0) {
                continue;
            }
            WorkerAssignments assignments = stageEntry.getValue();
            logger.info("job {} Creating endpoints conx from {} worker assignments for stage {}", jobId, assignments.getHosts().size(), stageNum);
            logger.info("stage {} hosts: {}", stageNum, assignments.getHosts());
            totalWorkers += assignments.getNumWorkers();
            for (WorkerHost host : assignments.getHosts().values()) {
                final int workerIndex = host.getWorkerIndex();
                if (host.getState() != MantisJobState.Started) {
                    continue;
                }
                logger.info("job {}: creating new endpoint for worker number={}, index={}, host:port={}:{}",
                        jobId, host.getWorkerNumber(), host.getWorkerIndex(), host.getHost(), host.getPort().get(0));
                WorkerEndpoint ep = new WorkerEndpoint(
                        MasterClientWrapper.getWrappedHost(host.getHost(), host.getWorkerNumber()),
                        host.getPort().get(0),
                        stageNum,
                        host.getMetricsPort(),
                        host.getWorkerIndex(),
                        host.getWorkerNumber(),
                        () -> logger.info("job {} WorkerIndex {} completed", jobId, workerIndex),
                        (Throwable t1) -> logger.info("job {} WorkerIndex {} failed", jobId, workerIndex));
                endpoints.add(ep);
            }
        }
        this.numWorkersSubject.onNext(new JobNumWorkers(jobId, totalWorkers));
        return endpoints;
    }

    /**
     * Streams endpoint changes for all started workers of a job, excluding stage 0.
     *
     * <p>Scheduling changes are read from the master; errors on that stream are logged and retried
     * through a {@link ConditionalRetry} created with a limit argument of 10. Updates without a
     * worker-assignment map are skipped.
     *
     * @param jobId id of the job to follow
     * @return endpoint deltas computed by {@link ToDeltaEndpointInjector} from successive endpoint lists
     */
    public Observable<EndpointChange> getAllWorkerMetricLocations(final String jobId) {
        final ConditionalRetry schedInfoRetry = new ConditionalRetry(this.masterConnectRetryCounter, "AllSchedInfoRetry", 10);
        Observable<List<Endpoint>> schedulingUpdates = this.getMasterClientApi()
                .take(1)
                .flatMap((MantisMasterClientApi api) -> api.schedulingChanges(jobId)
                        // Concatenation keeps the throwable in the message text rather than as a stack trace.
                        .doOnError((Throwable throwable) -> logger.warn("Error on scheduling changes observable: " + throwable))
                        .retryWhen(schedInfoRetry.getRetryLogic())
                        .map((JobSchedulingInfo jobSchedulingInfo) -> {
                            logger.info("Got scheduling info for {}", jobId);
                            return jobSchedulingInfo.getWorkerAssignments();
                        })
                        .filter((Map<Integer, WorkerAssignments> assignments) -> assignments != null)
                        .map((Map<Integer, WorkerAssignments> assignments) -> this.getAllNonJobMasterEndpoints(jobId, assignments))
                        .doOnError((Throwable throwable) -> logger.error(throwable.getMessage(), throwable)));
        return new ToDeltaEndpointInjector(schedulingUpdates).deltas();
    }

    /**
     * Streams endpoint changes for the started workers of a job's sink stage, optionally restricted
     * to the slice of workers that belongs to one consumer partition.
     *
     * <p>Scheduling changes are read from the master; errors on that stream are logged and retried
     * through a {@link ConditionalRetry} created with a limit argument of 10. When an update has no
     * assignments for {@code sinkStage}, an empty endpoint list is produced for it.
     *
     * @param jobId           id of the job to follow
     * @param sinkStage       stage number whose workers are exposed
     * @param forPartition    consumer partition index; a negative value selects all workers
     * @param totalPartitions total number of consumer partitions; {@code 0} selects all workers
     * @return endpoint deltas computed by {@link ToDeltaEndpointInjector} from successive endpoint lists
     */
    public Observable<EndpointChange> getSinkLocations(String jobId, int sinkStage, int forPartition, int totalPartitions) {
        ConditionalRetry schedInfoRetry = new ConditionalRetry(this.masterConnectRetryCounter, "SchedInfoRetry", 10);
        Observable<List<Endpoint>> schedulingUpdates = this.getMasterClientApi()
                .take(1)
                .flatMap((MantisMasterClientApi api) -> api.schedulingChanges(jobId)
                        .doOnError((Throwable throwable) -> logger.warn(throwable.getMessage()))
                        .retryWhen(schedInfoRetry.getRetryLogic())
                        .map((JobSchedulingInfo jobSchedulingInfo) -> {
                            logger.info("Got scheduling info for {}", jobId);
                            WorkerAssignments sinkAssignments = jobSchedulingInfo.getWorkerAssignments().get(sinkStage);
                            logger.info("Worker Assignments {}", sinkAssignments);
                            // May be null when the update carries no entry for the sink stage.
                            return sinkAssignments;
                        })
                        .map((WorkerAssignments sinkAssignments) ->
                                this.toSinkEndpoints(jobId, sinkAssignments, forPartition, totalPartitions))
                        .doOnError((Throwable throwable) -> logger.error(throwable.getMessage(), throwable)));
        return new ToDeltaEndpointInjector(schedulingUpdates).deltas();
    }

    /**
     * Builds the endpoint list for one sink-stage scheduling update.
     *
     * @param jobId             job the assignments belong to
     * @param workerAssignments sink-stage assignments, or {@code null} when the update has none
     * @param forPartition      consumer partition index passed to {@link #usePartition}
     * @param totalPartitions   consumer partition count passed to {@link #usePartition}
     * @return endpoints of started workers selected for the partition; empty when there are no assignments
     */
    private List<Endpoint> toSinkEndpoints(String jobId, WorkerAssignments workerAssignments, int forPartition, int totalPartitions) {
        List<Endpoint> endpoints = new ArrayList<>();
        if (workerAssignments == null) {
            logger.info("job {} Has no active workers!", jobId);
            return endpoints;
        }
        logger.info("job {} Creating endpoints conx from {} worker assignments", jobId, workerAssignments.getHosts().size());
        for (WorkerHost host : workerAssignments.getHosts().values()) {
            final int workerIndex = host.getWorkerIndex();
            final int totalFromPartitions = workerAssignments.getNumWorkers();
            // NOTE(review): the count is published once per host, so observers receive the same
            // value repeatedly; left as-is because subscribers may depend on it.
            this.numSinkWorkersSubject.onNext(new JobSinkNumWorkers(jobId, totalFromPartitions));
            if (!this.usePartition(workerIndex, totalFromPartitions, forPartition, totalPartitions)
                    || host.getState() != MantisJobState.Started) {
                continue;
            }
            Endpoint ep = new Endpoint(
                    MasterClientWrapper.getWrappedHost(host.getHost(), host.getWorkerNumber()),
                    host.getPort().get(0),
                    () -> logger.info("job {} WorkerIndex {} completed", jobId, workerIndex),
                    t1 -> logger.info("job {} WorkerIndex {} failed", jobId, workerIndex));
            endpoints.add(ep);
        }
        return endpoints;
    }

    /**
     * Decides whether source partition {@code fromPartition} belongs to target partition
     * {@code toPartition} when {@code fromTotalPartitions} sources are split into
     * {@code toTotalPartitions} contiguous ranges.
     *
     * <p>Each range has {@code round(fromTotalPartitions / toTotalPartitions)} entries, except the
     * last target partition, which extends to {@code fromTotalPartitions}. A negative
     * {@code toPartition} or a {@code toTotalPartitions} of zero disables partitioning.
     *
     * @return {@code true} if the source partition should be used by the target partition
     */
    private boolean usePartition(int fromPartition, int fromTotalPartitions, int toPartition, int toTotalPartitions) {
        if (toPartition < 0 || toTotalPartitions == 0) {
            return true;
        }
        long rangeSize = Math.round((double) fromTotalPartitions / (double) toTotalPartitions);
        long beg = (long) toPartition * rangeSize;
        long end = toPartition == toTotalPartitions - 1
                ? (long) fromTotalPartitions
                : (long) (toPartition + 1) * rangeSize;
        return beg < (long) fromTotalPartitions && (long) fromPartition >= beg && (long) fromPartition < end;
    }

    /**
     * Asks the master whether a named job exists, retrying through a {@link ConditionalRetry}
     * created with a limit argument of {@link Integer#MAX_VALUE}.
     *
     * @param jobName name of the job to look up
     * @return the master's answer as emitted by {@link MantisMasterClientApi#namedJobExists}
     */
    public Observable<Boolean> namedJobExists(String jobName) {
        ConditionalRetry namedJobRetry = new ConditionalRetry(this.masterConnectRetryCounter, "NamedJobExists", Integer.MAX_VALUE);
        return this.getMasterClientApi()
                .flatMap((MantisMasterClientApi api) -> {
                    logger.info("verifying if job name exists: {}", jobName);
                    return api.namedJobExists(jobName);
                })
                .retryWhen(namedJobRetry.getRetryLogic());
    }

    /**
     * Streams the job ids reported for a named job.
     *
     * <p>The job name is first checked for existence. If that check fails with an error, the error
     * is logged and the result completes empty. If the job does not exist, an exception is recorded
     * on the retry helper via {@code setErrorRef} and a single placeholder whose job id is
     * {@link #InvalidNamedJob} is emitted. Otherwise the ids from
     * {@link MantisMasterClientApi#namedJobInfo} are emitted.
     *
     * @param jobName name of the job to look up
     * @return job ids for the named job, or {@link #InvalidNamedJob} when it does not exist
     */
    public Observable<String> getNamedJobsIds(String jobName) {
        ConditionalRetry namedJobsIdsRetry = new ConditionalRetry(this.masterConnectRetryCounter, "NamedJobsIds", Integer.MAX_VALUE);
        return this.getMasterClientApi()
                .flatMap((MantisMasterClientApi api) -> {
                    logger.info("verifying if job name exists: {}", jobName);
                    // A null element marks "job does not exist" for the next stage.
                    return api.namedJobExists(jobName)
                            .map((Boolean exists) -> exists ? api : null);
                })
                .onErrorResumeNext((Throwable throwable) -> {
                    logger.error(throwable.getMessage());
                    return Observable.empty();
                })
                .take(1)
                .map((MantisMasterClientApi api) -> {
                    if (api == null) {
                        Exception exception = new Exception("No such Job Cluster " + jobName);
                        namedJobsIdsRetry.setErrorRef(exception);
                        return Observable.just(new NamedJobInfo(jobName, InvalidNamedJob));
                    }
                    logger.info("Getting Job cluster info for {}", jobName);
                    return api.namedJobInfo(jobName);
                })
                .doOnError((Throwable throwable) -> logger.error(throwable.getMessage(), throwable))
                .retryWhen(namedJobsIdsRetry.getRetryLogic())
                .flatMap((Observable<NamedJobInfo> namedJobInfo) ->
                        namedJobInfo.map((NamedJobInfo nji) -> nji.getJobId()));
    }

    /** Notification carrying the total number of workers counted for a job. */
    public static class JobNumWorkers {
        protected final int numWorkers;
        private final String jobId;

        /**
         * @param jobId      id of the job
         * @param numWorkers number of workers counted for the job
         */
        public JobNumWorkers(String jobId, int numWorkers) {
            this.jobId = jobId;
            this.numWorkers = numWorkers;
        }

        /** @return id of the job */
        public String getJobId() {
            return this.jobId;
        }

        /** @return number of workers counted for the job */
        public int getNumWorkers() {
            return this.numWorkers;
        }
    }

    /** Notification carrying the number of workers in a job's sink stage. */
    public static class JobSinkNumWorkers {
        protected final int numSinkWorkers;
        private final String jobId;

        /**
         * @param jobId          id of the job
         * @param numSinkWorkers number of workers in the job's sink stage
         */
        public JobSinkNumWorkers(String jobId, int numSinkWorkers) {
            this.jobId = jobId;
            this.numSinkWorkers = numSinkWorkers;
        }

        /** @return id of the job */
        public String getJobId() {
            return this.jobId;
        }

        /** @return number of workers in the job's sink stage */
        public int getNumSinkWorkers() {
            return this.numSinkWorkers;
        }
    }
}

