/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.directives.parser;

import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
import io.cdap.wrangler.api.DirectiveParseException;
import io.cdap.wrangler.api.ErrorRowException;
import io.cdap.wrangler.api.ExecutorContext;
import io.cdap.wrangler.api.Row;
import io.cdap.wrangler.api.annotations.Categories;
import io.cdap.wrangler.api.lineage.Lineage;
import io.cdap.wrangler.api.lineage.Many;
import io.cdap.wrangler.api.lineage.Mutation;
import io.cdap.wrangler.api.parser.ColumnName;
import io.cdap.wrangler.api.parser.Identifier;
import io.cdap.wrangler.api.parser.Numeric;
import io.cdap.wrangler.api.parser.Text;
import io.cdap.wrangler.api.parser.TokenType;
import io.cdap.wrangler.api.parser.UsageDefinition;
import io.cdap.wrangler.clients.RestClientException;
import io.cdap.wrangler.clients.SchemaRegistryClient;
import io.cdap.wrangler.codec.Decoder;
import io.cdap.wrangler.codec.DecoderException;
import io.cdap.wrangler.codec.ProtobufDecoderUsingDescriptor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Plugin(type="directive")
@Name(value="parse-as-protobuf")
@Categories(categories={"parser", "protobuf"})
@Description(value="Parses column as protobuf encoded memory representations.")
public class ParseProtobuf
implements Directive,
Lineage {
    public static final String NAME = "parse-as-protobuf";
    private static final Logger LOG = LoggerFactory.getLogger(ParseProtobuf.class);
    private String column;
    private String schemaId;
    private String recordName;
    private long version;
    private Decoder<Row> decoder;
    private boolean decoderInitialized = false;
    private SchemaRegistryClient client;

    public UsageDefinition define() {
        UsageDefinition.Builder builder = UsageDefinition.builder((String)NAME);
        builder.define("column", TokenType.COLUMN_NAME);
        builder.define("schema-id", TokenType.IDENTIFIER);
        builder.define("record-name", TokenType.TEXT);
        builder.define("version", TokenType.NUMERIC, true);
        return builder.build();
    }

    public void initialize(Arguments args) throws DirectiveParseException {
        this.column = ((ColumnName)args.value("column")).value();
        this.schemaId = ((Identifier)args.value("schema-id")).value();
        this.recordName = ((Text)args.value("record-name")).value();
        this.version = args.contains("version") ? (long)((Numeric)args.value("version")).value().intValue() : -1L;
    }

    public void destroy() {
    }

    public List<Row> execute(List<Row> rows, final ExecutorContext context) throws DirectiveExecutionException, ErrorRowException {
        ArrayList<Row> results = new ArrayList<Row>();
        if (!this.decoderInitialized) {
            Callable<Decoder<Row>> decoderCallable = new Callable<Decoder<Row>>(){

                @Override
                public Decoder<Row> call() throws Exception {
                    ParseProtobuf.this.client = SchemaRegistryClient.getInstance(context);
                    byte[] bytes = ParseProtobuf.this.version != -1L ? ParseProtobuf.this.client.getSchema(context.getNamespace(), ParseProtobuf.this.schemaId, ParseProtobuf.this.version) : ParseProtobuf.this.client.getSchema(context.getNamespace(), ParseProtobuf.this.schemaId);
                    return new ProtobufDecoderUsingDescriptor(bytes, ParseProtobuf.this.recordName);
                }
            };
            Retryer retryer = RetryerBuilder.newBuilder().retryIfExceptionOfType(IOException.class).retryIfExceptionOfType(RestClientException.class).withWaitStrategy(WaitStrategies.exponentialWait((long)10L, (TimeUnit)TimeUnit.SECONDS)).withStopStrategy(StopStrategies.stopAfterAttempt((int)5)).build();
            try {
                this.decoder = (Decoder)retryer.call((Callable)decoderCallable);
                if (this.decoder == null) {
                    throw new DirectiveExecutionException(NAME, "Unsupported protobuf decoder type.");
                }
                this.decoderInitialized = true;
            }
            catch (RetryException | ExecutionException e) {
                throw new DirectiveExecutionException(NAME, String.format("Unable to retrieve protobuf descriptor from schema registry. %s", e.getMessage()), e);
            }
        }
        try {
            for (Row row : rows) {
                int idx = row.find(this.column);
                if (idx == -1) continue;
                Object object = row.getValue(idx);
                if (object instanceof byte[]) {
                    byte[] bytes = (byte[])object;
                    results.addAll(this.decoder.decode(bytes));
                    continue;
                }
                throw new ErrorRowException(NAME, "Column " + this.column + " should be of type 'byte array'", 1);
            }
        }
        catch (DecoderException e) {
            throw new ErrorRowException(NAME, "Issue decoding Protobuf record. Check schema version '" + (this.version == -1L ? "latest" : Long.valueOf(this.version)) + "'. " + e.getMessage(), 2);
        }
        return results;
    }

    public Mutation lineage() {
        return Mutation.builder().readable("Parsed column '%s' as a protobuf message", new Object[]{this.column}).all(Many.columns((String[])new String[]{this.column})).build();
    }
}

