package org.apache.streams.pig;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import datafu.pig.util.AliasableEvalFunc;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.ArrayUtils;
import org.apache.pig.builtin.MonitoredUDF;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.UDFContext;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.data.util.RFC3339Utils;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pig UDF that feeds each input tuple through a {@link StreamsProcessor} and returns the
 * processor's output as a bag of tuples.
 *
 * <p>Input tuples are expected to carry four fields, in order: id, source, timestamp, object
 * (see {@link #getOutputSchema(Schema)} for the accepted types). Each output tuple repeats the
 * input id, source and timestamp and carries one processed document as a string.
 */
@MonitoredUDF(timeUnit = TimeUnit.SECONDS, duration = 30, intDefault = {10})
/* loaded from: input_file:org/apache/streams/pig/StreamsProcessDatumExec.class */
public class StreamsProcessDatumExec extends AliasableEvalFunc<DataBag> {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamsProcessDatumExec.class);

    /** Number of fields every input tuple / input schema must have. */
    private static final int FIELD_COUNT = 4;

    /** Header shared by every schema validation error. */
    private static final String EXPECTED_SCHEMA = "Expected: id\tsource\ttimestamp\tobject";

    StreamsProcessor streamsProcessor;
    TupleFactory mTupleFactory = TupleFactory.getInstance();
    BagFactory mBagFactory = BagFactory.getInstance();
    ObjectMapper mapper = StreamsJacksonMapper.getInstance();

    /**
     * Instantiates and prepares the wrapped processor.
     *
     * @param strArr first element is the fully qualified processor class name; any remaining
     *               elements are handed to the processor's {@code prepare} call as a
     *               {@code String[]}. When only the class name is given, {@code prepare} is
     *               called with {@code null}.
     * @throws ClassNotFoundException if the processor class cannot be loaded
     * @throws NullPointerException if {@code strArr} or its first element is {@code null}
     * @throws IllegalArgumentException if {@code strArr} is empty
     */
    public StreamsProcessDatumExec(String... strArr) throws ClassNotFoundException {
        Preconditions.checkNotNull(strArr);
        Preconditions.checkArgument(strArr.length > 0);
        String processorClassName = strArr[0];
        Preconditions.checkNotNull(processorClassName);
        this.streamsProcessor = StreamsComponentFactory.getProcessorInstance(Class.forName(processorClassName));
        if (strArr.length == 1) {
            LOGGER.debug("prepare (null)");
            this.streamsProcessor.prepare((Object) null);
        } else {
            // Everything after the class name is processor configuration.
            String[] prepareArgs = Arrays.copyOfRange(strArr, 1, strArr.length);
            LOGGER.debug("prepare {}", Arrays.toString(prepareArgs));
            this.streamsProcessor.prepare(prepareArgs);
        }
    }

    /**
     * Runs the wrapped processor on one input tuple.
     *
     * @param tuple input tuple laid out as (id, source, timestamp, object); the timestamp may be
     *              a {@code Long} (used as the millisecond value passed to {@code DateTime}) or
     *              a {@code String} parsed with {@link RFC3339Utils#parseUTC}
     * @return a bag with one tuple per datum returned by the processor, or {@code null} when
     *         {@code tuple} is {@code null} or empty
     * @throws IOException if reading a tuple field or serializing a document fails
     */
    /* renamed from: exec, reason: merged with bridge method [inline-methods] */
    public DataBag m2exec(Tuple tuple) throws IOException {
        if (tuple == null || tuple.size() == 0) {
            return null;
        }

        // getOutputSchema accepts the id as CHARARRAY or LONG, so do not blindly cast to String:
        // keep the original value for the output tuple and use its string form for the datum id.
        Object rawId = tuple.get(0);
        String datumId = rawId == null ? null : rawId.toString();
        String source = (String) tuple.get(1);

        // Timestamp arrives either as epoch millis (Long) or as an RFC 3339 string.
        Object rawTimestamp = tuple.get(2);
        Long timestamp;
        if (rawTimestamp instanceof String) {
            timestamp = Long.valueOf(RFC3339Utils.parseUTC((String) rawTimestamp).getMillis());
        } else {
            timestamp = (Long) rawTimestamp;
        }
        // NOTE(review): the output tuple always carries the timestamp as a Long, even when the
        // input (and therefore the declared output schema) was CHARARRAY - confirm this is intended.

        String document = (String) tuple.get(3);
        List<StreamsDatum> results =
                this.streamsProcessor.process(new StreamsDatum(document, datumId, new DateTime(timestamp)));

        List<Tuple> outputTuples = Lists.newArrayList();
        for (StreamsDatum result : results) {
            Tuple outputTuple = this.mTupleFactory.newTuple();
            outputTuple.append(rawId);
            outputTuple.append(source);
            outputTuple.append(timestamp);
            Object resultDocument = result.getDocument();
            if (resultDocument instanceof String) {
                outputTuple.append(resultDocument);
            } else {
                // Non-string documents are serialized with the shared Jackson mapper.
                outputTuple.append(this.mapper.writeValueAsString(resultDocument));
            }
            outputTuples.add(outputTuple);
        }
        return this.mBagFactory.newDefaultBag(outputTuples);
    }

    /** Releases the wrapped processor by calling its {@code cleanUp()}. */
    public void finish() {
        this.streamsProcessor.cleanUp();
    }

    /**
     * Validates the input schema and returns it unchanged as the output schema.
     *
     * <p>The schema must have exactly four fields: id (CHARARRAY or LONG), source (CHARARRAY),
     * timestamp (CHARARRAY or LONG) and object (CHARARRAY).
     *
     * @param schema the input schema
     * @return {@code schema} itself when it is valid
     * @throws RuntimeException if the field count is wrong, a field cannot be read, or a field
     *                          has an unsupported type; type errors name the offending field and
     *                          the type that was found
     */
    public Schema getOutputSchema(Schema schema) {
        if (schema.size() != FIELD_COUNT) {
            throw new RuntimeException(EXPECTED_SCHEMA);
        }

        // Only the field lookups are wrapped: validation errors are raised outside the try so
        // their field-specific messages are not replaced by the generic one.
        byte[] fieldTypes = new byte[FIELD_COUNT];
        try {
            for (int i = 0; i < FIELD_COUNT; i++) {
                fieldTypes[i] = schema.getField(i).type;
            }
        } catch (Exception e) {
            throw new RuntimeException(EXPECTED_SCHEMA, e);
        }

        if (fieldTypes[0] != DataType.CHARARRAY && fieldTypes[0] != DataType.LONG) {
            throw new RuntimeException(typeError("id", "CHARARRAY or LONG", fieldTypes[0]));
        }
        if (fieldTypes[1] != DataType.CHARARRAY) {
            throw new RuntimeException(typeError("source", "CHARARRAY", fieldTypes[1]));
        }
        if (fieldTypes[2] != DataType.CHARARRAY && fieldTypes[2] != DataType.LONG) {
            throw new RuntimeException(typeError("timestamp", "CHARARRAY or LONG", fieldTypes[2]));
        }
        if (fieldTypes[3] != DataType.CHARARRAY) {
            throw new RuntimeException(typeError("object", "CHARARRAY", fieldTypes[3]));
        }
        return schema;
    }

    /** Builds the schema error message for a field whose type is not one of the allowed ones. */
    private static String typeError(String fieldName, String allowedTypes, byte actualType) {
        return EXPECTED_SCHEMA + "\nProblem with " + fieldName + ": must be " + allowedTypes
                + "\t(" + DataType.findTypeName(actualType) + ")\n";
    }
}
