/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.runtime.executor;

import io.mantisrx.common.codec.Codecs;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.properties.MantisPropertiesService;
import io.mantisrx.runtime.GroupToGroup;
import io.mantisrx.runtime.GroupToScalar;
import io.mantisrx.runtime.KeyToKey;
import io.mantisrx.runtime.KeyToScalar;
import io.mantisrx.runtime.ScalarToGroup;
import io.mantisrx.runtime.ScalarToKey;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.StageConfig;
import io.mantisrx.runtime.executor.WorkerPublisher;
import io.mantisrx.server.core.ServiceRegistry;
import io.reactivex.mantis.network.push.HashFunctions;
import io.reactivex.mantis.network.push.LegacyTcpPushServer;
import io.reactivex.mantis.network.push.PushServers;
import io.reactivex.mantis.network.push.Routers;
import io.reactivex.mantis.network.push.ServerConfig;
import io.reactivex.mantis.remote.observable.RemoteRxServer;
import io.reactivex.mantis.remote.observable.RxMetrics;
import io.reactivex.mantis.remote.observable.ServeNestedObservable;
import io.reactivex.mantis.remote.observable.slotting.RoundRobin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;

public class WorkerPublisherRemoteObservable<T, R>
implements WorkerPublisher<T, R> {
    private static final Logger logger = LoggerFactory.getLogger(WorkerPublisherRemoteObservable.class);
    private String name;
    private int serverPort;
    private RemoteRxServer server;
    private Observable<Integer> minConnectionsToSubscribe;
    private MantisPropertiesService propService;
    private String jobName;

    public WorkerPublisherRemoteObservable(int serverPort, String name2, Observable<Integer> minConnectionsToSubscribe, String jobName) {
        this.name = name2;
        this.serverPort = serverPort;
        this.minConnectionsToSubscribe = minConnectionsToSubscribe;
        this.propService = ServiceRegistry.INSTANCE.getPropertiesService();
        this.jobName = jobName;
    }

    @Override
    public void start(final StageConfig<T, R> stage, Observable<Observable<R>> toServe) {
        RemoteRxServer.Builder serverBuilder = new RemoteRxServer.Builder();
        if (stage instanceof KeyToKey || stage instanceof ScalarToKey || stage instanceof ScalarToGroup || stage instanceof GroupToGroup) {
            if (this.runNewW2WserverGroups(this.jobName)) {
                logger.info("Modern server setup for name: " + this.name + " type: Keyedstage");
                long expiryTimeInSecs = Long.MAX_VALUE;
                if (stage instanceof KeyToKey) {
                    expiryTimeInSecs = ((KeyToKey)stage).getKeyExpireTimeSeconds();
                } else if (stage instanceof ScalarToKey) {
                    expiryTimeInSecs = ((ScalarToKey)stage).getKeyExpireTimeSeconds();
                }
                Func1 valueEncoder = new Func1<R, byte[]>(){

                    @Override
                    public byte[] call(R t1) {
                        return stage.getOutputCodec().encode(t1);
                    }
                };
                Func1<String, byte[]> keyEncoder = new Func1<String, byte[]>(){

                    @Override
                    public byte[] call(String t1) {
                        return Codecs.string().encode(t1);
                    }
                };
                ServerConfig config = new ServerConfig.Builder().name(this.name).port(this.serverPort).metricsRegistry(MetricsRegistry.getInstance()).numQueueConsumers(this.numConsumerThreads()).maxChunkSize(this.maxChunkSize()).maxChunkTimeMSec(this.maxChunkTimeMSec()).bufferCapacity(this.bufferCapacity()).useSpscQueue(this.useSpsc()).router(Routers.consistentHashingLegacyTcpProtocol(this.jobName, keyEncoder, valueEncoder)).build();
                Observable go = toServe;
                if (stage instanceof ScalarToGroup || stage instanceof GroupToGroup) {
                    final LegacyTcpPushServer modernServer = PushServers.infiniteStreamLegacyTcpNestedMantisGroup(config, go, expiryTimeInSecs, keyEncoder, HashFunctions.ketama());
                    modernServer.start();
                    this.server = new RemoteRxServer(){

                        @Override
                        public void start() {
                        }

                        @Override
                        public void startAndWait() {
                        }

                        @Override
                        public void shutdown() {
                            modernServer.shutdown();
                        }

                        @Override
                        public void blockUntilServerShutdown() {
                            modernServer.blockUntilShutdown();
                        }
                    };
                } else {
                    final LegacyTcpPushServer modernServer = PushServers.infiniteStreamLegacyTcpNestedGroupedObservable(config, go, expiryTimeInSecs, keyEncoder, HashFunctions.ketama());
                    modernServer.start();
                    this.server = new RemoteRxServer(){

                        @Override
                        public void start() {
                        }

                        @Override
                        public void startAndWait() {
                        }

                        @Override
                        public void shutdown() {
                            modernServer.shutdown();
                        }

                        @Override
                        public void blockUntilServerShutdown() {
                            modernServer.blockUntilShutdown();
                        }
                    };
                }
            }
        } else if (stage instanceof ScalarToScalar || stage instanceof KeyToScalar || stage instanceof GroupToScalar) {
            if (this.runNewW2Wserver(this.jobName)) {
                logger.info("Modern server setup for name: " + this.name + " type: Scalarstage");
                Func1 encoder = new Func1<R, byte[]>(){

                    @Override
                    public byte[] call(R t1) {
                        return stage.getOutputCodec().encode(t1);
                    }
                };
                ServerConfig config = new ServerConfig.Builder().name(this.name).port(this.serverPort).metricsRegistry(MetricsRegistry.getInstance()).router(Routers.roundRobinLegacyTcpProtocol(this.name, encoder)).build();
                final LegacyTcpPushServer modernServer = PushServers.infiniteStreamLegacyTcpNested(config, toServe);
                modernServer.start();
                this.server = new RemoteRxServer(){

                    @Override
                    public void start() {
                    }

                    @Override
                    public void startAndWait() {
                    }

                    @Override
                    public void shutdown() {
                        modernServer.shutdown();
                    }

                    @Override
                    public void blockUntilServerShutdown() {
                        modernServer.blockUntilShutdown();
                    }
                };
            } else {
                logger.info("Legacy server setup for name: " + this.name + " type: Scalarstage");
                RoundRobin slotting = new RoundRobin();
                serverBuilder.addObservable(new ServeNestedObservable.Builder().name(this.name).encoder(stage.getOutputCodec()).observable(toServe).slottingStrategy(slotting).build());
                MetricsRegistry.getInstance().registerAndGet(slotting.getMetrics());
                this.server = serverBuilder.port(this.serverPort).build();
                this.server.start();
            }
        } else {
            throw new RuntimeException("Unsupported stage type: " + stage);
        }
    }

    private boolean useSpsc() {
        String stringValue = this.propService.getStringValue("mantis.w2w.spsc", "false");
        return Boolean.parseBoolean(stringValue);
    }

    private int bufferCapacity() {
        String stringValue = this.propService.getStringValue("mantis.w2w.toKeyBuffer", "50000");
        return Integer.parseInt(stringValue);
    }

    private int maxChunkTimeMSec() {
        String stringValue = this.propService.getStringValue("mantis.w2w.toKeyMaxChunkTimeMSec", "250");
        return Integer.parseInt(stringValue);
    }

    private int maxChunkSize() {
        String stringValue = this.propService.getStringValue("mantis.w2w.toKeyMaxChunkSize", "1000");
        return Integer.parseInt(stringValue);
    }

    private int numConsumerThreads() {
        String stringValue = this.propService.getStringValue("mantis.w2w.toKeyThreads", "1");
        return Integer.parseInt(stringValue);
    }

    private boolean runNewW2Wserver(String jobName) {
        String legacyServerString = this.propService.getStringValue("mantis.w2w.newServerImplScalar", "true");
        String legacyServerStringPerJob = this.propService.getStringValue(jobName + ".mantis.w2w.newServerImplScalar", "false");
        return Boolean.parseBoolean(legacyServerString) || Boolean.parseBoolean(legacyServerStringPerJob);
    }

    private boolean runNewW2WserverGroups(String jobName) {
        String legacyServerString = this.propService.getStringValue("mantis.w2w.newServerImplKeyed", "true");
        String legacyServerStringPerJob = this.propService.getStringValue(jobName + ".mantis.w2w.newServerImplKeyed", "false");
        return Boolean.parseBoolean(legacyServerString) || Boolean.parseBoolean(legacyServerStringPerJob);
    }

    @Override
    public void stop() {
        this.server.shutdown();
    }

    public RemoteRxServer getServer() {
        return this.server;
    }

    @Override
    public RxMetrics getMetrics() {
        return this.server.getMetrics();
    }
}

