/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.directives.parser;

import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Charsets;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.common.Bytes;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
import io.cdap.wrangler.api.DirectiveParseException;
import io.cdap.wrangler.api.ErrorRowException;
import io.cdap.wrangler.api.ExecutorContext;
import io.cdap.wrangler.api.Row;
import io.cdap.wrangler.api.annotations.Categories;
import io.cdap.wrangler.api.lineage.Lineage;
import io.cdap.wrangler.api.lineage.Many;
import io.cdap.wrangler.api.lineage.Mutation;
import io.cdap.wrangler.api.parser.ColumnName;
import io.cdap.wrangler.api.parser.Identifier;
import io.cdap.wrangler.api.parser.Numeric;
import io.cdap.wrangler.api.parser.TokenType;
import io.cdap.wrangler.api.parser.UsageDefinition;
import io.cdap.wrangler.clients.RestClientException;
import io.cdap.wrangler.clients.SchemaRegistryClient;
import io.cdap.wrangler.codec.BinaryAvroDecoder;
import io.cdap.wrangler.codec.Decoder;
import io.cdap.wrangler.codec.DecoderException;
import io.cdap.wrangler.codec.JsonAvroDecoder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;

@Plugin(type="directive")
@Name(value="parse-as-avro")
@Categories(categories={"parser", "avro"})
@Description(value="Parses column as AVRO generic record.")
public class ParseAvro
implements Directive,
Lineage {
    public static final String NAME = "parse-as-avro";
    private String column;
    private String schemaId;
    private String type;
    private long version;
    private Decoder<Row> decoder;
    private boolean decoderInitialized = false;
    private SchemaRegistryClient client;

    public UsageDefinition define() {
        UsageDefinition.Builder builder = UsageDefinition.builder((String)NAME);
        builder.define("column", TokenType.COLUMN_NAME);
        builder.define("schema-id", TokenType.IDENTIFIER);
        builder.define("encode-type", TokenType.IDENTIFIER);
        builder.define("version", TokenType.NUMERIC, true);
        return builder.build();
    }

    public void initialize(Arguments args) throws DirectiveParseException {
        this.column = ((ColumnName)args.value("column")).value();
        this.schemaId = ((Identifier)args.value("schema-id")).value();
        this.type = ((Identifier)args.value("encode-type")).value();
        if (!"json".equalsIgnoreCase(this.type) && !"binary".equalsIgnoreCase(this.type)) {
            throw new DirectiveParseException(NAME, String.format("Invalid encoding type '%s'. The type must be either 'json' or 'binary'.", this.type));
        }
        this.version = args.contains("version") ? (long)((Numeric)args.value("version")).value().intValue() : -1L;
    }

    public void destroy() {
    }

    public List<Row> execute(List<Row> rows, ExecutorContext context) throws DirectiveExecutionException, ErrorRowException {
        ArrayList<Row> results = new ArrayList<Row>();
        if (!this.decoderInitialized) {
            Callable<Decoder> decoderCallable = () -> {
                this.client = SchemaRegistryClient.getInstance(context);
                byte[] bytes = this.version != -1L ? this.client.getSchema(context.getNamespace(), this.schemaId, this.version) : this.client.getSchema(context.getNamespace(), this.schemaId);
                Schema.Parser parser = new Schema.Parser();
                Schema schema = parser.parse(Bytes.toString((byte[])bytes));
                if ("json".equalsIgnoreCase(this.type)) {
                    return new JsonAvroDecoder(schema);
                }
                if ("binary".equalsIgnoreCase(this.type)) {
                    return new BinaryAvroDecoder(schema);
                }
                return null;
            };
            Retryer retryer = RetryerBuilder.newBuilder().retryIfExceptionOfType(IOException.class).retryIfExceptionOfType(RestClientException.class).withWaitStrategy(WaitStrategies.exponentialWait((long)10L, (TimeUnit)TimeUnit.SECONDS)).withStopStrategy(StopStrategies.stopAfterAttempt((int)5)).build();
            try {
                this.decoder = (Decoder)retryer.call((Callable)decoderCallable);
                if (this.decoder == null) {
                    throw new DirectiveExecutionException(NAME, "Avro parsing is supported for 'json' and 'binary' types only.");
                }
                this.decoderInitialized = true;
            }
            catch (RetryException | ExecutionException e) {
                throw new DirectiveExecutionException(NAME, String.format("Unable to retrieve schema from schema registry. %s", e.getMessage()), e);
            }
        }
        try {
            for (Row row : rows) {
                int idx = row.find(this.column);
                if (idx == -1) continue;
                Object object = row.getValue(idx);
                if (object instanceof byte[]) {
                    byte[] bytes = (byte[])object;
                    results.addAll(this.decoder.decode(bytes));
                    continue;
                }
                if (object instanceof String) {
                    String body = (String)object;
                    byte[] bytes = body.getBytes(Charsets.UTF_8);
                    results.addAll(this.decoder.decode(bytes));
                    continue;
                }
                throw new ErrorRowException(NAME, "Column " + this.column + " should be of type 'String' or 'byte array'.", 1);
            }
        }
        catch (DecoderException e) {
            throw new ErrorRowException(NAME, "Issue decoding Avro record. Check schema version '" + (this.version == -1L ? "latest" : Long.valueOf(this.version)) + "'. " + e.getMessage(), 2);
        }
        return results;
    }

    public Mutation lineage() {
        return Mutation.builder().readable("Parsed column '%s' as a Avro record", new Object[]{this.column}).all(Many.columns((String[])new String[]{this.column})).build();
    }
}

