/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.runtime.executor;

import io.mantisrx.common.metrics.rx.MonitorOperator;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.MantisJobDurationType;
import io.mantisrx.runtime.PortRequest;
import io.mantisrx.runtime.SinkHolder;
import io.mantisrx.runtime.StageConfig;
import io.mantisrx.runtime.executor.PortSelector;
import io.mantisrx.runtime.executor.WorkerPublisher;
import io.mantisrx.runtime.sink.Sink;
import io.reactivex.mantis.remote.observable.RxMetrics;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;

public class SinkPublisher<T>
implements WorkerPublisher<T> {
    private static final Logger logger = LoggerFactory.getLogger(SinkPublisher.class);
    private final SinkHolder<T> sinkHolder;
    private final PortSelector portSelector;
    private final Context context;
    private final Action0 observableTerminatedCallback;
    private final Action0 onSubscribeAction;
    private final Action0 onUnsubscribeAction;
    private final Action0 observableOnCompleteCallback;
    private final Action1<Throwable> observableOnErrorCallback;
    private Subscription eagerSubscription;
    private Sink<T> sink;

    public SinkPublisher(SinkHolder<T> sinkHolder, PortSelector portSelector, Context context, Action0 observableTerminatedCallback, Action0 onSubscribeAction, Action0 onUnsubscribeAction, Action0 observableOnCompleteCallback, Action1<Throwable> observableOnErrorCallback) {
        this.sinkHolder = sinkHolder;
        this.portSelector = portSelector;
        this.context = context;
        this.observableTerminatedCallback = observableTerminatedCallback;
        this.onSubscribeAction = onSubscribeAction;
        this.onUnsubscribeAction = onUnsubscribeAction;
        this.observableOnCompleteCallback = observableOnCompleteCallback;
        this.observableOnErrorCallback = observableOnErrorCallback;
    }

    @Override
    public void start(StageConfig<?, T> stage, Observable<Observable<T>> observablesToPublish) {
        this.sink = this.sinkHolder.getSinkAction();
        int sinkPort = -1;
        if (this.sinkHolder.isPortRequested()) {
            sinkPort = this.portSelector.acquirePort();
        }
        Observable merged = Observable.merge(observablesToPublish);
        final Observable wrappedO = merged.lift((Observable.Operator)new MonitorOperator("worker_sink"));
        Observable o = Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<Object>(){

            public void call(Subscriber subscriber) {
                logger.info("Got sink subscription, onSubscribe=" + SinkPublisher.this.onSubscribeAction);
                wrappedO.doOnCompleted(SinkPublisher.this.observableOnCompleteCallback).doOnError(SinkPublisher.this.observableOnErrorCallback).doOnTerminate(SinkPublisher.this.observableTerminatedCallback).subscribe(subscriber);
                if (SinkPublisher.this.onSubscribeAction != null) {
                    SinkPublisher.this.onSubscribeAction.call();
                }
            }
        }).doOnUnsubscribe(new Action0(){

            public void call() {
                logger.info("Sink subscriptions clean up, action=" + SinkPublisher.this.onUnsubscribeAction);
                if (SinkPublisher.this.onUnsubscribeAction != null) {
                    SinkPublisher.this.onUnsubscribeAction.call();
                }
            }
        }).share();
        if (this.context.getWorkerInfo().getDurationType() == MantisJobDurationType.Perpetual) {
            this.eagerSubscription = o.subscribe();
        }
        this.sink.init(this.context);
        this.sink.call(this.context, new PortRequest(sinkPort), o);
    }

    @Override
    public RxMetrics getMetrics() {
        return null;
    }

    @Override
    public void close() throws IOException {
        try {
            this.sink.close();
        }
        finally {
            this.sink = null;
            if (this.eagerSubscription != null) {
                this.eagerSubscription.unsubscribe();
                this.eagerSubscription = null;
            }
        }
    }
}

