/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.sourcejobs.publish.stages;

import io.mantisrx.common.codec.Codecs;
import io.mantisrx.publish.netty.proto.MantisEventEnvelope;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.computation.ScalarComputation;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.IntParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import rx.Observable;

public class EchoStage
implements ScalarComputation<String, String> {
    private static final Logger LOGGER = Logger.getLogger(EchoStage.class);
    private String clusterName;
    private int bufferDuration = 100;
    private String sourceNamePrefix;
    private ObjectReader mantisEventEnvelopeReader;
    private final ObjectMapper mapper = new ObjectMapper();

    public void init(Context context) {
        this.clusterName = context.getWorkerInfo().getJobClusterName();
        this.bufferDuration = (Integer)context.getParameters().get("bufferDurationMillis");
        this.sourceNamePrefix = "{\"mantis.meta.sourceName\":\"" + this.clusterName + "\",";
        this.mantisEventEnvelopeReader = this.mapper.readerFor(MantisEventEnvelope.class);
    }

    private String insertSourceJobName(String event) {
        StringBuilder sb = new StringBuilder(this.sourceNamePrefix);
        int indexofbrace = event.indexOf(123);
        if (indexofbrace != -1) {
            event = sb.append(event.substring(indexofbrace + 1)).toString();
        }
        return event;
    }

    public Observable<String> call(Context context, Observable<String> events) {
        return events.buffer((long)this.bufferDuration, TimeUnit.MILLISECONDS).flatMapIterable(i -> i).filter(event -> !event.isEmpty()).flatMap(envelopeStr -> {
            try {
                MantisEventEnvelope envelope = (MantisEventEnvelope)this.mantisEventEnvelopeReader.readValue(envelopeStr);
                return Observable.from((Iterable)envelope.getEventList()).map(event -> event.getData());
            }
            catch (IOException e) {
                LOGGER.error((Object)e.getMessage());
                return Observable.just((Object)envelopeStr);
            }
        }).map(this::insertSourceJobName).onErrorResumeNext(t1 -> {
            LOGGER.error((Object)("Exception occurred in : " + this.clusterName + " error is " + t1.getMessage()));
            return Observable.empty();
        });
    }

    public static List<ParameterDefinition<?>> getParameters() {
        ArrayList params = new ArrayList();
        params.add(new IntParameter().name("bufferDurationMillis").description("buffer time in millis").validator(Validators.range((Number)100, (Number)10000)).defaultValue((Object)250).build());
        return params;
    }

    public static ScalarToScalar.Config<String, String> config() {
        return new ScalarToScalar.Config().codec(Codecs.string()).concurrentInput().withParameters(EchoStage.getParameters());
    }
}

