/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.runtime.executor;

import io.mantisrx.common.metrics.rx.MonitorOperator;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.MantisJobDurationType;
import io.mantisrx.runtime.PortRequest;
import io.mantisrx.runtime.SinkHolder;
import io.mantisrx.runtime.StageConfig;
import io.mantisrx.runtime.executor.PortSelector;
import io.mantisrx.runtime.executor.WorkerPublisher;
import io.mantisrx.runtime.sink.Sink;
import io.reactivex.mantis.remote.observable.RxMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;

/**
 * {@link WorkerPublisher} that merges the observables handed to {@link #start} into a single
 * stream and passes that stream to the {@link Sink} obtained from the configured
 * {@link SinkHolder}.
 *
 * <p>Several lifecycle callbacks are supplied at construction time. {@code onSubscribeAction}
 * and {@code onUnsubscribeAction} are optional: they are skipped when {@code null}. The
 * completion, error and termination callbacks are passed to the Rx {@code doOn*} operators
 * without any null check.
 *
 * @param <T> input type of the stage (only used in the {@link StageConfig} signature)
 * @param <R> type of the items published to the sink
 */
public class SinkPublisher<T, R>
implements WorkerPublisher<T, R> {
    private static final Logger logger = LoggerFactory.getLogger(SinkPublisher.class);
    private final SinkHolder<R> sinkHolder;
    private final PortSelector portSelector;
    private final Context context;
    private final Action0 observableTerminatedCallback;
    private final Action0 onSubscribeAction;
    private final Action0 onUnsubscribeAction;
    private final Action0 observableOnCompleteCallback;
    private final Action1<Throwable> observableOnErrorCallback;

    /**
     * Stores the collaborators and callbacks used by {@link #start}; performs no other work.
     *
     * @param sinkHolder                   supplies the sink action and says whether a port is requested
     * @param portSelector                 used to acquire a port when the sink holder requests one
     * @param context                      passed to the sink and queried for the worker's duration type
     * @param observableTerminatedCallback registered via {@code doOnTerminate} on the merged stream
     * @param onSubscribeAction            invoked after each subscription is wired up; may be {@code null}
     * @param onUnsubscribeAction          invoked from {@code doOnUnsubscribe}; may be {@code null}
     * @param observableOnCompleteCallback registered via {@code doOnCompleted} on the merged stream
     * @param observableOnErrorCallback    registered via {@code doOnError} on the merged stream
     */
    public SinkPublisher(SinkHolder<R> sinkHolder, PortSelector portSelector, Context context, Action0 observableTerminatedCallback, Action0 onSubscribeAction, Action0 onUnsubscribeAction, Action0 observableOnCompleteCallback, Action1<Throwable> observableOnErrorCallback) {
        this.sinkHolder = sinkHolder;
        this.portSelector = portSelector;
        this.context = context;
        this.observableTerminatedCallback = observableTerminatedCallback;
        this.onSubscribeAction = onSubscribeAction;
        this.onUnsubscribeAction = onUnsubscribeAction;
        this.observableOnCompleteCallback = observableOnCompleteCallback;
        this.observableOnErrorCallback = observableOnErrorCallback;
    }

    /**
     * Builds the sink stream and hands it to the sink.
     *
     * <p>The inner observables are merged and lifted through a {@link MonitorOperator} named
     * {@code "worker_sink"}. The result is wrapped so that every subscription logs, attaches the
     * completion/error/termination callbacks, subscribes the caller and then runs
     * {@code onSubscribeAction} (if set). The wrapper is shared via {@code share()}. When the
     * worker's duration type is {@link MantisJobDurationType#Perpetual} the shared stream is
     * subscribed once here, before the sink is called.
     *
     * @param stage                unused by this implementation
     * @param observablesToPublish the observables whose items are merged and published
     */
    @Override
    public void start(StageConfig<T, R> stage, Observable<Observable<R>> observablesToPublish) {
        final Sink<R> sink = this.sinkHolder.getSinkAction();

        // -1 is what the sink receives in its PortRequest when no port was requested.
        int sinkPort = -1;
        if (this.sinkHolder.isPortRequested()) {
            sinkPort = this.portSelector.acquirePort();
        }

        Observable<R> merged = Observable.merge(observablesToPublish);
        final Observable<R> monitored = merged.lift(new MonitorOperator<R>("worker_sink"));

        Observable<R> shared = Observable.create(new Observable.OnSubscribe<R>() {

            @Override
            public void call(Subscriber<? super R> subscriber) {
                logger.info("Got sink subscription, onSubscribe=" + onSubscribeAction);
                monitored
                        .doOnCompleted(observableOnCompleteCallback)
                        .doOnError(observableOnErrorCallback)
                        .doOnTerminate(observableTerminatedCallback)
                        .subscribe(subscriber);
                // Runs only after the subscriber has been attached to the monitored stream.
                if (onSubscribeAction != null) {
                    onSubscribeAction.call();
                }
            }
        }).doOnUnsubscribe(new Action0() {

            @Override
            public void call() {
                logger.info("Sink subscriptions clean up, action=" + onUnsubscribeAction);
                if (onUnsubscribeAction != null) {
                    onUnsubscribeAction.call();
                }
            }
        }).share();

        // NOTE(review): presumably this eager subscription keeps a perpetual job's stream
        // connected regardless of sink clients coming and going - confirm with the job owners.
        if (this.context.getWorkerInfo().getDurationType() == MantisJobDurationType.Perpetual) {
            shared.subscribe();
        }

        sink.call(this.context, new PortRequest(sinkPort), shared);
    }

    /**
     * This publisher exposes no metrics.
     *
     * @return always {@code null}
     */
    @Override
    public RxMetrics getMetrics() {
        return null;
    }

    /**
     * No-op: this implementation performs no work on stop.
     */
    @Override
    public void stop() {
    }
}

