/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.wrangler;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.api.metrics.Metrics;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.api.plugin.PluginProperties;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.InvalidEntry;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageContext;
import io.cdap.cdap.etl.api.StageSubmitterContext;
import io.cdap.cdap.etl.api.Transform;
import io.cdap.cdap.etl.api.TransformContext;
import io.cdap.cdap.etl.api.relational.Capability;
import io.cdap.cdap.etl.api.relational.Expression;
import io.cdap.cdap.etl.api.relational.ExpressionFactory;
import io.cdap.cdap.etl.api.relational.ExpressionFactoryType;
import io.cdap.cdap.etl.api.relational.InvalidRelation;
import io.cdap.cdap.etl.api.relational.LinearRelationalTransform;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
import io.cdap.cdap.etl.api.relational.StringExpressionFactoryType;
import io.cdap.cdap.features.Feature;
import io.cdap.directives.aggregates.DefaultTransientStore;
import io.cdap.wrangler.Precondition;
import io.cdap.wrangler.PreconditionException;
import io.cdap.wrangler.WranglerPipelineContext;
import io.cdap.wrangler.api.CompileException;
import io.cdap.wrangler.api.CompileStatus;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveLoadException;
import io.cdap.wrangler.api.DirectiveParseException;
import io.cdap.wrangler.api.EntityCountMetric;
import io.cdap.wrangler.api.ErrorRecord;
import io.cdap.wrangler.api.ExecutorContext;
import io.cdap.wrangler.api.RecipeParser;
import io.cdap.wrangler.api.RecipePipeline;
import io.cdap.wrangler.api.RecipeSymbol;
import io.cdap.wrangler.api.Row;
import io.cdap.wrangler.api.TokenGroup;
import io.cdap.wrangler.api.TransientStore;
import io.cdap.wrangler.api.TransientVariableScope;
import io.cdap.wrangler.executor.RecipePipelineExecutor;
import io.cdap.wrangler.lineage.LineageOperations;
import io.cdap.wrangler.parser.GrammarBasedParser;
import io.cdap.wrangler.parser.MigrateToV2;
import io.cdap.wrangler.parser.RecipeCompiler;
import io.cdap.wrangler.registry.CompositeDirectiveRegistry;
import io.cdap.wrangler.registry.DirectiveInfo;
import io.cdap.wrangler.registry.DirectiveRegistry;
import io.cdap.wrangler.registry.SystemDirectiveRegistry;
import io.cdap.wrangler.registry.UserDirectiveRegistry;
import io.cdap.wrangler.utils.StructuredToRowTransformer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Plugin(type="transform")
@Name(value="Wrangler")
@Description(value="Wrangler - A interactive tool for data cleansing and transformation.")
public class Wrangler
extends Transform<StructuredRecord, StructuredRecord>
implements LinearRelationalTransform {
    private static final Logger LOG = LoggerFactory.getLogger(Wrangler.class);
    private static final String APPLICATION_NAME = "dataprep";
    private static final String SERVICE_NAME = "service";
    private static final String CONFIG_METHOD = "config";
    private static final String ON_ERROR_DEFAULT = "fail-pipeline";
    private static final String ON_ERROR_FAIL_PIPELINE = "fail-pipeline";
    private static final String ON_ERROR_PROCEED = "send-to-error-port";
    private static final String ERROR_STRATEGY_DEFAULT = "wrangler.error.strategy.default";
    public static final String DIRECTIVE_METRIC_NAME = "wrangler.directive.count";
    public static final int DIRECTIVE_METRIC_COUNT = 1;
    public static final String DIRECTIVE_ENTITY_TYPE = "directive";
    private static final String PRECONDITION_LANGUAGE_JEXL = "jexl";
    private static final String PRECONDITION_LANGUAGE_SQL = "sql";
    private final Config config;
    private RecipePipeline pipeline;
    private Schema oSchema = null;
    private long errorCounter;
    private Precondition condition = null;
    private TransientStore store;
    private DirectiveRegistry registry;
    private String onErrorStrategy;

    /**
     * Creates the transform bound to the supplied plugin configuration.
     *
     * @param config plugin configuration that every lifecycle method of this stage reads from
     */
    public Wrangler(Config config) {
        this.config = config;
    }

    /**
     * Validates the stage configuration at deployment time and publishes the output schema.
     *
     * <p>Every problem found is reported through the stage's {@link FailureCollector} rather than thrown.
     * The checks, in order, are:
     * <ol>
     *   <li>the configured field exists in the input schema (skipped for macros, {@code *} and {@code #});</li>
     *   <li>the precondition matching the configured expression language is present, and no directives or
     *       UDDs are configured when the language is SQL;</li>
     *   <li>the recipe compiles, every user defined directive it loads is available as a plugin, and every
     *       directive it uses is known to the system registry or is one of the loaded UDDs;</li>
     *   <li>the output schema parses;</li>
     *   <li>a non-empty JEXL precondition can be constructed.</li>
     * </ol>
     *
     * @param configurer pipeline configurer supplying the input schema, plugin lookup and failure collector
     */
    @Override
    public void configurePipeline(PipelineConfigurer configurer) {
        super.configurePipeline(configurer);
        FailureCollector collector = configurer.getStageConfigurer().getFailureCollector();
        try {
            Schema iSchema = configurer.getStageConfigurer().getInputSchema();
            if (!(this.config.containsMacro(Config.NAME_FIELD) || this.config.getField().equals("*") || this.config.getField().equals("#"))) {
                this.validateInputSchema(iSchema, collector);
            }

            // When the recipe itself is a macro only the pragma can be compiled at this point.
            String directives = this.config.getDirectives();
            if (this.config.getUDDs() != null && !this.config.getUDDs().trim().isEmpty()) {
                directives = this.config.containsMacro(Config.NAME_DIRECTIVES)
                    ? String.format("#pragma load-directives %s;", this.config.getUDDs())
                    : String.format("#pragma load-directives %s;%s", this.config.getUDDs(), this.config.getDirectives());
            }

            if (!this.config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) {
                if (PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(this.config.getPreconditionLanguage())) {
                    if (!this.config.containsMacro(Config.NAME_PRECONDITION_SQL)) {
                        this.validatePrecondition(this.config.getPreconditionSQL(), true, collector);
                    }
                    this.validateSQLModeDirectives(collector);
                } else if (!this.config.containsMacro(Config.NAME_PRECONDITION)) {
                    this.validatePrecondition(this.config.getPreconditionJEXL(), false, collector);
                }
            }

            RecipeCompiler compiler = new RecipeCompiler();
            try {
                CompileStatus status = compiler.compile(new MigrateToV2(directives).migrate());
                RecipeSymbol symbols = status.getSymbols();
                if (symbols != null) {
                    Set<String> dynamicDirectives = symbols.getLoadableDirectives();
                    for (String directive : dynamicDirectives) {
                        Object directivePlugin = configurer.usePlugin(DIRECTIVE_ENTITY_TYPE, directive, directive, PluginProperties.builder().build());
                        if (directivePlugin != null) {
                            continue;
                        }
                        collector.addFailure(String.format("User Defined Directive '%s' is not deployed or is not available.", directive), "Ensure the directive is deployed.")
                            .withPluginNotFound(directive, directive, DIRECTIVE_ENTITY_TYPE)
                            .withConfigElement(Config.NAME_UDD, directive);
                    }
                    if (!this.config.containsMacro(Config.NAME_DIRECTIVES)) {
                        // Only system directives are consulted here; UDDs were checked through usePlugin above.
                        this.registry = SystemDirectiveRegistry.INSTANCE;
                        for (java.util.Iterator<TokenGroup> groups = symbols.iterator(); groups.hasNext();) {
                            TokenGroup group = groups.next();
                            if (group == null) {
                                continue;
                            }
                            String directive = (String) group.get(0).value();
                            DirectiveInfo directiveInfo = this.registry.get("", directive);
                            if (directiveInfo != null || dynamicDirectives.contains(directive)) {
                                continue;
                            }
                            collector.addFailure(String.format("Wrangler plugin has a directive '%s' that does not exist in system or user space.", directive), "Ensure the directive is loaded or the directive name is correct.")
                                .withConfigProperty(Config.NAME_DIRECTIVES);
                        }
                    }
                }
            }
            catch (CompileException e) {
                collector.addFailure("Compilation error occurred : " + e.getMessage(), null);
            }
            catch (DirectiveParseException e) {
                collector.addFailure(e.getMessage(), null);
            }

            try {
                if (!this.config.containsMacro(Config.NAME_SCHEMA)) {
                    this.oSchema = Schema.parseJson(this.config.schema);
                }
            }
            catch (IOException e) {
                collector.addFailure("Invalid output schema.", null).withConfigProperty(Config.NAME_SCHEMA).withStacktrace(e.getStackTrace());
            }

            if (!this.config.containsMacro(Config.NAME_PRECONDITION)
                && !this.config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)
                && PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(this.config.getPreconditionLanguage())
                && this.checkPreconditionNotEmpty(false)) {
                try {
                    // Constructed only to surface expression errors; the instance is discarded.
                    new Precondition(this.config.getPreconditionJEXL());
                }
                catch (PreconditionException e) {
                    collector.addFailure(e.getMessage(), null).withConfigProperty(Config.NAME_PRECONDITION);
                }
            }

            if (this.oSchema != null) {
                configurer.getStageConfigurer().setOutputSchema(this.oSchema);
            }
        }
        catch (Exception e) {
            // Pass the throwable so the stack trace reaches the log, not just the message.
            LOG.error(e.getMessage(), e);
            collector.addFailure("Error occurred : " + e.getMessage(), null).withStacktrace(e.getStackTrace());
        }
    }

    /**
     * Emits directive usage metrics and records field-level lineage before the run starts.
     *
     * <p>Nothing is parsed, emitted or recorded when either the input or the output schema is missing
     * or has no fields.
     *
     * @param context submitter context providing the schemas, metrics and lineage recorder
     * @throws Exception if the parent preparation fails or the recipe cannot be loaded or parsed
     */
    public void prepareRun(StageSubmitterContext context) throws Exception {
        super.prepareRun(context);

        Schema sourceSchema = context.getInputSchema();
        if (sourceSchema == null || sourceSchema.getFields() == null || sourceSchema.getFields().isEmpty()) {
            return;
        }
        Set<String> sourceFields = sourceSchema.getFields().stream()
            .map(Schema.Field::getName)
            .collect(Collectors.toSet());

        Schema targetSchema = context.getOutputSchema();
        if (targetSchema == null || targetSchema.getFields() == null || targetSchema.getFields().isEmpty()) {
            return;
        }
        Set<String> targetFields = targetSchema.getFields().stream()
            .map(Schema.Field::getName)
            .collect(Collectors.toSet());

        List<Directive> parsed = this.getRecipeParser(context).parse();
        this.emitDirectiveMetrics(parsed, context.getMetrics());
        context.record(new LineageOperations(sourceFields, targetFields, parsed).generate());
    }

    /**
     * Prepares the stage for record processing.
     *
     * <p>Creates the transient store, loads the recipe, parses the output schema, builds the JEXL
     * precondition when one is configured, creates the recipe executor and resolves the error strategy.
     * The runtime argument {@code wrangler.error.strategy.default} is used only when the plugin has no
     * explicit {@code on-error} value.
     *
     * @param context runtime context of this stage
     * @throws IllegalArgumentException if the output schema or the JEXL precondition is invalid
     * @throws Exception if the recipe cannot be loaded or the executor cannot be created
     */
    public void initialize(TransformContext context) throws Exception {
        super.initialize(context);
        this.store = new DefaultTransientStore();
        RecipeParser parser = this.getRecipeParser(context);
        WranglerPipelineContext executorContext =
            new WranglerPipelineContext(ExecutorContext.Environment.TRANSFORM, context, this.store);

        try {
            this.oSchema = Schema.parseJson(this.config.schema);
        }
        catch (IOException e) {
            String message = String.format(
                "Stage:%s - Format of output schema specified is invalid. Please check the format.",
                context.getStageName());
            throw new IllegalArgumentException(message, e);
        }

        boolean jexlSelected = !this.config.containsMacro("expressionLanguage")
            && PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(this.config.getPreconditionLanguage());
        if (jexlSelected && this.checkPreconditionNotEmpty(false)) {
            try {
                this.condition = new Precondition(this.config.getPreconditionJEXL());
            }
            catch (PreconditionException e) {
                throw new IllegalArgumentException(e.getMessage(), e);
            }
        }

        try {
            this.pipeline = new RecipePipelineExecutor(parser, executorContext);
        }
        catch (Exception e) {
            throw new Exception(String.format("Stage:%s - %s", this.getContext().getStageName(), e.getMessage()), e);
        }

        // The runtime default applies only when the plugin itself leaves on-error unset.
        String runtimeDefault = context.getArguments().get(ERROR_STRATEGY_DEFAULT);
        if (runtimeDefault != null && this.config.onError == null) {
            this.onErrorStrategy = runtimeDefault;
        } else {
            this.onErrorStrategy = this.config.getOnError();
        }
        this.errorCounter = 0L;
    }

    /**
     * Releases the recipe pipeline and the directive registry.
     *
     * <p>Either field may still be {@code null} when {@code initialize} failed part-way, so both are
     * guarded. The registry is closed in a {@code finally} block so that a failure while closing the
     * pipeline does not leave it open. A failure to close the registry is logged, not propagated.
     */
    @Override
    public void destroy() {
        super.destroy();
        try {
            if (this.pipeline != null) {
                this.pipeline.close();
            }
        }
        finally {
            if (this.registry != null) {
                try {
                    this.registry.close();
                }
                catch (IOException e) {
                    LOG.warn("Unable to close the directive registry. You might see increasing number of open file handle.", (Throwable)e);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Applies the recipe to one input record and emits the resulting records.
     *
     * <p>The row handed to the recipe is built from the configured field: {@code *} converts the whole
     * record, {@code #} adds the record itself under its record name, and any other value selects that
     * single field. When a JEXL precondition is configured and evaluates to {@code true}, the record is
     * counted under {@code precondition.filtered} and dropped.
     *
     * <p>Errors reported by the pipeline are emitted as {@link InvalidEntry} instances. Any exception is
     * counted under {@code failure} and then handled according to the resolved error strategy:
     * {@code send-to-error-port} emits the input as an error, {@code fail-pipeline} emits an alert and
     * rethrows, and any other strategy drops the record silently.
     *
     * <p>The {@code process.time} gauge is reported only once recipe execution has actually started, so
     * filtered records and failures before execution no longer report a meaningless duration.
     *
     * @param input record to wrangle
     * @param emitter sink for output records, error records and alerts
     * @throws Exception if processing fails and the error strategy is {@code fail-pipeline}
     */
    @Override
    public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
        List<StructuredRecord> records;
        long start = 0L;
        boolean executionStarted = false;
        try {
            Row row = new Row();
            String fieldName = this.config.getField();
            if ("*".equalsIgnoreCase(fieldName)) {
                row = StructuredToRowTransformer.transform(input);
            } else if ("#".equalsIgnoreCase(fieldName)) {
                row.add(input.getSchema().getRecordName(), input);
            } else {
                row.add(fieldName, StructuredToRowTransformer.getValue(input, fieldName));
            }

            // A precondition that evaluates to true means "filter this record out".
            if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(this.config.getPreconditionLanguage())
                && this.checkPreconditionNotEmpty(false)
                && this.condition.apply(row)) {
                this.getContext().getMetrics().count("precondition.filtered", 1);
                return;
            }

            this.store.reset(TransientVariableScope.GLOBAL);
            this.store.reset(TransientVariableScope.LOCAL);

            start = System.nanoTime();
            executionStarted = true;
            @SuppressWarnings("unchecked") // the pipeline field is declared with a raw type
            List<StructuredRecord> executed = this.pipeline.execute(Collections.singletonList(row), this.oSchema);
            records = executed;

            @SuppressWarnings("unchecked") // the pipeline field is declared with a raw type
            List<ErrorRecord> errors = this.pipeline.errors();
            if (!errors.isEmpty()) {
                StringJoiner errorMessages = new StringJoiner(",");
                this.getContext().getMetrics().count("errors", errors.size());
                for (ErrorRecord error : errors) {
                    emitter.emitError(new InvalidEntry<>(error.getCode(), error.getMessage(), input));
                    errorMessages.add(error.getMessage());
                }
                if (Feature.WRANGLER_FAIL_PIPELINE_FOR_ERROR.isEnabled((FeatureFlagsProvider)this.getContext())
                    && this.onErrorStrategy.equalsIgnoreCase(ON_ERROR_FAIL_PIPELINE)) {
                    throw new Exception(String.format("Errors in Wrangler Transformation - %s", errorMessages));
                }
            }
        }
        catch (Exception e) {
            this.getContext().getMetrics().count("failure", 1);
            if (this.onErrorStrategy.equalsIgnoreCase(ON_ERROR_PROCEED)) {
                emitter.emitError(new InvalidEntry<>(0, e.getMessage(), input));
                return;
            }
            if (this.onErrorStrategy.equalsIgnoreCase(ON_ERROR_FAIL_PIPELINE)) {
                emitter.emitAlert(ImmutableMap.of(
                    "stage", this.getContext().getStageName(),
                    "code", String.valueOf(1),
                    "message", String.format("Stopping pipeline stage %s on error %s", this.getContext().getStageName(), e.getMessage()),
                    "value", String.valueOf(this.errorCounter)));
                throw new Exception(String.format("Stage:%s - Failing pipeline due to error : %s", this.getContext().getStageName(), e.getMessage()), e);
            }
            // Any other strategy: drop the record without emitting an error.
            return;
        }
        finally {
            if (executionStarted) {
                this.getContext().getMetrics().gauge("process.time", System.nanoTime() - start);
            }
        }

        for (StructuredRecord record : records) {
            StructuredRecord.Builder builder = StructuredRecord.builder(this.oSchema);
            for (Schema.Field outputField : this.oSchema.getFields()) {
                String name = outputField.getName();
                Object wObject = record.get(name);
                if (wObject == null) {
                    builder.set(name, null);
                } else if (wObject instanceof String) {
                    // Strings are converted to the type declared for the field in the output schema.
                    builder.convertAndSet(name, (String)wObject);
                } else {
                    builder.set(name, wObject);
                }
            }
            emitter.emit(builder.build());
        }
    }

    /**
     * Reports a failure on the {@code field} property when the configured field is absent from the
     * input schema. A {@code null} schema is not validated.
     *
     * @param inputSchema input schema of the stage, possibly {@code null}
     * @param collector collector that receives the failure
     */
    private void validateInputSchema(@Nullable Schema inputSchema, FailureCollector collector) {
        if (inputSchema == null) {
            return;
        }
        String fieldName = this.config.getField();
        if (inputSchema.getField(fieldName) != null) {
            return;
        }
        collector.addFailure(String.format("Field '%s' must be present in input schema.", fieldName), null)
            .withConfigProperty("field");
    }

    /**
     * Reports a failure when the precondition expression is {@code null} or empty.
     *
     * @param precondition expression text to check
     * @param isConditionSQL {@code true} to report against the SQL precondition property, {@code false}
     *                       to report against the JEXL one
     * @param collector collector that receives the failure
     */
    private void validatePrecondition(String precondition, Boolean isConditionSQL, FailureCollector collector) {
        boolean sql = isConditionSQL.booleanValue();
        if (!Strings.isNullOrEmpty(precondition)) {
            return;
        }
        String property = sql ? "preconditionSQL" : "precondition";
        String label = sql ? "Precondition (SQL)" : "Precondition (JEXL)";
        collector.addFailure(String.format("%s must be present.", label), null).withConfigProperty(property);
    }

    /**
     * Reports a failure for each of directives and UDDs that is configured, since neither is supported
     * together with a SQL precondition.
     *
     * @param collector collector that receives the failures
     */
    private void validateSQLModeDirectives(FailureCollector collector) {
        boolean hasDirectives = !Strings.isNullOrEmpty(this.config.getDirectives());
        if (hasDirectives) {
            collector.addFailure("Directives are not supported for precondition of type SQL", null)
                .withConfigProperty("directives");
        }

        boolean hasUdds = !Strings.isNullOrEmpty(this.config.getUDDs());
        if (hasUdds) {
            collector.addFailure("UDDs are not supported for precondition of type SQL", null)
                .withConfigProperty("udd");
        }
    }

    /**
     * Tells whether the precondition for the requested language contains non-whitespace text.
     *
     * @param isConditionSQL {@code true} to inspect the SQL precondition, {@code false} for the JEXL one
     * @return {@code true} when the selected precondition is neither {@code null}, empty nor blank
     */
    private boolean checkPreconditionNotEmpty(Boolean isConditionSQL) {
        String expression = isConditionSQL
            ? this.config.getPreconditionSQL()
            : this.config.getPreconditionJEXL();
        return !Strings.isNullOrEmpty(expression) && !expression.trim().isEmpty();
    }

    /**
     * Builds a parser for the configured recipe backed by a freshly loaded directive registry.
     *
     * <p>The registry combining system and user directives is stored in {@code registry} and reloaded
     * for the stage's namespace. When UDDs are configured, a {@code #pragma load-directives} statement
     * is prepended to the recipe.
     *
     * @param context stage context supplying the namespace and access to user directives
     * @return parser over the migrated recipe text
     * @throws DirectiveLoadException if the registry cannot be reloaded
     * @throws DirectiveParseException if the recipe cannot be migrated
     */
    private RecipeParser getRecipeParser(StageContext context) throws DirectiveLoadException, DirectiveParseException {
        DirectiveRegistry[] sources = new DirectiveRegistry[]{
            SystemDirectiveRegistry.INSTANCE,
            new UserDirectiveRegistry(context)
        };
        this.registry = new CompositeDirectiveRegistry(sources);
        this.registry.reload(context.getNamespace());

        String udds = this.config.getUDDs();
        boolean hasUdds = udds != null && !udds.trim().isEmpty();
        String recipeText = hasUdds
            ? String.format("#pragma load-directives %s;%s", udds, this.config.getDirectives())
            : this.config.getDirectives();

        String migrated = new MigrateToV2(recipeText).migrate();
        return new GrammarBasedParser(context.getNamespace(), migrated, this.registry);
    }

    /**
     * Applies the SQL precondition to the incoming relation as a filter.
     *
     * @param relationalTranformContext context giving access to the relational engine and feature flags
     * @param relation relation to filter
     * @return the filtered relation, or an {@link InvalidRelation} when the plugin is not configured with
     *         a non-empty SQL precondition or no SQL expression factory is available
     * @throws RuntimeException if the SQL precondition feature is not enabled
     */
    public Relation transform(RelationalTranformContext relationalTranformContext, Relation relation) {
        boolean sqlConfigured = PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(this.config.getPreconditionLanguage())
            && this.checkPreconditionNotEmpty(true);
        if (!sqlConfigured) {
            return new InvalidRelation("Plugin is not configured for relational transformation");
        }

        if (!Feature.WRANGLER_PRECONDITION_SQL.isEnabled((FeatureFlagsProvider)relationalTranformContext)) {
            throw new RuntimeException("SQL Precondition feature is not available");
        }

        Optional<ExpressionFactory<String>> factory = this.getExpressionFactory(relationalTranformContext);
        if (!factory.isPresent()) {
            return new InvalidRelation("Cannot find an Expression Factory");
        }

        Expression filter = factory.get().compile(this.config.getPreconditionSQL());
        return relation.filter(filter);
    }

    /**
     * Asks the relational engine for an expression factory of the SQL string type, requesting no
     * additional capabilities.
     *
     * @param ctx relational transform context exposing the engine
     * @return the factory returned by the engine, possibly empty
     */
    private Optional<ExpressionFactory<String>> getExpressionFactory(RelationalTranformContext ctx) {
        return ctx.getEngine().getExpressionFactory(StringExpressionFactoryType.SQL);
    }

    /**
     * Emits count metrics for every directive that the registry resolves under the {@code system}
     * namespace; other directives are skipped.
     *
     * <p>Each qualifying directive contributes one usage metric plus any count metrics the directive
     * itself reports. Every metric is emitted on a child metrics context tagged with its entity type
     * and entity name.
     *
     * @param directives parsed directives of the recipe
     * @param metrics metrics context to derive tagged children from
     * @throws DirectiveLoadException if the registry lookup fails
     */
    private void emitDirectiveMetrics(List<Directive> directives, Metrics metrics) throws DirectiveLoadException {
        for (Directive directive : directives) {
            boolean isSystemDirective =
                this.registry.get("system", directive.define().getDirectiveName()) != null;
            if (isSystemDirective) {
                List<EntityCountMetric> toEmit = new ArrayList<>();
                toEmit.add(this.getDirectiveUsageMetric(directive.define().getDirectiveName()));
                if (directive.getCountMetrics() != null) {
                    toEmit.addAll(directive.getCountMetrics());
                }
                toEmit.forEach(metric ->
                    metrics.child(this.getEntityMetricTags(metric)).countLong(metric.getName(), metric.getCount()));
            }
        }
    }

    /**
     * Builds the usage metric for a directive: metric name {@code wrangler.directive.count}, entity type
     * {@code directive}, the directive's name as entity name, and a count of one.
     *
     * @param directiveName name of the directive being counted
     * @return the usage metric definition
     */
    private EntityCountMetric getDirectiveUsageMetric(String directiveName) {
        return new EntityCountMetric(
            DIRECTIVE_METRIC_NAME, DIRECTIVE_ENTITY_TYPE, directiveName, DIRECTIVE_METRIC_COUNT);
    }

    /**
     * Builds the tag map for a count metric: {@code aet} carries the entity type and {@code tpe} the
     * entity type name.
     *
     * @param metricDef metric definition to take the tag values from
     * @return a new mutable map holding the two tags
     */
    private Map<String, String> getEntityMetricTags(EntityCountMetric metricDef) {
        Map<String, String> tagsByKey = new HashMap<>();
        tagsByKey.put("aet", metricDef.getAppEntityType());
        tagsByKey.put("tpe", metricDef.getAppEntityTypeName());
        return tagsByKey;
    }

    public static class Config
    extends PluginConfig {
        static final String NAME_PRECONDITION = "precondition";
        static final String NAME_PRECONDITION_SQL = "preconditionSQL";
        static final String NAME_PRECONDITION_LANGUAGE = "expressionLanguage";
        static final String NAME_FIELD = "field";
        static final String NAME_DIRECTIVES = "directives";
        static final String NAME_UDD = "udd";
        static final String NAME_SCHEMA = "schema";
        static final String NAME_ON_ERROR = "on-error";
        @Name(value="expressionLanguage")
        @Description(value="Toggle to configure precondition language between JEXL and SQL")
        @Macro
        @Nullable
        private String preconditionLanguage;
        @Name(value="precondition")
        @Description(value="JEXL Precondition expression specifying filtering before applying directives (true to filter)")
        @Macro
        @Nullable
        private String precondition;
        @Name(value="preconditionSQL")
        @Description(value="SQL Precondition expression specifying filtering before applying directives (false to filter)")
        @Macro
        @Nullable
        private String preconditionSQL;
        @Name(value="directives")
        @Description(value="Recipe for wrangling the input records")
        @Macro
        @Nullable
        private String directives;
        @Name(value="udd")
        @Description(value="List of User Defined Directives (UDD) that have to be loaded.")
        @Nullable
        private String udds;
        @Name(value="field")
        @Description(value="Name of the input field to be wrangled or '*' to wrangle all the fields.")
        @Macro
        private final String field;
        @Name(value="schema")
        @Description(value="Specifies the schema that has to be output.")
        @Macro
        private final String schema;
        @Name(value="on-error")
        @Description(value="How to handle error in record processing")
        @Macro
        @Nullable
        private final String onError;

        public Config(String preconditionLanguage, String precondition, String directives, String udds, String field, String schema, String onError) {
            this.preconditionLanguage = preconditionLanguage;
            this.precondition = precondition;
            this.directives = directives;
            this.udds = udds;
            this.preconditionSQL = precondition;
            this.field = field;
            this.schema = schema;
            this.onError = onError;
        }

        public String getOnError() {
            return this.onError == null ? "fail-pipeline" : this.onError;
        }

        public String getPreconditionLanguage() {
            if (Strings.isNullOrEmpty((String)this.preconditionLanguage)) {
                return Wrangler.PRECONDITION_LANGUAGE_JEXL;
            }
            return this.preconditionLanguage;
        }

        public String getPreconditionJEXL() {
            return this.precondition;
        }

        public String getPreconditionSQL() {
            return this.preconditionSQL;
        }

        public String getField() {
            return this.field;
        }

        public String getDirectives() {
            return this.directives;
        }

        public String getUDDs() {
            return this.udds;
        }
    }
}

