/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.client;

import io.mantisrx.client.JobSinkLocator;
import io.mantisrx.client.SinkClient;
import io.mantisrx.client.SinkClientImpl;
import io.mantisrx.client.SinkConnectionFunc;
import io.mantisrx.client.SinkConnectionsStatus;
import io.mantisrx.runtime.JobSla;
import io.mantisrx.runtime.MantisJobState;
import io.mantisrx.runtime.descriptor.SchedulingInfo;
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.server.core.Configurations;
import io.mantisrx.server.core.CoreConfiguration;
import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.master.client.HighAvailabilityServices;
import io.mantisrx.server.master.client.HighAvailabilityServicesUtil;
import io.mantisrx.server.master.client.JobSubmitResponse;
import io.mantisrx.server.master.client.MantisMasterGateway;
import io.mantisrx.server.master.client.MasterClientWrapper;
import io.reactivex.mantis.remote.observable.EndpointChange;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.subjects.PublishSubject;

/**
 * Client-side facade over a {@link MasterClientWrapper}: locates job sinks, builds
 * {@link SinkClient}s for a job id or a job (cluster) name, and submits / kills / inspects jobs
 * through the master gateway obtained from the wrapper.
 *
 * <p>Several methods block the calling thread ({@code submitJob}, {@code killJob}); the
 * {@code Observable}-returning methods do no work until subscribed.
 */
public class MantisClient {
    private static final Logger logger = LoggerFactory.getLogger(MantisClient.class);

    /** Property read (via {@link Boolean#parseBoolean}) to populate {@link #disablePingFiltering}. */
    private static final String DISABLE_PING_FILTERING_KEY = "mantis.sse.disablePingFiltering";

    /** Sentinel job id that marks "the named job does not exist" in the named-job id stream. */
    private static final String NO_SUCH_NAMED_JOB = "No_such_named_job";

    /** Data-receive timeout (seconds) used by the overloads that do not take one explicitly. */
    private static final long DEFAULT_DATA_RECV_TIMEOUT_SECS = 5L;

    /** Forwarded unchanged to every {@link SinkClientImpl} created by this client. */
    private final boolean disablePingFiltering;
    private final MasterClientWrapper clientWrapper;

    /**
     * Sink locator backed by {@link #clientWrapper}. The field is referenced through
     * {@code MantisClient.this} because this initializer runs before the constructor body assigns it;
     * the reference is only dereferenced later, when a locate method is actually called.
     */
    private final JobSinkLocator jobSinkLocator = new JobSinkLocator() {

        /** Locates sinks without partitioning: delegates with {@code forPartition=-1, totalPartitions=0}. */
        @Override
        public Observable<EndpointChange> locateSinkForJob(String jobId) {
            return locatePartitionedSinkForJob(jobId, -1, 0);
        }

        /**
         * Looks up the job's sink stage number (first value only) and then streams the sink
         * locations for that stage and partition selection.
         */
        @Override
        public Observable<EndpointChange> locatePartitionedSinkForJob(String jobId, int forPartition, int totalPartitions) {
            return MantisClient.this.clientWrapper.getMasterClientApi()
                    .flatMap(masterApi -> masterApi.getSinkStageNum(jobId)
                            .take(1)
                            .flatMap(sinkStageNum -> {
                                logger.info("Getting sink locations for {}", jobId);
                                return MantisClient.this.clientWrapper.getSinkLocations(
                                        jobId, sinkStageNum, forPartition, totalPartitions);
                            }));
        }
    };

    /**
     * Builds a client from configuration properties: creates the high-availability services from a
     * {@link CoreConfiguration} view of {@code properties} and wraps their master client API.
     *
     * @param properties configuration; also consulted for {@code mantis.sse.disablePingFiltering}
     *                   (absent or non-"true" values yield {@code false})
     */
    public MantisClient(Properties properties) {
        CoreConfiguration coreConfiguration = Configurations.frmProperties(properties, CoreConfiguration.class);
        HighAvailabilityServices haServices = HighAvailabilityServicesUtil.createHAServices(coreConfiguration);
        this.clientWrapper = new MasterClientWrapper(haServices.getMasterClientApi());
        this.disablePingFiltering = Boolean.parseBoolean(properties.getProperty(DISABLE_PING_FILTERING_KEY));
    }

    /**
     * @param clientWrapper        wrapper used for all master interactions
     * @param disablePingFiltering value forwarded to every sink client created by this instance
     */
    public MantisClient(MasterClientWrapper clientWrapper, boolean disablePingFiltering) {
        this.disablePingFiltering = disablePingFiltering;
        this.clientWrapper = clientWrapper;
    }

    /** Equivalent to {@code new MantisClient(clientWrapper, false)}. */
    public MantisClient(MasterClientWrapper clientWrapper) {
        this(clientWrapper, false);
    }

    /** @return the sink locator backed by this client's master wrapper */
    public JobSinkLocator getSinkLocator() {
        return this.jobSinkLocator;
    }

    /** @return the wrapper this client delegates to (package-private accessor) */
    MasterClientWrapper getClientWrapper() {
        return this.clientWrapper;
    }

    /**
     * Blocks until the wrapper emits its first master gateway and returns it.
     * Not referenced elsewhere in this class.
     */
    private MantisMasterGateway blockAndGetMasterApi() {
        return this.clientWrapper.getMasterClientApi().toBlocking().first();
    }

    /** Delegates to {@link MasterClientWrapper#namedJobExists(String)}. */
    public Observable<Boolean> namedJobExists(String jobName) {
        return this.clientWrapper.namedJobExists(jobName);
    }

    /**
     * Same as {@link #getSinkClientByJobName(String, SinkConnectionFunc, Observer, long)} with a
     * data-receive timeout of 5 seconds.
     */
    public <T> Observable<SinkClient<T>> getSinkClientByJobName(String jobName, SinkConnectionFunc<T> sinkConnectionFunc, Observer<SinkConnectionsStatus> sinkConnectionsStatusObserver) {
        return getSinkClientByJobName(jobName, sinkConnectionFunc, sinkConnectionsStatusObserver,
                DEFAULT_DATA_RECV_TIMEOUT_SECS);
    }

    /**
     * Emits a new {@link SinkClient} each time the named job reports a job id that
     * {@link #newJobIdIsGreater(String, String)} accepts relative to the last one connected to.
     * When the sentinel "no such named job" id arrives, an error sink client (see
     * {@link #getErrorSinkClient(String)}) is emitted instead and the remembered id is left untouched.
     *
     * <p>The remembered id is created once per call of this method (not per subscription) and is
     * cleared on unsubscribe.
     *
     * @param jobName                       name whose job ids are followed
     * @param sinkConnectionFunc            passed through to each created sink client
     * @param sinkConnectionsStatusObserver passed through to each created sink client
     * @param dataRecvTimeoutSecs           passed through to each created sink client
     */
    public <T> Observable<SinkClient<T>> getSinkClientByJobName(String jobName, SinkConnectionFunc<T> sinkConnectionFunc, Observer<SinkConnectionsStatus> sinkConnectionsStatusObserver, long dataRecvTimeoutSecs) {
        AtomicReference<String> lastJobIdRef = new AtomicReference<>();
        return this.clientWrapper.getNamedJobsIds(jobName)
                .doOnUnsubscribe(() -> lastJobIdRef.set(null))
                .filter(newJobId -> {
                    logger.info("Got job cluster's new jobId={}", newJobId);
                    return newJobIdIsGreater(lastJobIdRef.get(), newJobId);
                })
                .map(jobId -> {
                    if (NO_SUCH_NAMED_JOB.equals(jobId)) {
                        return this.<T>getErrorSinkClient(jobId);
                    }
                    lastJobIdRef.set(jobId);
                    logger.info("Connecting to job {} with new jobId={}", jobName, jobId);
                    return getSinkClientByJobId(jobId, sinkConnectionFunc, sinkConnectionsStatusObserver,
                            dataRecvTimeoutSecs);
                });
    }

    /**
     * Decides whether {@code newJobId} should replace {@code oldJobId}, comparing the integers that
     * follow the last {@code '-'} in each id.
     *
     * @return {@code true} when there is no old id, when either id has no {@code '-'}, when either
     *         suffix is not a parseable {@code int}, or when the new suffix is strictly greater
     */
    private boolean newJobIdIsGreater(String oldJobId, String newJobId) {
        if (oldJobId == null) {
            return true;
        }
        int oldIdx = oldJobId.lastIndexOf('-');
        if (oldIdx < 0) {
            return true;
        }
        int newIdx = newJobId.lastIndexOf('-');
        if (newIdx < 0) {
            return true;
        }
        try {
            int oldNumber = Integer.parseInt(oldJobId.substring(oldIdx + 1));
            int newNumber = Integer.parseInt(newJobId.substring(newIdx + 1));
            return newNumber > oldNumber;
        } catch (NumberFormatException e) {
            // Suffixes that cannot be compared numerically: accept the new id rather than drop it.
            return true;
        }
    }

    /**
     * Creates a {@link SinkClient} that only reports an error.
     *
     * <p>Both result accessors return {@code null}; callers are expected to check
     * {@link SinkClient#hasError()} before asking for results.
     *
     * @param mesg value returned by {@link SinkClient#getError()}
     */
    private <T> SinkClient<T> getErrorSinkClient(final String mesg) {
        return new SinkClient<T>() {

            @Override
            public boolean hasError() {
                return true;
            }

            @Override
            public String getError() {
                return mesg;
            }

            @Override
            public Observable<Observable<T>> getResults() {
                return null;
            }

            @Override
            public Observable<Observable<T>> getPartitionedResults(int forPartition, int totalPartitions) {
                return null;
            }
        };
    }

    /**
     * Same as {@link #getSinkClientByJobId(String, SinkConnectionFunc, Observer, long)} with a
     * data-receive timeout of 5 seconds.
     */
    public <T> SinkClient<T> getSinkClientByJobId(String jobId, SinkConnectionFunc<T> sinkConnectionFunc, Observer<SinkConnectionsStatus> sinkConnectionsStatusObserver) {
        return getSinkClientByJobId(jobId, sinkConnectionFunc, sinkConnectionsStatusObserver,
                DEFAULT_DATA_RECV_TIMEOUT_SECS);
    }

    /**
     * Creates a sink client for one job id. A new subject is registered with the wrapper as a
     * sink-worker-count observer; only the counts whose job id equals {@code jobId} are handed to
     * the sink client.
     *
     * @param jobId                         job to connect to
     * @param sinkConnectionFunc            passed through to the sink client
     * @param sinkConnectionsStatusObserver passed through to the sink client
     * @param dataRecvTimeoutSecs           passed through to the sink client
     */
    public <T> SinkClient<T> getSinkClientByJobId(String jobId, SinkConnectionFunc<T> sinkConnectionFunc, Observer<SinkConnectionsStatus> sinkConnectionsStatusObserver, long dataRecvTimeoutSecs) {
        // NOTE(review): element type inferred from the getJobId()/getNumSinkWorkers() calls below;
        // confirm it matches the parameter type of MasterClientWrapper#addNumSinkWorkersObserver.
        PublishSubject<MasterClientWrapper.JobSinkNumWorkers> numSinkWorkersSubject = PublishSubject.create();
        this.clientWrapper.addNumSinkWorkersObserver(numSinkWorkersSubject);
        Observable<Integer> numSinkWorkersForJob = numSinkWorkersSubject
                .filter(jobSinkNumWorkers -> jobId.equals(jobSinkNumWorkers.getJobId()))
                .map(jobSinkNumWorkers -> jobSinkNumWorkers.getNumSinkWorkers());
        return new SinkClientImpl<T>(jobId, sinkConnectionFunc, getSinkLocator(), numSinkWorkersForJob,
                sinkConnectionsStatusObserver, dataRecvTimeoutSecs, this.disablePingFiltering);
    }

    /**
     * Submits a job and blocks until the master answers.
     *
     * @return the job id from the first submit response
     * @throws java.util.NoSuchElementException if the submission failed; the failure itself is
     *         logged at WARN and suppressed, which leaves no response to read
     */
    public String submitJob(String name, String version, List<Parameter> parameters, JobSla jobSla, SchedulingInfo schedulingInfo) throws Exception {
        return awaitJobId(this.clientWrapper.getMasterClientApi().flatMap(masterApi ->
                suppressErrors("submit job", name,
                        masterApi.submitJob(name, version, parameters, jobSla, schedulingInfo))));
    }

    /**
     * Submits a job with a subscription timeout and blocks until the master answers.
     *
     * @return the job id from the first submit response
     * @throws java.util.NoSuchElementException if the submission failed; the failure itself is
     *         logged at WARN and suppressed, which leaves no response to read
     */
    public String submitJob(String name, String version, List<Parameter> parameters, JobSla jobSla, long subscriptionTimeoutSecs, SchedulingInfo schedulingInfo) throws Exception {
        return awaitJobId(this.clientWrapper.getMasterClientApi().flatMap(masterApi ->
                suppressErrors("submit job", name,
                        masterApi.submitJob(name, version, parameters, jobSla, subscriptionTimeoutSecs,
                                schedulingInfo))));
    }

    /**
     * Submits a job with a subscription timeout and the {@code readyForJobMaster} flag, and blocks
     * until the master answers.
     *
     * @return the job id from the first submit response
     * @throws java.util.NoSuchElementException if the submission failed; the failure itself is
     *         logged at WARN and suppressed, which leaves no response to read
     */
    public String submitJob(String name, String version, List<Parameter> parameters, JobSla jobSla, long subscriptionTimeoutSecs, SchedulingInfo schedulingInfo, boolean readyForJobMaster) throws Exception {
        return awaitJobId(this.clientWrapper.getMasterClientApi().flatMap(masterApi ->
                suppressErrors("submit job", name,
                        masterApi.submitJob(name, version, parameters, jobSla, subscriptionTimeoutSecs,
                                schedulingInfo, readyForJobMaster))));
    }

    /** Blocks for the first submit response of {@code responses} and returns its job id. */
    private String awaitJobId(Observable<JobSubmitResponse> responses) {
        return responses.take(1).toBlocking().first().getJobId();
    }

    /**
     * Logs any error raised by {@code call} at WARN (with the throwable attached) and replaces it
     * with an empty stream, so the error never reaches downstream operators.
     *
     * @param action  short description of the operation, used only in the log line
     * @param subject job name or id the operation targets, used only in the log line
     */
    private <R> Observable<R> suppressErrors(String action, String subject, Observable<R> call) {
        return call.onErrorResumeNext(t -> {
            logger.warn("Failed to {} {}: {}", action, subject, t.getMessage(), t);
            return Observable.<R>empty();
        });
    }

    /**
     * Asks the master to kill a job and blocks for the first reply.
     *
     * @throws java.util.NoSuchElementException if the kill call failed; the failure itself is
     *         logged at WARN and suppressed, which leaves no reply to read
     */
    public void killJob(String jobId) {
        this.clientWrapper.getMasterClientApi()
                .flatMap(masterApi -> suppressErrors("kill job", jobId, masterApi.killJob(jobId)))
                .take(1)
                .toBlocking()
                .first();
    }

    /**
     * Returns only the first value produced by the master's {@code getJobsOfNamedJob} call for the
     * given name and state; the resulting stream errors if nothing is produced.
     */
    public Observable<String> getJobsOfNamedJob(String name, MantisJobState.MetaState state) {
        return this.clientWrapper.getMasterClientApi()
                .flatMap(masterApi -> masterApi.getJobsOfNamedJob(name, state))
                .first();
    }

    /** Streams the master's job status values for {@code jobId}. */
    public Observable<String> getJobStatusObservable(String jobId) {
        return this.clientWrapper.getMasterClientApi()
                .flatMap(masterApi -> masterApi.getJobStatusObservable(jobId));
    }

    /** Streams the master's scheduling changes for {@code jobId}. */
    public Observable<JobSchedulingInfo> getSchedulingChanges(String jobId) {
        return this.clientWrapper.getMasterClientApi()
                .flatMap(masterApi -> masterApi.schedulingChanges(jobId));
    }

    /**
     * Follows a job cluster: every time the cluster reports a job id accepted by
     * {@link #newJobIdIsGreater(String, String)}, switches to that job's scheduling-change stream
     * (dropping the previous one). Fails with an {@link Exception} carrying
     * {@code "No such job cluster <name>"} when the sentinel "no such named job" id arrives.
     *
     * <p>The remembered id is created once per call of this method (not per subscription) and is
     * cleared on unsubscribe.
     */
    public Observable<JobSchedulingInfo> jobClusterDiscoveryInfoStream(String jobCluster) {
        AtomicReference<String> lastJobIdRef = new AtomicReference<>();
        return this.clientWrapper.getNamedJobsIds(jobCluster)
                .doOnUnsubscribe(() -> lastJobIdRef.set(null))
                .filter(newJobId -> {
                    logger.info("Got job cluster {}'s new jobId : {}", jobCluster, newJobId);
                    return newJobIdIsGreater(lastJobIdRef.get(), newJobId);
                })
                .switchMap(jobId -> {
                    if (NO_SUCH_NAMED_JOB.equals(jobId)) {
                        return Observable.<JobSchedulingInfo>error(
                                new Exception("No such job cluster " + jobCluster));
                    }
                    lastJobIdRef.set(jobId);
                    logger.info("[{}] switched to streaming discovery info for {}", jobCluster, jobId);
                    return getSchedulingChanges(jobId);
                });
    }
}

