/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.runtime.executor;

import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.properties.MantisPropertiesLoader;
import io.mantisrx.runtime.GroupToGroup;
import io.mantisrx.runtime.GroupToScalar;
import io.mantisrx.runtime.KeyToKey;
import io.mantisrx.runtime.KeyToScalar;
import io.mantisrx.runtime.KeyValueStageConfig;
import io.mantisrx.runtime.ScalarToGroup;
import io.mantisrx.runtime.ScalarToKey;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.StageConfig;
import io.mantisrx.runtime.executor.WorkerPublisher;
import io.mantisrx.server.core.ServiceRegistry;
import io.mantisrx.shaded.com.google.common.base.Preconditions;
import io.reactivex.mantis.network.push.HashFunctions;
import io.reactivex.mantis.network.push.KeyValuePair;
import io.reactivex.mantis.network.push.LegacyTcpPushServer;
import io.reactivex.mantis.network.push.PushServers;
import io.reactivex.mantis.network.push.Routers;
import io.reactivex.mantis.network.push.ServerConfig;
import io.reactivex.mantis.remote.observable.RemoteRxServer;
import io.reactivex.mantis.remote.observable.RxMetrics;
import io.reactivex.mantis.remote.observable.ServeNestedObservable;
import io.reactivex.mantis.remote.observable.slotting.RoundRobin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;

public class WorkerPublisherRemoteObservable<T>
implements WorkerPublisher<T> {
    private static final Logger logger = LoggerFactory.getLogger(WorkerPublisherRemoteObservable.class);
    private final String name;
    private final int serverPort;
    private RemoteRxServer server;
    private final MantisPropertiesLoader propService;
    private String jobName;

    public WorkerPublisherRemoteObservable(int serverPort, String name2, Observable<Integer> minConnectionsToSubscribe, String jobName) {
        this.name = name2;
        this.serverPort = serverPort;
        this.propService = ServiceRegistry.INSTANCE.getPropertiesService();
        this.jobName = jobName;
    }

    @Override
    public void start(StageConfig<?, T> stage, Observable<Observable<T>> toServe) {
        RemoteRxServer.Builder serverBuilder = new RemoteRxServer.Builder();
        if (stage instanceof KeyValueStageConfig) {
            LegacyTcpPushServer modernServer = this.startKeyValueStage((KeyValueStageConfig)stage, toServe);
            this.server = new LegacyRxServer(modernServer);
        } else if (stage instanceof ScalarToScalar || stage instanceof KeyToScalar || stage instanceof GroupToScalar) {
            if (this.runNewW2Wserver(this.jobName)) {
                logger.info("Modern server setup for name: " + this.name + " type: Scalarstage");
                Func1<Object, byte[]> encoder = t1 -> stage.getOutputCodec().encode(t1);
                ServerConfig<Object> config = new ServerConfig.Builder().name(this.name).port(this.serverPort).metricsRegistry(MetricsRegistry.getInstance()).router(Routers.roundRobinLegacyTcpProtocol(this.name, encoder)).build();
                LegacyTcpPushServer<Object> modernServer = PushServers.infiniteStreamLegacyTcpNested(config, toServe);
                this.server = new LegacyRxServer<Object>(modernServer);
            } else {
                logger.info("Legacy server setup for name: " + this.name + " type: Scalarstage");
                RoundRobin slotting = new RoundRobin();
                serverBuilder.addObservable(new ServeNestedObservable.Builder().name(this.name).encoder(stage.getOutputCodec()).observable(toServe).slottingStrategy(slotting).build());
                MetricsRegistry.getInstance().registerAndGet(slotting.getMetrics());
                this.server = serverBuilder.port(this.serverPort).build();
            }
        } else {
            throw new RuntimeException("Unsupported stage type: " + stage);
        }
        this.server.start();
    }

    private <K> LegacyTcpPushServer<KeyValuePair<K, T>> startKeyValueStage(KeyValueStageConfig<?, K, T> stage, Observable<Observable<T>> toServe) {
        Preconditions.checkArgument(this.runNewW2WserverGroups(this.jobName), String.format("Need to use new worker2worker server group for jobName %s", this.jobName));
        logger.info("Modern server setup for name: {} type: Keyedstage", (Object)this.name);
        long expiryTimeInSecs = Long.MAX_VALUE;
        if (stage instanceof KeyToKey) {
            expiryTimeInSecs = ((KeyToKey)stage).getKeyExpireTimeSeconds();
        } else if (stage instanceof ScalarToKey) {
            expiryTimeInSecs = ((ScalarToKey)stage).getKeyExpireTimeSeconds();
        }
        Func1<Object, byte[]> valueEncoder = t1 -> stage.getOutputCodec().encode(t1);
        Func1<Object, byte[]> keyEncoder = t1 -> stage.getOutputKeyCodec().encode(t1);
        ServerConfig config = new ServerConfig.Builder().name(this.name).port(this.serverPort).metricsRegistry(MetricsRegistry.getInstance()).numQueueConsumers(this.numConsumerThreads()).maxChunkSize(this.maxChunkSize()).maxChunkTimeMSec(this.maxChunkTimeMSec()).bufferCapacity(this.bufferCapacity()).useSpscQueue(this.useSpsc()).router(Routers.consistentHashingLegacyTcpProtocol(this.jobName, keyEncoder, valueEncoder)).build();
        if (stage instanceof ScalarToGroup || stage instanceof GroupToGroup) {
            return PushServers.infiniteStreamLegacyTcpNestedMantisGroup(config, toServe, expiryTimeInSecs, keyEncoder, HashFunctions.ketama());
        }
        return PushServers.infiniteStreamLegacyTcpNestedGroupedObservable(config, toServe, expiryTimeInSecs, keyEncoder, HashFunctions.ketama());
    }

    private boolean useSpsc() {
        String stringValue = this.propService.getStringValue("mantis.w2w.spsc", "false");
        return Boolean.parseBoolean(stringValue);
    }

    private int bufferCapacity() {
        String stringValue = this.propService.getStringValue("mantis.w2w.toKeyBuffer", "50000");
        return Integer.parseInt(stringValue);
    }

    private int maxChunkTimeMSec() {
        String stringValue = this.propService.getStringValue("mantis.w2w.toKeyMaxChunkTimeMSec", "250");
        return Integer.parseInt(stringValue);
    }

    private int maxChunkSize() {
        String stringValue = this.propService.getStringValue("mantis.w2w.toKeyMaxChunkSize", "1000");
        return Integer.parseInt(stringValue);
    }

    private int numConsumerThreads() {
        String stringValue = this.propService.getStringValue("mantis.w2w.toKeyThreads", "1");
        return Integer.parseInt(stringValue);
    }

    private boolean runNewW2Wserver(String jobName) {
        String legacyServerString = this.propService.getStringValue("mantis.w2w.newServerImplScalar", "true");
        String legacyServerStringPerJob = this.propService.getStringValue(jobName + ".mantis.w2w.newServerImplScalar", "false");
        return Boolean.parseBoolean(legacyServerString) || Boolean.parseBoolean(legacyServerStringPerJob);
    }

    private boolean runNewW2WserverGroups(String jobName) {
        String legacyServerString = this.propService.getStringValue("mantis.w2w.newServerImplKeyed", "true");
        String legacyServerStringPerJob = this.propService.getStringValue(jobName + ".mantis.w2w.newServerImplKeyed", "false");
        return Boolean.parseBoolean(legacyServerString) || Boolean.parseBoolean(legacyServerStringPerJob);
    }

    @Override
    public void close() {
        this.server.shutdown();
    }

    public RemoteRxServer getServer() {
        return this.server;
    }

    @Override
    public RxMetrics getMetrics() {
        return this.server.getMetrics();
    }

    private static class LegacyRxServer<T>
    extends RemoteRxServer {
        private final LegacyTcpPushServer<T> modernServer;

        public LegacyRxServer(LegacyTcpPushServer<T> modernServer) {
            this.modernServer = modernServer;
        }

        @Override
        public void start() {
            this.modernServer.start();
        }

        @Override
        public void startAndWait() {
        }

        @Override
        public void shutdown() {
            this.modernServer.shutdown();
        }

        @Override
        public void blockUntilServerShutdown() {
            this.modernServer.blockUntilShutdown();
        }
    }
}

