/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.wrangler;

import com.google.common.collect.ImmutableMap;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.api.plugin.PluginProperties;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.InvalidEntry;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageContext;
import io.cdap.cdap.etl.api.StageSubmitterContext;
import io.cdap.cdap.etl.api.Transform;
import io.cdap.cdap.etl.api.TransformContext;
import io.cdap.directives.aggregates.DefaultTransientStore;
import io.cdap.wrangler.Precondition;
import io.cdap.wrangler.PreconditionException;
import io.cdap.wrangler.WranglerPipelineContext;
import io.cdap.wrangler.api.CompileException;
import io.cdap.wrangler.api.CompileStatus;
import io.cdap.wrangler.api.DirectiveLoadException;
import io.cdap.wrangler.api.DirectiveParseException;
import io.cdap.wrangler.api.ErrorRecord;
import io.cdap.wrangler.api.ExecutorContext;
import io.cdap.wrangler.api.RecipeParser;
import io.cdap.wrangler.api.RecipePipeline;
import io.cdap.wrangler.api.RecipeSymbol;
import io.cdap.wrangler.api.Row;
import io.cdap.wrangler.api.TokenGroup;
import io.cdap.wrangler.api.TransientStore;
import io.cdap.wrangler.api.TransientVariableScope;
import io.cdap.wrangler.executor.RecipePipelineExecutor;
import io.cdap.wrangler.lineage.LineageOperations;
import io.cdap.wrangler.parser.GrammarBasedParser;
import io.cdap.wrangler.parser.MigrateToV2;
import io.cdap.wrangler.parser.RecipeCompiler;
import io.cdap.wrangler.registry.CompositeDirectiveRegistry;
import io.cdap.wrangler.registry.DirectiveInfo;
import io.cdap.wrangler.registry.DirectiveRegistry;
import io.cdap.wrangler.registry.SystemDirectiveRegistry;
import io.cdap.wrangler.registry.UserDirectiveRegistry;
import io.cdap.wrangler.utils.StructuredToRowTransformer;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Plugin(type="transform")
@Name(value="Wrangler")
@Description(value="Wrangler - A interactive tool for data cleansing and transformation.")
public class Wrangler
extends Transform<StructuredRecord, StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(Wrangler.class);
    private static final String APPLICATION_NAME = "dataprep";
    private static final String SERVICE_NAME = "service";
    private static final String CONFIG_METHOD = "config";
    private static final String ON_ERROR_DEFAULT = "fail-pipeline";
    private static final String ERROR_STRATEGY_DEFAULT = "wrangler.error.strategy.default";
    private final Config config;
    private RecipePipeline pipeline;
    private Schema oSchema = null;
    private long errorCounter;
    private Precondition condition = null;
    private TransientStore store;
    private DirectiveRegistry registry;
    private String onErrorStrategy;

    public Wrangler(Config config) {
        this.config = config;
    }

    public void configurePipeline(PipelineConfigurer configurer) {
        super.configurePipeline(configurer);
        FailureCollector collector = configurer.getStageConfigurer().getFailureCollector();
        try {
            Schema iSchema = configurer.getStageConfigurer().getInputSchema();
            if (!(this.config.containsMacro("field") || this.config.field.equals("*") || this.config.field.equals("#"))) {
                this.validateInputSchema(iSchema, collector);
            }
            String directives = this.config.directives;
            if (this.config.udds != null && !this.config.udds.trim().isEmpty()) {
                directives = this.config.containsMacro("directives") ? String.format("#pragma load-directives %s;", this.config.udds) : String.format("#pragma load-directives %s;%s", this.config.udds, this.config.directives);
            }
            RecipeCompiler compiler = new RecipeCompiler();
            try {
                CompileStatus status = compiler.compile(new MigrateToV2(directives).migrate());
                RecipeSymbol symbols = status.getSymbols();
                if (symbols != null) {
                    Set dynamicDirectives = symbols.getLoadableDirectives();
                    for (String directive : dynamicDirectives) {
                        Object directivePlugin = configurer.usePlugin("directive", directive, directive, PluginProperties.builder().build());
                        if (directivePlugin != null) continue;
                        collector.addFailure(String.format("User Defined Directive '%s' is not deployed or is not available.", directive), "Ensure the directive is deployed.").withPluginNotFound(directive, directive, "directive").withConfigElement("udd", directive);
                    }
                    if (!this.config.containsMacro("directives")) {
                        this.registry = SystemDirectiveRegistry.INSTANCE;
                        for (TokenGroup group : symbols) {
                            String directive;
                            DirectiveInfo directiveInfo;
                            if (group == null || (directiveInfo = this.registry.get("", directive = (String)group.get(0).value())) != null || dynamicDirectives.contains(directive)) continue;
                            collector.addFailure(String.format("Wrangler plugin has a directive '%s' that does not exist in system or user space.", directive), "Ensure the directive is loaded or the directive name is correct.").withConfigProperty("directives");
                        }
                    }
                }
            }
            catch (CompileException e) {
                collector.addFailure("Compilation error occurred : " + e.getMessage(), null);
            }
            catch (DirectiveParseException e) {
                collector.addFailure(e.getMessage(), null);
            }
            try {
                if (!this.config.containsMacro("schema")) {
                    this.oSchema = Schema.parseJson((String)this.config.schema);
                }
            }
            catch (IOException e) {
                collector.addFailure("Invalid output schema.", null).withConfigProperty("schema").withStacktrace(e.getStackTrace());
            }
            if (!this.config.containsMacro("precondition") && this.config.precondition != null && !this.config.precondition.trim().isEmpty()) {
                try {
                    new Precondition(this.config.precondition);
                }
                catch (PreconditionException e) {
                    collector.addFailure(e.getMessage(), null).withConfigProperty("precondition");
                }
            }
            if (this.oSchema != null) {
                configurer.getStageConfigurer().setOutputSchema(this.oSchema);
            }
        }
        catch (Exception e) {
            LOG.error(e.getMessage());
            collector.addFailure("Error occurred : " + e.getMessage(), null).withStacktrace(e.getStackTrace());
        }
    }

    public void prepareRun(StageSubmitterContext context) throws Exception {
        super.prepareRun(context);
        Schema inputSchema = context.getInputSchema();
        if (inputSchema == null || inputSchema.getFields() == null || inputSchema.getFields().isEmpty()) {
            return;
        }
        Set input = inputSchema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toSet());
        Schema outputSchema = context.getOutputSchema();
        if (outputSchema == null || outputSchema.getFields() == null || outputSchema.getFields().isEmpty()) {
            return;
        }
        Set output = outputSchema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toSet());
        RecipeParser recipe = this.getRecipeParser((StageContext)context);
        List directives = recipe.parse();
        LineageOperations lineageOperations = new LineageOperations(input, output, directives);
        context.record(lineageOperations.generate());
    }

    public void initialize(TransformContext context) throws Exception {
        super.initialize(context);
        this.store = new DefaultTransientStore();
        RecipeParser recipe = this.getRecipeParser((StageContext)context);
        WranglerPipelineContext ctx = new WranglerPipelineContext(ExecutorContext.Environment.TRANSFORM, context, this.store);
        try {
            this.oSchema = Schema.parseJson((String)this.config.schema);
        }
        catch (IOException e) {
            throw new IllegalArgumentException(String.format("Stage:%s - Format of output schema specified is invalid. Please check the format.", context.getStageName()), e);
        }
        if (this.config.precondition != null && !this.config.precondition.trim().isEmpty()) {
            try {
                this.condition = new Precondition(this.config.precondition);
            }
            catch (PreconditionException e) {
                throw new IllegalArgumentException(e.getMessage(), e);
            }
        }
        try {
            this.pipeline = new RecipePipelineExecutor(recipe, (ExecutorContext)ctx);
        }
        catch (Exception e) {
            throw new Exception(String.format("Stage:%s - %s", this.getContext().getStageName(), e.getMessage()), e);
        }
        String defaultStrategy = context.getArguments().get(ERROR_STRATEGY_DEFAULT);
        this.onErrorStrategy = defaultStrategy != null && this.config.onError == null ? defaultStrategy : this.config.getOnError();
        this.errorCounter = 0L;
    }

    public void destroy() {
        super.destroy();
        this.pipeline.close();
        try {
            this.registry.close();
        }
        catch (IOException e) {
            LOG.warn("Unable to close the directive registry. You might see increasing number of open file handle.", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
        List records;
        long start = 0L;
        try {
            boolean skip;
            Row row = new Row();
            if ("*".equalsIgnoreCase(this.config.field)) {
                row = StructuredToRowTransformer.transform((StructuredRecord)input);
            } else if ("#".equalsIgnoreCase(this.config.field)) {
                row.add(input.getSchema().getRecordName(), (Object)input);
            } else {
                row.add(this.config.field, StructuredToRowTransformer.getValue((StructuredRecord)input, (String)this.config.field));
            }
            if (this.condition != null && (skip = this.condition.apply(row))) {
                this.getContext().getMetrics().count("precondition.filtered", 1);
                return;
            }
            this.store.reset(TransientVariableScope.GLOBAL);
            this.store.reset(TransientVariableScope.LOCAL);
            start = System.nanoTime();
            records = this.pipeline.execute(Collections.singletonList(row), this.oSchema);
            List errors = this.pipeline.errors();
            if (errors.size() > 0) {
                this.getContext().getMetrics().count("errors", errors.size());
                for (ErrorRecord error : errors) {
                    emitter.emitError(new InvalidEntry(error.getCode(), error.getMessage(), (Object)input));
                }
            }
        }
        catch (Exception e) {
            this.getContext().getMetrics().count("failure", 1);
            if (this.onErrorStrategy.equalsIgnoreCase("send-to-error-port")) {
                emitter.emitError(new InvalidEntry(0, e.getMessage(), (Object)input));
                return;
            }
            if (this.onErrorStrategy.equalsIgnoreCase(ON_ERROR_DEFAULT)) {
                emitter.emitAlert((Map)ImmutableMap.of((Object)"stage", (Object)this.getContext().getStageName(), (Object)"code", (Object)String.valueOf(1), (Object)"message", (Object)String.format("Stopping pipeline stage %s on error %s", this.getContext().getStageName(), e.getMessage()), (Object)"value", (Object)String.valueOf(this.errorCounter)));
                throw new Exception(String.format("Stage:%s - Failing pipeline due to error : %s", this.getContext().getStageName(), e.getMessage()), e);
            }
            return;
        }
        finally {
            this.getContext().getMetrics().gauge("process.time", System.nanoTime() - start);
        }
        for (StructuredRecord record : records) {
            StructuredRecord.Builder builder = StructuredRecord.builder((Schema)this.oSchema);
            for (Schema.Field field : this.oSchema.getFields()) {
                Object wObject = record.get(field.getName());
                if (wObject == null) {
                    builder.set(field.getName(), null);
                    continue;
                }
                if (wObject instanceof String) {
                    builder.convertAndSet(field.getName(), (String)wObject);
                    continue;
                }
                builder.set(field.getName(), wObject);
            }
            emitter.emit((Object)builder.build());
        }
    }

    private void validateInputSchema(@Nullable Schema inputSchema, FailureCollector collector) {
        Schema.Field inputSchemaField;
        if (inputSchema != null && (inputSchemaField = inputSchema.getField(this.config.field)) == null) {
            collector.addFailure(String.format("Field '%s' must be present in input schema.", this.config.field), null).withConfigProperty("field");
        }
    }

    private RecipeParser getRecipeParser(StageContext context) throws DirectiveLoadException, DirectiveParseException {
        this.registry = new CompositeDirectiveRegistry(new DirectiveRegistry[]{SystemDirectiveRegistry.INSTANCE, new UserDirectiveRegistry(context)});
        this.registry.reload(context.getNamespace());
        String directives = this.config.directives;
        if (this.config.udds != null && !this.config.udds.trim().isEmpty()) {
            directives = String.format("#pragma load-directives %s;%s", this.config.udds, this.config.directives);
        }
        return new GrammarBasedParser(context.getNamespace(), new MigrateToV2(directives).migrate(), this.registry);
    }

    public static class Config
    extends PluginConfig {
        static final String NAME_PRECONDITION = "precondition";
        static final String NAME_FIELD = "field";
        static final String NAME_DIRECTIVES = "directives";
        static final String NAME_UDD = "udd";
        static final String NAME_SCHEMA = "schema";
        static final String NAME_ON_ERROR = "on-error";
        @Name(value="precondition")
        @Description(value="Precondition expression specifying filtering before applying directives (true to filter)")
        @Macro
        private String precondition;
        @Name(value="directives")
        @Description(value="Recipe for wrangling the input records")
        @Macro
        @Nullable
        private String directives;
        @Name(value="udd")
        @Description(value="List of User Defined Directives (UDD) that have to be loaded.")
        @Nullable
        private String udds;
        @Name(value="field")
        @Description(value="Name of the input field to be wrangled or '*' to wrangle all the fields.")
        @Macro
        private final String field;
        @Name(value="schema")
        @Description(value="Specifies the schema that has to be output.")
        @Macro
        private final String schema;
        @Name(value="on-error")
        @Description(value="How to handle error in record processing")
        @Macro
        @Nullable
        private final String onError;

        public Config(String precondition, String directives, String udds, String field, String schema, String onError) {
            this.precondition = precondition;
            this.directives = directives;
            this.udds = udds;
            this.field = field;
            this.schema = schema;
            this.onError = onError;
        }

        public String getOnError() {
            return this.onError == null ? Wrangler.ON_ERROR_DEFAULT : this.onError;
        }
    }
}

