/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.client;

import io.mantisrx.client.MantisClient;
import io.mantisrx.client.SinkClient;
import io.mantisrx.client.SinkConnectionsStatus;
import io.mantisrx.client.SseSinkConnectionFunction;
import io.mantisrx.common.MantisServerSentEvent;
import io.mantisrx.runtime.JobSla;
import io.mantisrx.runtime.MantisJobDurationType;
import io.mantisrx.runtime.descriptor.SchedulingInfo;
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.runtime.parameter.SinkParameters;
import io.mantisrx.server.master.client.ConditionalRetry;
import io.mantisrx.server.master.client.NoSuchJobException;
import io.reactivx.mantis.operators.DropOperator;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action1;
import rx.schedulers.Schedulers;

public class MantisSSEJob
implements AutoCloseable {
    private static final String ConnectTimeoutSecsPropertyName = "MantisClientConnectTimeoutSecs";
    private static final Logger logger = LoggerFactory.getLogger(MantisSSEJob.class);
    private final Builder builder;
    private final Mode mode;
    private Observable<Observable<MantisServerSentEvent>> resultsObservable;
    private String jobId = null;
    private int forPartition = -1;
    private int totalPartitions = 0;

    private MantisSSEJob(Builder builder, Mode mode) {
        this.builder = builder;
        this.mode = mode;
        if (builder.connectTimeoutSecs > 0L) {
            System.setProperty(ConnectTimeoutSecsPropertyName, String.valueOf(builder.connectTimeoutSecs));
        }
    }

    @Override
    public synchronized void close() throws Exception {
        if (this.mode == Mode.Submit && this.builder.ephemeral) {
            if (this.jobId != null) {
                this.builder.mantisClient.killJob(this.jobId);
                logger.info("Sent kill to master for job " + this.jobId);
            } else {
                logger.warn("Unexpected to not have JobId to kill ephemeral job");
            }
        }
    }

    public String getJobId() {
        return this.jobId;
    }

    private Observable<Observable<MantisServerSentEvent>> sinksToObservable(Observable<SinkClient<MantisServerSentEvent>> sinkClients) {
        ConditionalRetry retryObject = new ConditionalRetry(null, "SinkClient_" + this.builder.name);
        return sinkClients.switchMap(serverSentEventSinkClient -> {
            if (serverSentEventSinkClient.hasError()) {
                return Observable.just((Object)Observable.just((Object)new MantisServerSentEvent(serverSentEventSinkClient.getError())));
            }
            return serverSentEventSinkClient.getPartitionedResults(this.forPartition, this.totalPartitions);
        }).doOnError(throwable -> {
            logger.warn("Error getting sink Observable: " + throwable.getMessage());
            if (!(throwable instanceof NoSuchJobException)) {
                retryObject.setErrorRef(throwable);
            }
        }).retryWhen(retryObject.getRetryLogic());
    }

    @Deprecated
    public synchronized Observable<MantisServerSentEvent> connectAndGetObservable() throws IllegalStateException {
        return this.connectAndGet().flatMap(o -> o);
    }

    public synchronized Observable<Observable<MantisServerSentEvent>> connectAndGet() throws IllegalStateException {
        if (this.mode != Mode.Connect) {
            throw new IllegalStateException("Can't call connect to sink");
        }
        if (this.resultsObservable == null) {
            logger.info("Getting sink for job name " + this.builder.name);
            Boolean exists = (Boolean)this.builder.mantisClient.namedJobExists(this.builder.name).take(1).toBlocking().first();
            this.resultsObservable = exists != false ? this.sinksToObservable(this.builder.mantisClient.getSinkClientByJobName(this.builder.name, new SseSinkConnectionFunction(true, (Action1<Throwable>)this.builder.onConnectionReset, this.builder.sinkParameters), (Observer<SinkConnectionsStatus>)this.builder.sinkConnectionsStatusObserver, this.builder.dataRecvTimeoutSecs)).share() : Observable.just((Object)Observable.just((Object)new MantisServerSentEvent("No such job name " + this.builder.name)));
        }
        return this.resultsObservable;
    }

    @Deprecated
    public synchronized Observable<MantisServerSentEvent> submitAndGetObservable() throws IllegalStateException {
        return this.submitAndGet().flatMap(o -> o);
    }

    public synchronized Observable<Observable<MantisServerSentEvent>> submitAndGet() throws IllegalStateException {
        if (this.mode != Mode.Submit) {
            throw new IllegalStateException("Can't submit job");
        }
        if (this.resultsObservable != null) {
            return this.resultsObservable;
        }
        return Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<Observable<MantisServerSentEvent>>(){

            public void call(Subscriber<? super Observable<MantisServerSentEvent>> subscriber) {
                try {
                    JobSla jobSla = MantisSSEJob.this.builder.jobSla == null ? new JobSla(0L, 0L, JobSla.StreamSLAType.Lossy, MantisSSEJob.this.builder.ephemeral ? MantisJobDurationType.Transient : MantisJobDurationType.Perpetual, "") : new JobSla(MantisSSEJob.this.builder.jobSla.getRuntimeLimitSecs(), MantisSSEJob.this.builder.jobSla.getMinRuntimeSecs(), MantisSSEJob.this.builder.jobSla.getSlaType(), MantisSSEJob.this.builder.ephemeral ? MantisJobDurationType.Transient : MantisJobDurationType.Perpetual, MantisSSEJob.this.builder.jobSla.getUserProvidedType());
                    MantisSSEJob.this.jobId = MantisSSEJob.this.builder.mantisClient.submitJob(MantisSSEJob.this.builder.name, MantisSSEJob.this.builder.jarVersion, MantisSSEJob.this.builder.parameters, jobSla, MantisSSEJob.this.builder.schedulingInfo);
                    logger.info("Submitted job name " + MantisSSEJob.this.builder.name + " and got jobId: " + MantisSSEJob.this.jobId);
                    MantisSSEJob.this.resultsObservable = MantisSSEJob.this.builder.mantisClient.getSinkClientByJobId(MantisSSEJob.this.jobId, new SseSinkConnectionFunction(true, (Action1<Throwable>)MantisSSEJob.this.builder.onConnectionReset), (Observer<SinkConnectionsStatus>)MantisSSEJob.this.builder.sinkConnectionsStatusObserver, MantisSSEJob.this.builder.dataRecvTimeoutSecs).getResults();
                    MantisSSEJob.this.resultsObservable.subscribe(subscriber);
                }
                catch (Exception e) {
                    subscriber.onError((Throwable)e);
                }
            }
        }).doOnError(throwable -> logger.warn(throwable.getMessage())).lift((Observable.Operator)new DropOperator("client_submit_sse_share")).share().observeOn(Schedulers.io());
    }

    public static class Builder {
        private final MantisClient mantisClient;
        private final List<Parameter> parameters = new ArrayList<Parameter>();
        private String name;
        private String jarVersion;
        private SinkParameters sinkParameters = new SinkParameters.Builder().build();
        private Action1<Throwable> onConnectionReset;
        private boolean ephemeral = false;
        private SchedulingInfo schedulingInfo;
        private JobSla jobSla;
        private long connectTimeoutSecs = 0L;
        private Observer<SinkConnectionsStatus> sinkConnectionsStatusObserver = null;
        private long dataRecvTimeoutSecs = 5L;

        public Builder(Properties properties) {
            this(new MantisClient(properties));
        }

        public Builder() {
            Properties properties = new Properties();
            properties.setProperty("mantis.zookeeper.connectionTimeMs", "1000");
            properties.setProperty("mantis.zookeeper.connection.retrySleepMs", "500");
            properties.setProperty("mantis.zookeeper.connection.retryCount", "500");
            properties.setProperty("mantis.zookeeper.connectString", System.getenv("mantis.zookeeper.connectString"));
            properties.setProperty("mantis.zookeeper.root", System.getenv("mantis.zookeeper.root"));
            properties.setProperty("mantis.zookeeper.leader.announcement.path", System.getenv("mantis.zookeeper.leader.announcement.path"));
            this.mantisClient = new MantisClient(properties);
        }

        public Builder(MantisClient mantisClient) {
            this.mantisClient = mantisClient;
        }

        public Builder name(String name) {
            this.name = name;
            return this;
        }

        public Builder jarVersion(String jarVersion) {
            this.jarVersion = jarVersion;
            return this;
        }

        public Builder parameters(Parameter ... params) {
            this.parameters.addAll(Arrays.asList(params));
            return this;
        }

        public Builder sinkParams(SinkParameters queryParams) {
            this.sinkParameters = queryParams;
            return this;
        }

        public Builder onCloseKillJob() {
            this.ephemeral = true;
            return this;
        }

        public Builder schedulingInfo(SchedulingInfo schedulingInfo) {
            this.schedulingInfo = schedulingInfo;
            return this;
        }

        public Builder jobSla(JobSla jobSla) {
            this.jobSla = jobSla;
            if (jobSla != null) {
                this.ephemeral = jobSla.getDurationType() == MantisJobDurationType.Transient;
            }
            return this;
        }

        public Builder connectTimeoutSecs(long connectTimeoutSecs) {
            this.connectTimeoutSecs = connectTimeoutSecs;
            return this;
        }

        public Builder onConnectionReset(Action1<Throwable> onConnectionReset) {
            this.onConnectionReset = onConnectionReset;
            return this;
        }

        public Builder sinkConnectionsStatusObserver(Observer<SinkConnectionsStatus> sinkConnectionsStatusObserver) {
            this.sinkConnectionsStatusObserver = sinkConnectionsStatusObserver;
            return this;
        }

        public Builder sinkDataRecvTimeoutSecs(long t) {
            this.dataRecvTimeoutSecs = t;
            return this;
        }

        public MantisSSEJob buildJobSubmitter() {
            return new MantisSSEJob(this, Mode.Submit);
        }

        public MantisSSEJob buildJobConnector(int forPartition, int totalPartitions) {
            if (forPartition >= totalPartitions) {
                throw new IllegalArgumentException("forPartition " + forPartition + " must be less than totalPartitions " + totalPartitions);
            }
            MantisSSEJob job = new MantisSSEJob(this, Mode.Connect);
            job.forPartition = forPartition;
            job.totalPartitions = totalPartitions;
            return job;
        }

        public MantisSSEJob buildJobConnector() {
            return new MantisSSEJob(this, Mode.Connect);
        }
    }

    static enum Mode {
        Submit,
        Connect;

    }
}

