/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.sourcejob.kafka;

import io.mantisrx.connector.kafka.KafkaAckable;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.IntParameter;
import io.mantisrx.runtime.parameter.type.StringParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import io.mantisrx.shaded.com.google.common.collect.Lists;
import io.mantisrx.sourcejob.kafka.AutoAckTaggingStage;
import io.mantisrx.sourcejob.kafka.core.TaggedData;
import io.mantisrx.sourcejob.kafka.core.utils.JsonUtility;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.functions.Func1;

public class CustomizedAutoAckTaggingStage
extends AutoAckTaggingStage {
    private static Logger logger = LoggerFactory.getLogger(CustomizedAutoAckTaggingStage.class);
    private String jobName;
    private String timestampField = "_ts_";
    private AtomicLong latestTimeStamp = new AtomicLong();
    private boolean isFlattenFields = false;
    private List<String> fieldsToFlatten = new ArrayList<String>();
    private Func1<Map<String, Object>, Map<String, Object>> preMapperFunc = t -> t;

    public void init(Context context) {
        String[] fields;
        String flattenFieldsStr;
        super.init(context);
        this.jobName = context.getWorkerInfo().getJobName();
        String timeStampFieldParam = System.getenv("JOB_PARAM_timeStampField");
        if (timeStampFieldParam != null && !timeStampFieldParam.isEmpty()) {
            this.timestampField = timeStampFieldParam;
        }
        if ((flattenFieldsStr = System.getenv("JOB_PARAM_fieldsToFlatten")) != null && !flattenFieldsStr.isEmpty() && !flattenFieldsStr.equals("NONE") && (fields = flattenFieldsStr.split(",")).length > 0) {
            this.isFlattenFields = true;
            for (String field : fields) {
                this.fieldsToFlatten.add(field.trim());
            }
            logger.info("Field flattening enabled for fields {}", this.fieldsToFlatten);
        }
    }

    private void flattenFields(Map<String, Object> rawData) {
        for (String field : this.fieldsToFlatten) {
            this.flattenField(rawData, field);
        }
    }

    private void flattenField(Map<String, Object> rawData, String fieldName) {
        String dataJson = (String)rawData.get(fieldName);
        try {
            Map<String, Object> geoDataMap = JsonUtility.jsonToMap(dataJson);
            for (Map.Entry<String, Object> e : geoDataMap.entrySet()) {
                String key = e.getKey();
                Object val = e.getValue();
                if (key == null || val == null) continue;
                rawData.put(fieldName + "." + key, val);
            }
        }
        catch (Exception e) {
            logger.warn("Error flattening field " + fieldName + " error -> " + e.getMessage());
        }
    }

    @Override
    protected Map<String, Object> applyPreMapping(Context context, Map<String, Object> rawData) {
        long latestTsYet;
        long ts;
        if (rawData == null) {
            throw new RuntimeException("rawData is null");
        }
        long now = System.currentTimeMillis();
        if (rawData.containsKey(this.timestampField) && (ts = ((Long)rawData.get(this.timestampField)).longValue()) > (latestTsYet = this.latestTimeStamp.get())) {
            this.latestTimeStamp.compareAndSet(latestTsYet, ts);
        }
        try {
            this.preMapperFunc.call(rawData);
        }
        catch (Exception e) {
            logger.warn("Exception applying premapping function " + e.getMessage());
        }
        HashMap<String, Object> modifiedData = new HashMap<String, Object>(rawData);
        modifiedData.put("mantis.meta.sourceName", this.jobName);
        modifiedData.put("mantis.meta.timestamp", now);
        if (this.isFlattenFields) {
            this.flattenFields(modifiedData);
        }
        return modifiedData;
    }

    public static ScalarToScalar.Config<KafkaAckable, TaggedData> config() {
        ScalarToScalar.Config config = new ScalarToScalar.Config().concurrentInput().codec(AutoAckTaggingStage.taggedDataCodec()).withParameters(CustomizedAutoAckTaggingStage.getParameters());
        String jobParamPrefix = "JOB_PARAM_";
        String stageConcurrencyParam = jobParamPrefix + "mantis.stageConcurrency";
        String concurrency = System.getenv(stageConcurrencyParam);
        if (concurrency != null && !concurrency.isEmpty()) {
            logger.info("Job param: " + stageConcurrencyParam + " value: " + concurrency);
            try {
                config = config.concurrentInput(Integer.parseInt(concurrency));
            }
            catch (NumberFormatException numberFormatException) {
                // empty catch block
            }
        }
        return config;
    }

    static List<ParameterDefinition<?>> getParameters() {
        ArrayList params = Lists.newArrayList();
        params.add(new StringParameter().name("fieldsToFlatten").description("comma separated list of fields to flatten").validator(Validators.notNullOrEmpty()).defaultValue((Object)"NONE").build());
        params.add(new StringParameter().name("timeStampField").description("the timestamp field in the event. used to calculate lag").validator(Validators.notNullOrEmpty()).defaultValue((Object)"_ts_").build());
        params.add(new IntParameter().name("mantis.stageConcurrency").description("Parameter to control number of computation workers to use for stage processing").defaultValue((Object)1).validator(Validators.range((Number)1, (Number)8)).build());
        return params;
    }
}

