/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.connectors.publish.source.http;

import io.mantisrx.connectors.publish.core.QueryRegistry;
import io.mantisrx.connectors.publish.source.http.NettySourceHttpServer;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.WorkerMap;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.IntParameter;
import io.mantisrx.runtime.parameter.type.StringParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import io.mantisrx.runtime.source.Index;
import io.mantisrx.runtime.source.Source;
import io.reactivx.mantis.operators.DropOperator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
 * Mantis {@link Source} of {@code String} events backed by an embedded
 * {@link NettySourceHttpServer}.
 *
 * <p>{@link #init(Context, Index)} creates and starts the HTTP server, handing it the
 * {@link QueryRegistry}, the port and a serialized subject. {@link #call(Context, Index)}
 * exposes that same subject as the single inner observable of the source.
 * NOTE(review): presumably the server publishes each received payload into the subject -
 * confirm against {@code NettySourceHttpServer}.
 */
public class PushHttpSource implements Source<String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushHttpSource.class);
    private static final String NETTY_THREAD_COUNT_PARAM_NAME = "nettyThreadCount";
    /** Thread count used when the {@code nettyThreadCount} job parameter is not supplied. */
    private static final int DEFAULT_NETTY_THREAD_COUNT = 4;

    /** Subject shared with the HTTP server; serialized so concurrent producers are safe. */
    private final Subject<String, String> eventSubject =
            new SerializedSubject<>(PublishSubject.<String>create());
    private final QueryRegistry queryRegistry;
    private final int serverPort;
    /**
     * Most recent worker map delivered by the job context. It is only written in this class;
     * nothing visible here reads it back.
     */
    private final AtomicReference<WorkerMap> workerMapAtomicReference =
            new AtomicReference<>(new WorkerMap(new HashMap<>()));

    /**
     * Creates the source.
     *
     * @param registry   query registry passed through to the HTTP server on {@link #init}
     * @param serverPort port the HTTP server is initialized with
     */
    public PushHttpSource(QueryRegistry registry, int serverPort) {
        this.queryRegistry = registry;
        this.serverPort = serverPort;
    }

    /**
     * Returns a single-element observable whose element is the stream of pushed events.
     *
     * <p>The event stream is passed through a {@link DropOperator} and, should it fail,
     * the failure is logged and the stream completes instead of propagating the error.
     *
     * @param context job context (unused here)
     * @param index   worker index (unused here)
     * @return observable emitting exactly one inner observable of events
     */
    public Observable<Observable<String>> call(Context context, Index index) {
        String dropOperatorName =
                "incoming_" + PushHttpSource.class.getCanonicalName() + "_batch";
        Observable<String> events = eventSubject
                .lift(new DropOperator<String>(dropOperatorName))
                .onErrorResumeNext(error -> {
                    // Keep the original "complete on error" behavior, but leave a trace of the cause.
                    LOGGER.error("PushHttpSource event stream failed; completing the stream", error);
                    return Observable.<String>empty();
                });
        return Observable.just(events);
    }

    /**
     * Starts the embedded HTTP server and begins tracking worker map updates.
     *
     * @param context job context supplying the {@code nettyThreadCount} parameter and the
     *                worker map observable
     * @param index   worker index (unused here)
     * @throws RuntimeException if server initialization is interrupted; the thread's
     *                          interrupt status is restored before throwing
     */
    public void init(Context context, Index index) {
        LOGGER.info("Initializing PushHttpSource");
        int threadCount = (Integer) context.getParameters()
                .get(NETTY_THREAD_COUNT_PARAM_NAME, DEFAULT_NETTY_THREAD_COUNT);
        LOGGER.info("PushHttpSource server starting at Port {}", serverPort);
        NettySourceHttpServer server = new NettySourceHttpServer(context, threadCount);
        try {
            server.init(queryRegistry, eventSubject, serverPort);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while initializing PushHttpSource server", e);
        }
        server.startServer();
        context.getWorkerMapObservable()
                .subscribeOn(Schedulers.io())
                .subscribe(
                        workerMap -> {
                            LOGGER.info("Got WorkerUpdate{}", workerMap);
                            workerMapAtomicReference.set(workerMap);
                        },
                        // Without an error handler RxJava throws OnErrorNotImplementedException
                        // on the io thread; log instead and keep the last known worker map.
                        error -> LOGGER.error("Worker map stream failed; keeping last known worker map", error));
        LOGGER.info("PushHttpSource server started");
    }

    /**
     * Declares the job parameters associated with this source.
     *
     * <p>Only {@code nettyThreadCount} (range 1-8, default 4) is read by this class;
     * {@code zoneList}, {@code targetApp} and {@code targetASGs} are declared here but not
     * consumed in the code visible in this class.
     *
     * @return a new mutable list of parameter definitions
     */
    public List<ParameterDefinition<?>> getParameters() {
        List<ParameterDefinition<?>> parameters = new ArrayList<>();
        parameters.add(new IntParameter()
                .name(NETTY_THREAD_COUNT_PARAM_NAME)
                .validator(Validators.range(1, 8))
                .defaultValue(DEFAULT_NETTY_THREAD_COUNT)
                .build());
        parameters.add(new StringParameter()
                .name("zoneList")
                .description("list of Zones")
                .validator(Validators.alwaysPass())
                .defaultValue("")
                .build());
        parameters.add(new StringParameter()
                .name("targetApp")
                .description("target app")
                .validator(Validators.alwaysPass())
                .defaultValue("")
                .build());
        parameters.add(new StringParameter()
                .name("targetASGs")
                .description("target ASGs CSV regex")
                .validator(Validators.alwaysPass())
                .defaultValue("")
                .build());
        return parameters;
    }
}

