/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.sourcejobs.publish;

import io.mantisrx.connector.publish.core.QueryRegistry;
import io.mantisrx.connector.publish.source.http.PushHttpSource;
import io.mantisrx.connector.publish.source.http.SourceSink;
import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MantisJob;
import io.mantisrx.runtime.MantisJobProvider;
import io.mantisrx.runtime.Metadata;
import io.mantisrx.runtime.computation.ScalarComputation;
import io.mantisrx.runtime.executor.LocalJobExecutorNetworked;
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.runtime.parameter.type.IntParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import io.mantisrx.runtime.sink.Sink;
import io.mantisrx.runtime.source.Source;
import io.mantisrx.sourcejobs.publish.core.RequestPostProcessor;
import io.mantisrx.sourcejobs.publish.core.RequestPreProcessor;
import io.mantisrx.sourcejobs.publish.core.Utils;
import io.mantisrx.sourcejobs.publish.stages.EchoStage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.functions.Func2;

public class PushRequestEventSourceJob
extends MantisJobProvider<String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushRequestEventSourceJob.class);
    private static final String MANTIS_CLIENT_ID = "MantisPushRequestEvents";

    public Job<String> getJobInstance() {
        String jobId = Utils.getEnvVariable("JOB_ID", "PushRequestEventSourceJobLocal-1");
        String mantisClientId = "MantisPushRequestEvents_" + jobId;
        QueryRegistry queryRegistry = new QueryRegistry.Builder().withClientIdPrefix(mantisClientId).build();
        String customPortName = "MANTIS_WORKER_CUSTOM_PORT";
        String consolePort = Utils.getEnvVariable(customPortName, "9090");
        int port = 9090;
        if (consolePort != null && !consolePort.isEmpty()) {
            port = Integer.parseInt(consolePort);
        }
        return MantisJob.source((Source)new PushHttpSource(queryRegistry, port)).stage((ScalarComputation)new EchoStage(), EchoStage.config()).sink((Sink)new SourceSink((Func2)new RequestPreProcessor(queryRegistry), (Func2)new RequestPostProcessor(queryRegistry), mantisClientId)).parameterDefinition(new IntParameter().name("bufferDurationMillis").description("millis to buffer events before processing").validator(Validators.range((Number)100, (Number)1000)).defaultValue((Object)250).build()).metadata(new Metadata.Builder().name("PushRequestEventSourceJob").description("Fetches request events from any source in a distributed manner. The output is served via HTTP server using SSE protocol.").build()).create();
    }

    public static void main(String[] args) {
        LocalJobExecutorNetworked.execute(new PushRequestEventSourceJob().getJobInstance(), (Parameter[])new Parameter[0]);
    }
}

