/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.sourcejob.synthetic.stage;

import io.mantisrx.common.JsonSerializer;
import io.mantisrx.common.codec.Codec;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.mql.jvm.core.Query;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.computation.ScalarComputation;
import io.mantisrx.sourcejob.synthetic.core.MQLQueryManager;
import io.mantisrx.sourcejob.synthetic.core.TaggedData;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

/**
 * Scalar stage that parses incoming JSON strings into maps and, for every
 * query currently registered with {@link MQLQueryManager}, emits a
 * {@link TaggedData} item when the query matches the event.
 *
 * <p>Each emitted item holds the query's projection of the event, augmented
 * with {@link #MANTIS_META_SOURCE_NAME} and {@link #MANTIS_META_SOURCE_TIMESTAMP},
 * and is tagged with the matching query's subscription id.
 */
public class TaggingStage
implements ScalarComputation<String, TaggedData> {
    private static final Logger log = LoggerFactory.getLogger(TaggingStage.class);
    public static final String MANTIS_META_SOURCE_NAME = "mantis.meta.sourceName";
    public static final String MANTIS_META_SOURCE_TIMESTAMP = "mantis.meta.timestamp";
    public static final String MANTIS_QUERY_COUNTER = "mantis_query_out";
    public static final String MQL_COUNTER = "mql_out";
    public static final String MQL_FAILURE = "mql_failure";
    public static final String MQL_CLASSLOADER_ERROR = "mql_classloader_error";
    public static final String SYNTHETIC_REQUEST_SOURCE = "SyntheticRequestSource";

    /** Name of the metric group that is registered in {@link #init} and looked up while tagging. */
    private static final String MQL_METRIC_GROUP = "mql";

    /** Flipped to {@code true} the first time a {@link Error} is logged so that it is logged only once. */
    private final AtomicBoolean errorLogged = new AtomicBoolean(false);

    /**
     * Builds the tagging pipeline on top of the raw event stream.
     *
     * <p>Each string is parsed with {@link JsonSerializer#toMap}; events that fail
     * to parse are logged and dropped. Every parsed event is then expanded into
     * zero or more {@link TaggedData} items, one per matching registered query.
     *
     * @param context stage context, used to look up the metrics registry
     * @param dataO   stream of raw events, each expected to be a JSON document
     * @return stream of tagged, projected events
     */
    public Observable<TaggedData> call(Context context, Observable<String> dataO) {
        JsonSerializer jsonSerializer = new JsonSerializer();
        return dataO
                .map(event -> TaggingStage.parseEvent(jsonSerializer, event))
                .filter(Objects::nonNull)
                .flatMapIterable(parsed -> this.tagData(parsed, context));
    }

    /**
     * Registers the {@code "mql"} metric group together with its four counters.
     *
     * @param context stage context whose metrics registry receives the group
     */
    public void init(Context context) {
        context.getMetricsRegistry().registerAndGet(
                new Metrics.Builder()
                        .name(MQL_METRIC_GROUP)
                        .addCounter(MQL_COUNTER)
                        .addCounter(MQL_FAILURE)
                        .addCounter(MQL_CLASSLOADER_ERROR)
                        .addCounter(MANTIS_QUERY_COUNTER)
                        .build());
    }

    /**
     * Parses a single raw event into a map.
     *
     * @param serializer serializer used for parsing
     * @param event      raw event text
     * @return the parsed map, or {@code null} when parsing throws (the caller filters nulls out)
     */
    private static Map<String, Object> parseEvent(JsonSerializer serializer, String event) {
        try {
            return serializer.toMap(event);
        }
        catch (Exception e) {
            // Pass the exception itself so the stack trace and cause are not lost.
            log.error("Failed to parse event: {}", e.getMessage(), e);
            return null;
        }
    }

    /**
     * Evaluates every registered query against one parsed event.
     *
     * <p>A failure while evaluating one query is counted and logged, and does not
     * prevent the remaining queries from being evaluated.
     *
     * @param d       parsed event
     * @param context stage context, used to look up the {@code "mql"} metric group
     * @return one {@link TaggedData} per matching query; empty when nothing matches
     */
    private List<TaggedData> tagData(Map<String, Object> d, Context context) {
        List<TaggedData> taggedDataList = new ArrayList<>();
        Metrics metrics = context.getMetricsRegistry().getMetric(new MetricGroupId(MQL_METRIC_GROUP));
        Collection<Query> queries = MQLQueryManager.getInstance().getRegisteredQueries();
        for (Query query : queries) {
            try {
                if (query.matches(d)) {
                    Map<String, Object> projected = query.project(d);
                    projected.put(MANTIS_META_SOURCE_NAME, SYNTHETIC_REQUEST_SOURCE);
                    projected.put(MANTIS_META_SOURCE_TIMESTAMP, System.currentTimeMillis());
                    TaggedData tg = new TaggedData(projected);
                    tg.addMatchedClient(query.getSubscriptionId());
                    taggedDataList.add(tg);
                }
            }
            catch (Exception ex) {
                // Checked via instanceof rather than a dedicated catch clause: a separate
                // clause would only compile if the query methods declare this exception.
                if (ex instanceof ClassNotFoundException) {
                    metrics.getCounter(MQL_CLASSLOADER_ERROR).increment();
                    log.error("Error loading MQL: {}", ex.getMessage(), ex);
                    continue;
                }
                metrics.getCounter(MQL_FAILURE).increment();
                log.error("MQL Error: {}", ex.getMessage(), ex);
                log.error("MQL Query: {}", query.getRawQuery());
                log.error("MQL Datum: {}", d);
            }
            catch (Error e) {
                metrics.getCounter(MQL_FAILURE).increment();
                // compareAndSet makes "log only the first Error" atomic when inputs are
                // processed concurrently; a separate get()/set() pair could log several times.
                if (this.errorLogged.compareAndSet(false, true)) {
                    log.error("caught Error when processing MQL {} on {}", query.getRawQuery(), d, e);
                }
            }
        }
        return taggedDataList;
    }

    /**
     * Creates the stage configuration: concurrent input with {@link #taggedDataCodec()} as codec.
     *
     * @return configuration for wiring this stage into a job
     */
    public static ScalarToScalar.Config<String, TaggedData> config() {
        return new ScalarToScalar.Config<String, TaggedData>()
                .concurrentInput()
                .codec(TaggingStage.taggedDataCodec());
    }

    /**
     * Returns a codec that does not carry any payload: {@code decode} ignores its
     * input and yields a {@link TaggedData} backed by an empty map, and
     * {@code encode} ignores its input and yields a zero-filled 128-byte array.
     *
     * <p>NOTE(review): this looks like a placeholder codec; presumably the real
     * serialization of {@link TaggedData} happens elsewhere. Confirm before relying on it.
     *
     * @return the placeholder codec
     */
    public static Codec<TaggedData> taggedDataCodec() {
        return new Codec<TaggedData>(){

            public TaggedData decode(byte[] bytes) {
                return new TaggedData(new HashMap<String, Object>());
            }

            public byte[] encode(TaggedData value) {
                return new byte[128];
            }
        };
    }
}

