/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.directives.row;

import com.google.common.collect.ImmutableList;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
import io.cdap.wrangler.api.DirectiveParseException;
import io.cdap.wrangler.api.EntityCountMetric;
import io.cdap.wrangler.api.ExecutorContext;
import io.cdap.wrangler.api.ReportErrorAndProceed;
import io.cdap.wrangler.api.Row;
import io.cdap.wrangler.api.TransientVariableScope;
import io.cdap.wrangler.api.annotations.Categories;
import io.cdap.wrangler.api.lineage.Lineage;
import io.cdap.wrangler.api.lineage.Mutation;
import io.cdap.wrangler.api.parser.Expression;
import io.cdap.wrangler.api.parser.Identifier;
import io.cdap.wrangler.api.parser.Text;
import io.cdap.wrangler.api.parser.TokenType;
import io.cdap.wrangler.api.parser.UsageDefinition;
import io.cdap.wrangler.expression.EL;
import io.cdap.wrangler.expression.ELContext;
import io.cdap.wrangler.expression.ELException;
import io.cdap.wrangler.expression.ELResult;
import io.cdap.wrangler.metrics.JexlCategoryMetricUtils;
import java.util.ArrayList;
import java.util.List;

/**
 * Directive {@code send-to-error-and-continue}.
 *
 * <p>Evaluates a compiled expression against every incoming row. The first row for which the
 * expression evaluates to {@code true} is reported by throwing {@link ReportErrorAndProceed};
 * rows that do not match are returned unchanged.
 *
 * <p>Arguments (see {@link #define()}):
 * <ul>
 *   <li>{@code condition} - required expression evaluated per row;</li>
 *   <li>{@code metric} - optional identifier; when present (and a context is available) it is
 *       counted once for each matching row;</li>
 *   <li>{@code message} - optional text used as the error message; when absent the condition
 *       text itself is used.</li>
 * </ul>
 */
@Plugin(type="directive")
@Name(value="send-to-error-and-continue")
@Categories(categories={"row", "data-quality"})
@Description(value="Send records that match condition to the error collector and continues processing.")
public class SendToErrorAndContinue implements Directive, Lineage {
    public static final String NAME = "send-to-error-and-continue";

    /** Compiled form of {@link #condition}; set by {@link #initialize(Arguments)}. */
    private EL el;
    /** Raw text of the condition expression as supplied to the directive. */
    private String condition;
    /** Optional metric name counted on every match; {@code null} when not configured. */
    private String metric = null;
    /** Optional user supplied error message; {@code null} when not configured. */
    private String message = null;

    /**
     * Declares the directive usage: a mandatory {@code condition} expression followed by the
     * optional {@code metric} identifier and optional {@code message} text.
     *
     * @return the usage definition of this directive
     */
    public UsageDefinition define() {
        UsageDefinition.Builder builder = UsageDefinition.builder(NAME);
        builder.define("condition", TokenType.EXPRESSION);
        builder.define("metric", TokenType.IDENTIFIER, true);
        builder.define("message", TokenType.TEXT, true);
        return builder.build();
    }

    /**
     * Reads the directive arguments and compiles the condition.
     *
     * @param args parsed directive arguments
     * @throws DirectiveParseException if the condition cannot be compiled
     */
    public void initialize(Arguments args) throws DirectiveParseException {
        this.condition = ((Expression) args.value("condition")).value();
        try {
            this.el = EL.compile(this.condition);
        } catch (ELException e) {
            throw new DirectiveParseException(NAME, String.format("Invalid condition '%s'.", this.condition), e);
        }
        if (args.contains("metric")) {
            this.metric = ((Identifier) args.value("metric")).value();
        }
        if (args.contains("message")) {
            this.message = ((Text) args.value("message")).value();
        }
    }

    /** No resources are held by this directive, so there is nothing to release. */
    public void destroy() {
    }

    /**
     * Evaluates the condition for each row, in order.
     *
     * <p>When a context is available, the transient counter {@code dq_total} is incremented once
     * per invocation and {@code dq_failure} once per matching row. The configured metric, if any,
     * is also counted for a matching row before the error is raised.
     *
     * @param rows rows to check
     * @param context execution context; may be {@code null}, in which case no counters or
     *                metrics are updated
     * @return the rows that did not match the condition
     * @throws DirectiveExecutionException if evaluating the condition fails
     * @throws ReportErrorAndProceed as soon as a row matches the condition
     */
    public List<Row> execute(List<Row> rows, ExecutorContext context) throws DirectiveExecutionException, ReportErrorAndProceed {
        if (context != null) {
            context.getTransientStore().increment(TransientVariableScope.LOCAL, "dq_total", 1L);
        }
        List<Row> results = new ArrayList<>(rows.size());
        for (Row row : rows) {
            if (matches(row, context)) {
                recordFailure(context);
                throw new ReportErrorAndProceed(errorMessage(), 1);
            }
            results.add(row);
        }
        return results;
    }

    /** Evaluates the compiled condition against a single row. */
    private boolean matches(Row row, ExecutorContext context) throws DirectiveExecutionException {
        ELContext ctx = new ELContext(context, this.el, row);
        try {
            ELResult result = this.el.execute(ctx);
            // NOTE(review): getBoolean() returns a boxed Boolean that is unboxed here; if it can
            // be null for non-boolean expressions this throws NullPointerException - confirm.
            return result.getBoolean();
        } catch (ELException e) {
            throw new DirectiveExecutionException(NAME, e.getMessage(), e);
        }
    }

    /** Updates the optional user metric and the {@code dq_failure} counter for a matching row. */
    private void recordFailure(ExecutorContext context) {
        if (context == null) {
            return;
        }
        if (this.metric != null) {
            context.getMetrics().count(this.metric, 1);
        }
        context.getTransientStore().increment(TransientVariableScope.LOCAL, "dq_failure", 1L);
    }

    /**
     * Returns the message reported for a matching row: the configured message, or the condition
     * text when no message was configured. Resolved without modifying any field.
     */
    private String errorMessage() {
        return this.message != null ? this.message : this.condition;
    }

    /**
     * Describes the lineage of this directive: every variable referenced by the condition is
     * related to itself.
     *
     * @return the lineage mutation
     */
    public Mutation lineage() {
        Mutation.Builder builder = Mutation.builder().readable("Redirect records to error path based on expression'%s'", new Object[]{this.condition});
        this.el.variables().forEach(column -> builder.relation(column, column));
        return builder.build();
    }

    /**
     * Returns the category metric derived from the parsed condition script.
     *
     * @return a single-element list holding the metric, or {@code null} when no metric is
     *         produced for the script
     */
    public List<EntityCountMetric> getCountMetrics() {
        EntityCountMetric jexlCategoryMetric = JexlCategoryMetricUtils.getJexlCategoryMetric(this.el.getScriptParsedText());
        // null (not an empty list) is kept for callers that rely on the existing contract.
        return jexlCategoryMetric == null ? null : ImmutableList.of(jexlCategoryMetric);
    }
}

