/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.runtime.executor;

import io.mantisrx.common.codec.Codecs;
import io.mantisrx.common.codec.Decoder;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.runtime.GroupToGroup;
import io.mantisrx.runtime.GroupToScalar;
import io.mantisrx.runtime.KeyToKey;
import io.mantisrx.runtime.KeyToScalar;
import io.mantisrx.runtime.ScalarToGroup;
import io.mantisrx.runtime.ScalarToKey;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.StageConfig;
import io.mantisrx.runtime.executor.WorkerConsumer;
import io.reactivex.mantis.remote.observable.ConnectToGroupedObservable;
import io.reactivex.mantis.remote.observable.ConnectToObservable;
import io.reactivex.mantis.remote.observable.DynamicConnectionSet;
import io.reactivex.mantis.remote.observable.EndpointInjector;
import io.reactivex.mantis.remote.observable.reconciliator.Reconciliator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

public class WorkerConsumerRemoteObservable<T, R>
implements WorkerConsumer<T, R> {
    private static final Logger logger = LoggerFactory.getLogger(WorkerConsumerRemoteObservable.class);
    private String name;
    private DynamicConnectionSet<T> connectionSet;
    private EndpointInjector injector;
    private Reconciliator<T> reconciliator;

    public WorkerConsumerRemoteObservable(String name, EndpointInjector endpointInjector) {
        this.name = name;
        this.injector = endpointInjector;
    }

    @Override
    public Observable<Observable<T>> start(StageConfig<T, R> stage) {
        if (stage instanceof KeyToKey || stage instanceof KeyToScalar || stage instanceof GroupToScalar || stage instanceof GroupToGroup) {
            logger.info("Remote connection to stage " + this.name + " is KeyedStage");
            ConnectToGroupedObservable.Builder connectToBuilder = new ConnectToGroupedObservable.Builder().name(this.name).keyDecoder((Decoder)Codecs.string()).valueDecoder(stage.getInputCodec()).subscribeAttempts(30);
            this.connectionSet = DynamicConnectionSet.createMGO((ConnectToGroupedObservable.Builder)connectToBuilder);
        } else if (stage instanceof ScalarToScalar || stage instanceof ScalarToKey || stage instanceof ScalarToGroup) {
            logger.info("Remote connection to stage " + this.name + " is ScalarStage");
            ConnectToObservable.Builder connectToBuilder = new ConnectToObservable.Builder().name(this.name).decoder(stage.getInputCodec()).subscribeAttempts(30);
            this.connectionSet = DynamicConnectionSet.create((ConnectToObservable.Builder)connectToBuilder);
        } else {
            throw new RuntimeException("Unsupported stage type: " + stage);
        }
        this.reconciliator = new Reconciliator.Builder().name("worker2worker_" + this.name).connectionSet(this.connectionSet).injector(this.injector).build();
        this.registerMetrics(this.reconciliator.getMetrics());
        this.registerMetrics(this.connectionSet.getConnectionMetrics());
        return this.reconciliator.observables();
    }

    private void registerMetrics(Metrics metrics) {
        MetricsRegistry.getInstance().registerAndGet(metrics);
    }

    @Override
    public void stop() {
    }
}

