/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.wrangler;

import com.google.common.collect.ImmutableMap;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.format.UnexpectedFormatException;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.api.plugin.PluginProperties;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.InvalidEntry;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageContext;
import io.cdap.cdap.etl.api.StageSubmitterContext;
import io.cdap.cdap.etl.api.Transform;
import io.cdap.cdap.etl.api.TransformContext;
import io.cdap.cdap.etl.api.lineage.field.FieldTransformOperation;
import io.cdap.directives.aggregates.DefaultTransientStore;
import io.cdap.wrangler.Precondition;
import io.cdap.wrangler.PreconditionException;
import io.cdap.wrangler.WranglerPipelineContext;
import io.cdap.wrangler.api.CompileException;
import io.cdap.wrangler.api.CompileStatus;
import io.cdap.wrangler.api.DirectiveContext;
import io.cdap.wrangler.api.DirectiveExecutionException;
import io.cdap.wrangler.api.DirectiveParseException;
import io.cdap.wrangler.api.ErrorRecord;
import io.cdap.wrangler.api.ExecutorContext;
import io.cdap.wrangler.api.RecipeParser;
import io.cdap.wrangler.api.RecipePipeline;
import io.cdap.wrangler.api.RecipeSymbol;
import io.cdap.wrangler.api.Row;
import io.cdap.wrangler.api.TokenGroup;
import io.cdap.wrangler.api.TransientStore;
import io.cdap.wrangler.api.TransientVariableScope;
import io.cdap.wrangler.executor.RecipePipelineExecutor;
import io.cdap.wrangler.parser.ConfigDirectiveContext;
import io.cdap.wrangler.parser.GrammarBasedParser;
import io.cdap.wrangler.parser.MigrateToV2;
import io.cdap.wrangler.parser.RecipeCompiler;
import io.cdap.wrangler.registry.CompositeDirectiveRegistry;
import io.cdap.wrangler.registry.DirectiveInfo;
import io.cdap.wrangler.registry.DirectiveRegistry;
import io.cdap.wrangler.registry.SystemDirectiveRegistry;
import io.cdap.wrangler.registry.UserDirectiveRegistry;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Transform plugin that applies a recipe of wrangling directives to each input record.
 *
 * <p>At configure time the recipe is compiled, user defined directives (UDDs) are registered
 * and the static configuration is validated. At run time every input record is converted into
 * a {@link Row}, optionally filtered by a precondition, run through the recipe pipeline and
 * re-emitted using the configured output schema.
 */
@Plugin(type="transform")
@Name(value="Wrangler")
@Description(value="Wrangler - A interactive tool for data cleansing and transformation.")
public class Wrangler
extends Transform<StructuredRecord, StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(Wrangler.class);
    private static final String APPLICATION_NAME = "dataprep";
    private static final String SERVICE_NAME = "service";
    private static final String CONFIG_METHOD = "config";
    /** Value of the 'field' property that selects every field of the input record. */
    private static final String ALL_FIELDS = "*";
    /** Value of the 'field' property that passes the whole input record as a single column. */
    private static final String WHOLE_RECORD = "#";
    private final Config config;
    private RecipePipeline pipeline;
    private Schema oSchema = null;
    private long errorCounter;
    private Precondition condition = null;
    private TransientStore store;
    private DirectiveRegistry registry;

    /**
     * Creates the plugin with the given configuration.
     *
     * @param config plugin configuration
     */
    public Wrangler(Config config) {
        this.config = config;
    }

    /**
     * Validates the static (non-macro) parts of the configuration and sets the output schema.
     *
     * @param configurer pipeline configurer for this stage
     * @throws IllegalArgumentException if any part of the configuration is invalid; every failure
     *     raised while validating is logged and rethrown wrapped in this type
     */
    @Override
    public void configurePipeline(PipelineConfigurer configurer) throws IllegalArgumentException {
        super.configurePipeline(configurer);
        try {
            Schema iSchema = configurer.getStageConfigurer().getInputSchema();
            // '*' and '#' are not field names, so they must not be looked up in the input schema.
            // Constant-first equals keeps a missing field value from raising a NullPointerException.
            if (!this.config.containsMacro("field")
                    && !ALL_FIELDS.equals(this.config.field)
                    && !WHOLE_RECORD.equals(this.config.field)) {
                this.validateInputSchema(iSchema);
            }
            this.validateDirectives(configurer);
            try {
                if (!this.config.containsMacro("schema")) {
                    this.oSchema = Schema.parseJson(this.config.schema);
                }
            }
            catch (IOException e) {
                throw new IllegalArgumentException("Format of output schema specified is invalid. Please check the format.", e);
            }
            if (!this.config.containsMacro("precondition") && this.config.precondition != null && !this.config.precondition.trim().isEmpty()) {
                try {
                    // Constructed only to validate the expression; the instance is discarded.
                    new Precondition(this.config.precondition);
                }
                catch (PreconditionException e) {
                    throw new IllegalArgumentException(e.getMessage(), e);
                }
            }
            if (this.oSchema != null) {
                configurer.getStageConfigurer().setOutputSchema(this.oSchema);
            }
        }
        catch (Exception e) {
            LOG.error(e.getMessage());
            throw new IllegalArgumentException(e.getMessage(), e);
        }
    }

    /**
     * Compiles the recipe, registers every loadable directive as a plugin and, when the directives
     * are not a macro, verifies that each directive used is known to the system registry or is
     * one of the loadable directives.
     *
     * @param configurer pipeline configurer used to register directive plugins
     * @throws Exception any failure raised by the registry; the caller converts it
     */
    private void validateDirectives(PipelineConfigurer configurer) throws Exception {
        boolean directivesAreMacro = this.config.containsMacro("directives");
        String directives = this.composeRecipe(!directivesAreMacro);
        RecipeCompiler compiler = new RecipeCompiler();
        try {
            CompileStatus status = compiler.compile(new MigrateToV2(directives).migrate());
            RecipeSymbol symbols = status.getSymbols();
            // Guard before dereferencing: the symbol table is null-checked further down as well.
            Set<String> dynamicDirectives = symbols == null
                    ? Collections.<String>emptySet()
                    : symbols.getLoadableDirectives();
            for (String directive : dynamicDirectives) {
                Object o = configurer.usePlugin("directive", directive, directive, PluginProperties.builder().build());
                if (o == null) {
                    throw new IllegalArgumentException(String.format("User Defined Directive '%s' is not deployed or is not available.", directive));
                }
            }
            if (directivesAreMacro) {
                return;
            }
            this.registry = new SystemDirectiveRegistry();
            if (symbols == null) {
                return;
            }
            Iterator<?> iterator = symbols.iterator();
            while (iterator != null && iterator.hasNext()) {
                TokenGroup group = (TokenGroup)iterator.next();
                if (group == null) {
                    continue;
                }
                String directive = (String)group.get(0).value();
                DirectiveInfo directiveInfo = this.registry.get("", directive);
                if (directiveInfo == null && !dynamicDirectives.contains(directive)) {
                    throw new IllegalArgumentException(String.format("Wrangler plugin has a directive '%s' that does not exist in system or user space. Either it is a typographical error or the user directive is not loaded.", directive));
                }
            }
        }
        catch (CompileException e) {
            throw new IllegalArgumentException(e.getMessage(), e);
        }
        catch (DirectiveParseException e) {
            throw new IllegalArgumentException(e.getMessage(), e);
        }
    }

    /**
     * Builds the recipe text, prefixing the configured directives with a
     * {@code #pragma load-directives} statement when UDDs are configured.
     *
     * @param appendDirectives whether the configured directives should follow the pragma; pass
     *     {@code false} while the directives are still an unresolved macro
     * @return the configured directives as-is when no UDDs are set, otherwise the pragma
     *     optionally followed by the directives
     */
    private String composeRecipe(boolean appendDirectives) {
        String udds = this.config.udds;
        if (udds == null || udds.trim().isEmpty()) {
            return this.config.directives;
        }
        String pragma = String.format("#pragma load-directives %s;", udds);
        // A null recipe must not be formatted into the text as the literal word "null".
        if (!appendDirectives || this.config.directives == null) {
            return pragma;
        }
        return pragma + this.config.directives;
    }

    /**
     * Records a single field-level lineage operation mapping all input fields to all output fields.
     *
     * @param context submitter context of this stage
     * @throws Exception if the parent preparation or the recipe migration fails
     */
    @Override
    public void prepareRun(StageSubmitterContext context) throws Exception {
        super.prepareRun(context);
        List<String> inputFields = this.fieldNames(context.getInputSchema(), "input");
        List<String> outputFields = this.fieldNames(context.getOutputSchema(), "output");
        FieldTransformOperation dataPrepOperation = new FieldTransformOperation("Prepare Data", new MigrateToV2(this.config.directives).migrate(), inputFields, outputFields);
        context.record(Collections.singletonList(dataPrepOperation));
    }

    /**
     * Returns the field names of the schema, or an empty list when the schema is unusable.
     *
     * @param schema schema to read, may be null
     * @param name label used in the debug message ("input" or "output")
     * @return names of the schema fields in schema order
     */
    private List<String> fieldNames(Schema schema, String name) {
        if (!this.checkSchema(schema, name)) {
            return new ArrayList<>();
        }
        return schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList());
    }

    /**
     * Checks that a schema and its field list are both present.
     *
     * @param schema schema to check, may be null
     * @param name label used in the debug message
     * @return true when both the schema and its fields are non-null
     */
    private boolean checkSchema(Schema schema, String name) {
        if (schema == null) {
            LOG.debug("The {} schema is null. Field level lineage will not be recorded", name);
            return false;
        }
        if (schema.getFields() == null) {
            LOG.debug("The {} schema fields are null. Field level lineage will not be recorded", name);
            return false;
        }
        return true;
    }

    /**
     * Verifies that the configured field exists in the input schema, when a schema is available.
     *
     * @param inputSchema input schema of the stage, may be null (nothing is checked then)
     * @throws IllegalArgumentException if the schema is known and does not contain the field
     */
    private void validateInputSchema(Schema inputSchema) {
        if (inputSchema == null) {
            return;
        }
        if (this.config.field == null || inputSchema.getField(this.config.field) == null) {
            throw new IllegalArgumentException("Field " + this.config.field + " is not present in the input schema");
        }
    }

    /**
     * Prepares the runtime state: directive registry, recipe parser, output schema, precondition
     * and the executing pipeline.
     *
     * @param context runtime context of this stage
     * @throws Exception if the parent initialization, the registry or the pipeline fails to initialize
     */
    @Override
    public void initialize(TransformContext context) throws Exception {
        super.initialize(context);
        this.store = new DefaultTransientStore();
        this.registry = new CompositeDirectiveRegistry(new DirectiveRegistry[]{new SystemDirectiveRegistry(), new UserDirectiveRegistry((StageContext)context)});
        this.registry.reload(context.getNamespace());
        String directives = this.composeRecipe(true);
        GrammarBasedParser recipe = new GrammarBasedParser(context.getNamespace(), new MigrateToV2(directives).migrate(), this.registry);
        WranglerPipelineContext ctx = new WranglerPipelineContext(ExecutorContext.Environment.TRANSFORM, context, this.store);
        try {
            this.oSchema = Schema.parseJson(this.config.schema);
        }
        catch (IOException e) {
            throw new IllegalArgumentException(String.format("Stage:%s - Format of output schema specified is invalid. Please check the format.", context.getStageName()), e);
        }
        if (this.config.precondition != null && !this.config.precondition.trim().isEmpty()) {
            try {
                this.condition = new Precondition(this.config.precondition);
            }
            catch (PreconditionException e) {
                throw new IllegalArgumentException(e.getMessage(), e);
            }
        }
        try {
            URL url = this.getDPServiceURL(CONFIG_METHOD);
            if (url != null) {
                ConfigDirectiveContext dContext = new ConfigDirectiveContext(url);
                recipe.initialize((DirectiveContext)dContext);
            } else {
                LOG.info(String.format("Stage:%s - The Dataprep service is not accessible in this environment. No aliasing and restriction will be applied.", this.getContext().getStageName()));
                recipe.initialize(null);
            }
        }
        catch (IOException | URISyntaxException e) {
            throw new IllegalArgumentException(String.format("Stage:%s - Issue in retrieving the configuration from the service. %s", this.getContext().getStageName(), e.getMessage()), e);
        }
        try {
            this.pipeline = new RecipePipelineExecutor();
            this.pipeline.initialize((RecipeParser)recipe, (ExecutorContext)ctx);
        }
        catch (Exception e) {
            throw new Exception(String.format("Stage:%s - %s", this.getContext().getStageName(), e.getMessage()), e);
        }
        this.errorCounter = 0L;
    }

    /**
     * Releases the pipeline and the directive registry. Safe to call even when
     * {@link #initialize(TransformContext)} failed before either of them was created.
     */
    @Override
    public void destroy() {
        super.destroy();
        try {
            if (this.pipeline != null) {
                this.pipeline.destroy();
            }
        }
        finally {
            // Close the registry even if destroying the pipeline throws.
            if (this.registry != null) {
                try {
                    this.registry.close();
                }
                catch (IOException e) {
                    // Pass the exception as the throwable so it is actually logged.
                    LOG.warn("Unable to close the directive registry. You might see increasing number of open file handle.", e);
                }
            }
        }
    }

    /**
     * Applies the recipe to one input record.
     *
     * <p>Records matching the precondition are dropped and counted under
     * {@code precondition.filtered}. Errors reported by the pipeline are emitted as error records.
     * A failure while processing is emitted as an error record until the number of failures
     * exceeds the configured threshold, at which point an alert is emitted and the failure is
     * rethrown.
     *
     * @param input record to wrangle
     * @param emitter receiver of output records, error records and alerts
     * @throws Exception when the error threshold has been exceeded
     */
    @Override
    @SuppressWarnings("unchecked") // the pipeline field is a raw RecipePipeline
    public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
        List<StructuredRecord> records;
        long start = 0L;
        boolean timing = false;
        try {
            Row row = this.toRow(input);
            if (this.condition != null && this.condition.apply(row)) {
                this.getContext().getMetrics().count("precondition.filtered", 1);
                return;
            }
            this.store.reset(TransientVariableScope.GLOBAL);
            this.store.reset(TransientVariableScope.LOCAL);
            start = System.nanoTime();
            timing = true;
            records = this.pipeline.execute(Arrays.asList(row), this.oSchema);
            List<ErrorRecord> errors = this.pipeline.errors();
            if (!errors.isEmpty()) {
                this.getContext().getMetrics().count("errors", errors.size());
                for (ErrorRecord error : errors) {
                    emitter.emitError(new InvalidEntry<>(error.getCode(), error.getMessage(), input));
                }
            }
        }
        catch (Exception e) {
            this.getContext().getMetrics().count("failures", 1);
            ++this.errorCounter;
            if (this.config.threshold != -1 && this.errorCounter > (long)this.config.threshold) {
                emitter.emitAlert(ImmutableMap.of("stage", this.getContext().getStageName(), "code", String.valueOf(1), "message", "Error threshold reached.", "value", String.valueOf(this.errorCounter)));
                throw new Exception(String.format("Stage:%s - Reached error threshold %d, terminating processing due to error : %s", this.getContext().getStageName(), this.config.threshold, e.getMessage()), e);
            }
            emitter.emitError(new InvalidEntry<>(0, e.getMessage(), input));
            return;
        }
        finally {
            // Only report a duration when the clock was started; otherwise the gauge would
            // receive the raw nanoTime value for filtered or early-failing records.
            if (timing) {
                this.getContext().getMetrics().gauge("process.time", System.nanoTime() - start);
            }
        }
        for (StructuredRecord record : records) {
            StructuredRecord.Builder builder = StructuredRecord.builder(this.oSchema);
            for (Schema.Field field : this.oSchema.getFields()) {
                String fieldName = field.getName();
                Object wObject = record.get(fieldName);
                if (wObject instanceof String) {
                    // Strings are converted to the type declared by the output schema.
                    builder.convertAndSet(fieldName, (String)wObject);
                } else {
                    // Covers null as well as values that already carry their final type.
                    builder.set(fieldName, wObject);
                }
            }
            emitter.emit(builder.build());
        }
    }

    /**
     * Converts an input record into the row handed to the pipeline, honouring the 'field' setting:
     * {@code *} copies every field, {@code #} adds the whole record under its record name, and any
     * other value copies just that field.
     *
     * @param input record to convert
     * @return row holding the selected values
     */
    private Row toRow(StructuredRecord input) {
        Row row = new Row();
        if (ALL_FIELDS.equalsIgnoreCase(this.config.field)) {
            for (Schema.Field field : input.getSchema().getFields()) {
                row.add(field.getName(), this.getValue(input, field.getName()));
            }
        } else if (WHOLE_RECORD.equalsIgnoreCase(this.config.field)) {
            row.add(input.getSchema().getRecordName(), input);
        } else {
            row.add(this.config.field, this.getValue(input, this.config.field));
        }
        return row;
    }

    /**
     * Reads a field value, using the logical-type accessor when the field has a logical type.
     *
     * @param input record to read from
     * @param fieldName name of the field to read
     * @return the field value
     * @throws IllegalArgumentException if the record schema has no such field
     * @throws UnexpectedFormatException if the field has a logical type not handled here
     */
    private Object getValue(StructuredRecord input, String fieldName) {
        Schema.Field field = input.getSchema().getField(fieldName);
        if (field == null) {
            throw new IllegalArgumentException(String.format("Field '%s' is not present in the input record.", fieldName));
        }
        Schema fieldSchema = field.getSchema();
        if (fieldSchema.isNullable()) {
            fieldSchema = fieldSchema.getNonNullable();
        }
        Schema.LogicalType logicalType = fieldSchema.getLogicalType();
        if (logicalType == null) {
            return input.get(fieldName);
        }
        switch (logicalType) {
            case DATE:
                return input.getDate(fieldName);
            case TIME_MILLIS:
            case TIME_MICROS:
                return input.getTime(fieldName);
            case TIMESTAMP_MILLIS:
            case TIMESTAMP_MICROS:
                return input.getTimestamp(fieldName);
            case DECIMAL:
                return input.getDecimal(fieldName);
            default:
                throw new UnexpectedFormatException("Field type " + logicalType + " is not supported.");
        }
    }

    /**
     * Resolves the URL of a method of the dataprep service.
     *
     * @param method method path appended to the service base path
     * @return the resolved URL, or null when the service URL is not available
     * @throws URISyntaxException if the service URL cannot be converted to a URI
     * @throws MalformedURLException if the resolved URI cannot be converted to a URL
     */
    private URL getDPServiceURL(String method) throws URISyntaxException, MalformedURLException {
        URL url = this.getContext().getServiceURL(APPLICATION_NAME, SERVICE_NAME);
        if (url == null) {
            return null;
        }
        URI uri = url.toURI();
        String path = uri.getPath() + method;
        return uri.resolve(path).toURL();
    }

    /**
     * Configuration of the Wrangler plugin.
     */
    public static class Config
    extends PluginConfig {
        @Name(value="precondition")
        @Description(value="Precondition expression specifying filtering before applying directives (true to filter)")
        @Macro
        private String precondition;
        @Name(value="directives")
        @Description(value="Recipe for wrangling the input records")
        @Macro
        @Nullable
        private String directives;
        @Name(value="udd")
        @Description(value="List of User Defined Directives (UDD) that have to be loaded.")
        @Nullable
        private String udds;
        @Name(value="field")
        @Description(value="Name of the input field to be wrangled or '*' to wrangle all the fields.")
        @Macro
        private final String field;
        @Name(value="threshold")
        @Description(value="Max number of event failures in wrangling after which to stop the pipeline of processing. Threshold is not aggregate across all instance, but is applied for each running instances. Set to -1 to specify unlimited number of acceptable errors.")
        @Macro
        private final int threshold;
        @Name(value="schema")
        @Description(value="Specifies the schema that has to be output.")
        @Macro
        private final String schema;

        /**
         * Creates a configuration with every property given explicitly.
         *
         * @param precondition expression deciding which records are filtered out
         * @param directives recipe applied to the records
         * @param udds user defined directives to load
         * @param field input field to wrangle, '*' or '#'
         * @param threshold failure count above which processing stops, -1 for unlimited
         * @param schema output schema as JSON
         */
        public Config(String precondition, String directives, String udds, String field, int threshold, String schema) {
            this.precondition = precondition;
            this.directives = directives;
            this.udds = udds;
            this.field = field;
            this.threshold = threshold;
            this.schema = schema;
        }
    }
}

