/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.sourcejob.kafka;

import io.mantisrx.common.codec.Codec;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.connector.kafka.KafkaAckable;
import io.mantisrx.mql.jvm.core.Query;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.computation.ScalarComputation;
import io.mantisrx.sourcejob.kafka.core.TaggedData;
import io.mantisrx.sourcejob.kafka.sink.MQLQueryManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

public abstract class AbstractAckableTaggingStage
implements ScalarComputation<KafkaAckable, TaggedData> {
    public static final String MANTIS_META_IS_COMPLETE_DATA = "mantis.meta.isCompleteData";
    public static final String MANTIS_META_SOURCE_NAME = "mantis.meta.sourceName";
    public static final String MANTIS_META_SOURCE_TIMESTAMP = "mantis.meta.timestamp";
    public static final String MANTIS_QUERY_COUNTER = "mantis_query_out";
    public static final String MQL_COUNTER = "mql_out";
    public static final String MQL_FAILURE = "mql_failure";
    public static final String MQL_CLASSLOADER_ERROR = "mql_classloader_error";
    private static final Logger logger = LoggerFactory.getLogger(AbstractAckableTaggingStage.class);
    private static final String MANTIS_META = "mantis.meta";
    protected AtomicBoolean trackIsComplete = new AtomicBoolean(false);
    private AtomicBoolean errorLogged = new AtomicBoolean(false);

    public Observable<TaggedData> call(Context context, Observable<KafkaAckable> data) {
        context.getMetricsRegistry().registerAndGet(new Metrics.Builder().name("mql").addCounter(MQL_COUNTER).addCounter(MQL_FAILURE).addCounter(MQL_CLASSLOADER_ERROR).addCounter(MANTIS_QUERY_COUNTER).build());
        return data.map(ackable -> {
            Map<String, Object> rawData = this.processAndAck(context, (KafkaAckable)ackable);
            return this.preProcess(rawData);
        }).filter(d -> !d.isEmpty()).map(mapData -> this.applyPreMapping(context, (Map<String, Object>)mapData)).filter(d -> !d.isEmpty()).flatMapIterable(d -> this.tagData((Map<String, Object>)d, context));
    }

    protected abstract Map<String, Object> processAndAck(Context var1, KafkaAckable var2);

    protected abstract Map<String, Object> preProcess(Map<String, Object> var1);

    protected Map<String, Object> applyPreMapping(Context context, Map<String, Object> rawData) {
        return rawData;
    }

    private boolean isMetaEvent(Map<String, Object> d) {
        return d.containsKey(MANTIS_META_IS_COMPLETE_DATA) || d.containsKey(MANTIS_META);
    }

    protected List<TaggedData> tagData(Map<String, Object> d, Context context) {
        ArrayList<TaggedData> taggedDataList = new ArrayList<TaggedData>();
        boolean metaEvent = this.isMetaEvent(d);
        Metrics metrics = context.getMetricsRegistry().getMetric("mql");
        Collection<Query> queries = MQLQueryManager.getInstance().getRegisteredQueries();
        for (Query query : queries) {
            try {
                if (metaEvent) {
                    TaggedData tg = new TaggedData(d);
                    tg.addMatchedClient(query.getSubscriptionId());
                    taggedDataList.add(tg);
                    continue;
                }
                if (!query.matches(d).booleanValue()) continue;
                Map projected = query.project(d);
                projected.put(MANTIS_META_SOURCE_NAME, d.get(MANTIS_META_SOURCE_NAME));
                projected.put(MANTIS_META_SOURCE_TIMESTAMP, d.get(MANTIS_META_SOURCE_TIMESTAMP));
                TaggedData tg = new TaggedData(projected);
                tg.addMatchedClient(query.getSubscriptionId());
                taggedDataList.add(tg);
            }
            catch (Exception ex) {
                if (ex instanceof ClassNotFoundException) {
                    logger.error("Error loading MQL: " + ex.getMessage());
                    ex.printStackTrace();
                    metrics.getCounter(MQL_CLASSLOADER_ERROR).increment();
                    continue;
                }
                ex.printStackTrace();
                metrics.getCounter(MQL_FAILURE).increment();
                logger.error("MQL Error: " + ex.getMessage());
                logger.error("MQL Query: " + query.getRawQuery());
                logger.error("MQL Datum: " + d);
            }
            catch (Error e) {
                metrics.getCounter(MQL_FAILURE).increment();
                if (this.errorLogged.get()) continue;
                logger.error("caught Error when processing MQL {} on {}", new Object[]{query.getRawQuery(), d.toString(), e});
                this.errorLogged.set(true);
            }
        }
        return taggedDataList;
    }

    public static Codec<TaggedData> taggedDataCodec() {
        return new Codec<TaggedData>(){

            public TaggedData decode(byte[] bytes) {
                return new TaggedData(new HashMap<String, Object>());
            }

            public byte[] encode(TaggedData value) {
                return new byte[128];
            }
        };
    }
}

