/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.runtime.sink;

import com.mantisrx.common.utils.Closeables;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.Metadata;
import io.mantisrx.runtime.PortRequest;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.sink.SelfDocumentingSink;
import io.mantisrx.runtime.sink.ServerSentEventsSink;
import io.mantisrx.runtime.sink.Sink;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.functions.Func1;

/**
 * Static factory methods that build or decorate {@link Sink} instances.
 *
 * <p>The sinks returned by {@code eagerSubscribe} and {@code sysout} remember the
 * {@link Subscription} created in {@code call(...)} and release it in {@code close()}.
 * Closing such a sink before {@code call(...)} has run is treated as a no-op for the
 * subscription instead of failing with a {@link NullPointerException}.
 */
public class Sinks {
    /**
     * Decorates {@code sink} so that {@code call(...)} first subscribes to the observable via
     * the argument-less {@code subscribe()} and then forwards the call to the wrapped sink.
     *
     * <p>{@code getParameters()} and {@code init(...)} are delegated unchanged. {@code close()}
     * closes the wrapped sink and then unsubscribes the subscription created in {@code call(...)},
     * even when the wrapped sink's {@code close()} throws.
     *
     * @param sink the sink to decorate
     * @param <T> the element type consumed by the sink
     * @return a sink wrapping {@code sink}
     */
    public static <T> Sink<T> eagerSubscribe(final Sink<T> sink) {
        return new Sink<T>() {
            // Assigned in call(); stays null if the sink is closed without ever being called.
            private Subscription subscription;

            @Override
            public List<ParameterDefinition<?>> getParameters() {
                return sink.getParameters();
            }

            @Override
            public void call(Context c, PortRequest p, Observable<T> o) {
                this.subscription = o.subscribe();
                sink.call(c, p, o);
            }

            @Override
            public void init(Context t) {
                sink.init(t);
            }

            @Override
            public void close() throws IOException {
                try {
                    sink.close();
                } finally {
                    // Null-safe: an NPE here would otherwise mask an IOException from sink.close().
                    unsubscribeIfPresent(this.subscription);
                }
            }
        };
    }

    /**
     * Same decoration as {@link #eagerSubscribe(Sink)}, for a {@link SelfDocumentingSink}:
     * {@code metadata()} is additionally delegated to the wrapped sink.
     *
     * @param sink the self-documenting sink to decorate
     * @param <T> the element type consumed by the sink
     * @return a self-documenting sink wrapping {@code sink}
     */
    public static <T> SelfDocumentingSink<T> eagerSubscribe(final SelfDocumentingSink<T> sink) {
        return new SelfDocumentingSink<T>() {
            // Assigned in call(); stays null if the sink is closed without ever being called.
            private Subscription subscription;

            @Override
            public List<ParameterDefinition<?>> getParameters() {
                return sink.getParameters();
            }

            @Override
            public void call(Context c, PortRequest p, Observable<T> o) {
                this.subscription = o.subscribe();
                sink.call(c, p, o);
            }

            @Override
            public Metadata metadata() {
                return sink.metadata();
            }

            @Override
            public void init(Context t) {
                sink.init(t);
            }

            @Override
            public void close() throws IOException {
                try {
                    sink.close();
                } finally {
                    // Null-safe: an NPE here would otherwise mask an IOException from sink.close().
                    unsubscribeIfPresent(this.subscription);
                }
            }
        };
    }

    /**
     * Combines several sinks into one that fans every operation out to each of them, in
     * argument order.
     *
     * <p>{@code getParameters()} returns a new list holding the parameters of all sinks;
     * {@code init(...)} and {@code call(...)} are forwarded to every sink; {@code close()} is
     * delegated to {@code Closeables.combine(...)} over the given sinks.
     *
     * @param many the sinks to combine
     * @param <T> the element type consumed by the sinks
     * @return a sink that forwards to every sink in {@code many}
     */
    @SafeVarargs
    public static <T> Sink<T> toMany(final Sink<T>... many) {
        return new Sink<T>() {

            @Override
            public List<ParameterDefinition<?>> getParameters() {
                List<ParameterDefinition<?>> parameterDefinitions = new ArrayList<>();
                for (Sink<T> sink : many) {
                    parameterDefinitions.addAll(sink.getParameters());
                }
                return parameterDefinitions;
            }

            @Override
            public void call(Context t1, PortRequest t2, Observable<T> t3) {
                for (Sink<T> sink : many) {
                    sink.call(t1, t2, t3);
                }
            }

            @Override
            public void init(Context t) {
                for (Sink<T> sink : many) {
                    sink.init(t);
                }
            }

            @Override
            public void close() throws IOException {
                Closeables.combine((Closeable[]) many).close();
            }
        };
    }

    /**
     * Creates a {@link ServerSentEventsSink} constructed with the given encoder.
     *
     * @param encoder function turning an element into its {@code String} form
     * @param <T> the element type consumed by the sink
     * @return a new {@code ServerSentEventsSink}
     */
    public static <T> ServerSentEventsSink<T> sse(Func1<T, String> encoder) {
        return new ServerSentEventsSink<T>(encoder);
    }

    /**
     * Creates a sink that prints every element to {@code System.out}, prints
     * {@code "completed"} when the observable completes, and prints the stack trace of any
     * error. {@code close()} unsubscribes the subscription created in {@code call(...)}.
     *
     * @param <T> the element type consumed by the sink
     * @return a console-printing sink
     */
    public static <T> Sink<T> sysout() {
        return new Sink<T>() {
            // Assigned in call(); stays null if the sink is closed without ever being called.
            private Subscription subscription;

            @Override
            public void call(Context t1, PortRequest p, Observable<T> t2) {
                this.subscription = t2.subscribe(new Observer<T>() {

                    @Override
                    public void onCompleted() {
                        System.out.println("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }

                    @Override
                    public void onNext(T t) {
                        System.out.println(t);
                    }
                });
            }

            @Override
            public void close() {
                unsubscribeIfPresent(this.subscription);
            }
        };
    }

    /** Unsubscribes {@code subscription} unless it is {@code null} (i.e. was never created). */
    private static void unsubscribeIfPresent(Subscription subscription) {
        if (subscription != null) {
            subscription.unsubscribe();
        }
    }
}

