/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.client.examples;

import com.sampullara.cli.Args;
import com.sampullara.cli.Argument;
import io.mantisrx.client.MantisSSEJob;
import io.mantisrx.client.examples.SubmitEphemeralJob;
import io.mantisrx.common.MantisServerSentEvent;
import io.mantisrx.runtime.JobSla;
import io.mantisrx.runtime.MantisJobDurationType;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;

/**
 * Example client that submits a Mantis job with a bounded runtime and checks that the job
 * stops producing data once that runtime has elapsed.
 *
 * <p>Flow of {@link #main(String[])}:
 * <ol>
 *   <li>Parse the command line ({@code -p} configuration file, optional {@code -n} job name).</li>
 *   <li>Load the configuration file into a {@link Properties} object.</li>
 *   <li>Submit the job with a {@link JobSla} built from {@link #RUNTIME_LIMIT_SECS} and print
 *       every event received.</li>
 *   <li>Wait for the runtime limit plus a launch buffer, then flag any event that still arrives
 *       by printing {@code FAILURE} before it.</li>
 * </ol>
 */
public class SubmitWithRuntimeLimit {
    private static final Logger logger = LoggerFactory.getLogger(SubmitWithRuntimeLimit.class);

    /**
     * Value passed as the first argument of {@link JobSla}; presumably the job's runtime limit
     * in seconds, per the naming used in this example (confirm against {@code JobSla}).
     */
    private static final long RUNTIME_LIMIT_SECS = 30L;

    /** Extra seconds waited on top of the runtime limit before further events count as failures. */
    private static final long LAUNCH_BUFFER_SECS = 10L;

    /** Milliseconds spent watching for unexpected events after the limit has passed. */
    private static final long OBSERVATION_PERIOD_MILLIS = 20000L;

    @Argument(alias="p", description="Specify a configuration file", required=true)
    private static String propFile = "";
    @Argument(alias="n", description="Job name for submission", required=false)
    private static String jobName;

    /**
     * Entry point. Exits with status 1 on invalid arguments or an unreadable configuration
     * file, and with status 0 once the observation period is over.
     *
     * @param args command-line arguments, parsed into {@link #propFile} and {@link #jobName}
     */
    public static void main(String[] args) {
        try {
            Args.parse(SubmitWithRuntimeLimit.class, (String[])args);
        }
        catch (IllegalArgumentException e) {
            // Show the options of THIS program, not those of a sibling example.
            Args.usage(SubmitWithRuntimeLimit.class);
            System.exit(1);
        }

        final Properties properties;
        try {
            properties = loadProperties(propFile);
        }
        catch (IOException e) {
            // The configuration file is a required argument; continuing with empty
            // properties would only defer the failure to job submission.
            logger.error("Failed to load configuration file {}", propFile, e);
            System.err.println("Unable to read configuration file '" + propFile + "': " + e.getMessage());
            System.exit(1);
            return;
        }

        // Counted down when the event stream completes; nothing awaits it at present.
        final CountDownLatch latch = new CountDownLatch(1);
        // Flipped to true once the runtime limit (plus buffer) has elapsed.
        final AtomicBoolean completed = new AtomicBoolean(false);

        JobSla sla = new JobSla(RUNTIME_LIMIT_SECS, 0L, JobSla.StreamSLAType.Lossy, MantisJobDurationType.Perpetual, "");
        MantisSSEJob job = new MantisSSEJob.Builder(properties).name(jobName).jobSla(sla).onConnectionReset(new Action1<Throwable>(){

            public void call(Throwable throwable) {
                System.err.println("Reconnecting due to error: " + throwable.getMessage());
            }
        }).buildJobSubmitter();

        Observable<Observable<MantisServerSentEvent>> observable = job.submitAndGet();
        Subscription subscription = observable.flatMap(new Func1<Observable<MantisServerSentEvent>, Observable<?>>(){

            public Observable<?> call(Observable<MantisServerSentEvent> eventObservable) {
                return eventObservable.doOnNext(new Action1<MantisServerSentEvent>(){

                    public void call(MantisServerSentEvent event) {
                        // Any event seen after the limit has passed is unexpected.
                        if (completed.get()) {
                            System.out.println("FAILURE");
                        }
                        System.out.println("Got: " + event.getEventAsString());
                    }
                });
            }
        }).doOnCompleted(new Action0(){

            public void call() {
                latch.countDown();
            }
        }).subscribe();

        try {
            // Let the job run for its full limit plus the launch buffer.
            Thread.sleep((RUNTIME_LIMIT_SECS + LAUNCH_BUFFER_SECS) * 1000L);
            completed.set(true);
            // Keep listening for a while: events arriving now are reported as failures.
            Thread.sleep(OBSERVATION_PERIOD_MILLIS);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost, then shut down.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while waiting for the job runtime limit", e);
        }
        subscription.unsubscribe();
        System.exit(0);
    }

    /**
     * Reads a Java properties file from disk.
     *
     * @param path location of the properties file
     * @return the loaded properties
     * @throws IOException if the file cannot be opened or read
     */
    private static Properties loadProperties(String path) throws IOException {
        Properties loaded = new Properties();
        try (FileInputStream inputStream = new FileInputStream(path)) {
            loaded.load(inputStream);
        }
        return loaded;
    }
}

