package io.projectglow.transformers.pipe;

import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import io.projectglow.common.GlowLogging;
import io.projectglow.transformers.pipe.PipeIterator;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkException;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLUtils$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel$;
import org.apache.spark.util.CollectionAccumulator;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.ListBuffer$;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try$;

/* compiled from: Piper.scala */
/* loaded from: input_file:io/projectglow/transformers/pipe/Piper$.class */
public final class Piper$ implements GlowLogging {
    public static Piper$ MODULE$;
    private final ListBuffer<RDD<?>> cachedRdds;
    private Logger logger;
    private volatile boolean bitmap$0;

    static {
        new Piper$();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [io.projectglow.transformers.pipe.Piper$] */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.logger = LazyLogging.logger$(this);
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    public Logger logger() {
        return !this.bitmap$0 ? logger$lzycompute() : this.logger;
    }

    private ListBuffer<RDD<?>> cachedRdds() {
        return this.cachedRdds;
    }

    public void clearCache() {
        synchronized (cachedRdds()) {
            Some activeSession = SparkSession$.MODULE$.getActiveSession();
            if (None$.MODULE$.equals(activeSession)) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                if (!(activeSession instanceof Some)) {
                    throw new MatchError(activeSession);
                }
                SparkSession sparkSession = (SparkSession) activeSession.value();
                cachedRdds().foreach(rdd -> {
                    SparkContext sparkContext = rdd.sparkContext();
                    SparkContext sparkContext2 = sparkSession.sparkContext();
                    return (sparkContext != null ? !sparkContext.equals(sparkContext2) : sparkContext2 != null) ? BoxedUnit.UNIT : rdd.unpersist(rdd.unpersist$default$1());
                });
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
            cachedRdds().clear();
        }
    }

    public Dataset<Row> pipe(InputFormatter<?> inputFormatter, OutputFormatter outputFormatter, Seq<String> seq, Map<String, String> map, Dataset<Row> dataset, Option<Tuple2<String, String>> option) {
        RDD<InternalRow> rdd;
        Some some;
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("Beginning pipe with cmd {}", new Object[]{seq});
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        Option map2 = option.map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Tuple2 tuple2 = new Tuple2((String) tuple2._1(), (String) tuple2._2());
            return new PipeIterator.QuarantineInfo(dataset, (String) tuple2._1(), PipeIterator$QuarantineWriter$.MODULE$.apply((String) tuple2._2()));
        });
        RDD<InternalRow> rdd2 = dataset.queryExecution().toRdd();
        if (rdd2.getNumPartitions() == 0) {
            if (logger().underlying().isWarnEnabled()) {
                logger().underlying().warn("Not piping any rows, as the input DataFrame has zero partitions.");
                BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            } else {
                BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
            }
            rdd = SQLUtils$.MODULE$.createEmptyRDD(dataset.sparkSession(), SQLUtils$.MODULE$.createEmptyRDD$default$2());
        } else {
            rdd = rdd2;
        }
        RDD<InternalRow> rdd3 = rdd;
        Success apply = Try$.MODULE$.apply(() -> {
            return rdd3.context().collectionAccumulator("errorPartitionData");
        });
        if (apply instanceof Success) {
            some = new Some((CollectionAccumulator) apply.value());
        } else {
            if (!(apply instanceof Failure)) {
                throw new MatchError(apply);
            }
            ((Failure) apply).exception().printStackTrace();
            some = None$.MODULE$;
        }
        Some some2 = some;
        RDD persist = rdd3.mapPartitions(iterator -> {
            return iterator.isEmpty() ? package$.MODULE$.Iterator().empty() : new PipeIterator(seq, map, iterator, inputFormatter, outputFormatter, some2);
        }, rdd3.mapPartitions$default$2(), ClassTag$.MODULE$.Any()).persist(StorageLevel$.MODULE$.DISK_ONLY());
        ListBuffer<RDD<?>> cachedRdds = cachedRdds();
        synchronized (cachedRdds) {
            cachedRdds().append(Predef$.MODULE$.wrapRefArray(new RDD[]{persist}));
        }
        StructType[] structTypeArr = (StructType[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) persist.mapPartitions(iterator2 -> {
            return iterator2.hasNext() ? package$.MODULE$.Iterator().apply(Predef$.MODULE$.wrapRefArray(new StructType[]{(StructType) iterator2.next()})) : package$.MODULE$.Iterator().empty();
        }, persist.mapPartitions$default$2(), ClassTag$.MODULE$.apply(StructType.class)).collect())).distinct();
        Some flatMap = some2.flatMap(collectionAccumulator -> {
            Buffer buffer = (Buffer) JavaConverters$.MODULE$.asScalaBufferConverter(collectionAccumulator.value()).asScala();
            return buffer.nonEmpty() ? new Some(buffer) : None$.MODULE$;
        });
        if (None$.MODULE$.equals(flatMap)) {
            if (structTypeArr.length != 1) {
                throw new IllegalStateException(new StringBuilder(43).append("Cannot infer schema: saw ").append(structTypeArr.length).append(" distinct schemas.").toString());
            }
            return SQLUtils$.MODULE$.internalCreateDataFrame(dataset.sparkSession(), persist.mapPartitions(iterator3 -> {
                return iterator3.drop(1);
            }, persist.mapPartitions$default$2(), ClassTag$.MODULE$.apply(InternalRow.class)), (StructType) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(structTypeArr)).head(), false);
        }
        if (!(flatMap instanceof Some)) {
            throw new MatchError(flatMap);
        }
        Tuple2 partition = ((Buffer) flatMap.value()).partition(obj -> {
            return BoxesRunTime.boxToBoolean($anonfun$pipe$7(obj));
        });
        if (partition == null) {
            throw new MatchError(partition);
        }
        Tuple2 tuple22 = new Tuple2((Buffer) partition._1(), (Buffer) partition._2());
        Buffer buffer = (Buffer) tuple22._1();
        Buffer buffer2 = (Buffer) tuple22._2();
        map2.foreach(quarantineInfo -> {
            $anonfun$pipe$8(buffer, persist, dataset, quarantineInfo);
            return BoxedUnit.UNIT;
        });
        Throwable th = (Throwable) buffer2.head();
        throw new SparkException(new StringBuilder(24).append("Could not process data. ").append(th.getMessage()).toString(), th);
    }

    public Option<Tuple2<String, String>> pipe$default$6() {
        return None$.MODULE$;
    }

    public static final /* synthetic */ boolean $anonfun$pipe$7(Object obj) {
        return obj instanceof InternalRow;
    }

    public static final /* synthetic */ void $anonfun$pipe$8(Buffer buffer, RDD rdd, Dataset dataset, PipeIterator.QuarantineInfo quarantineInfo) {
        Buffer buffer2 = (Buffer) buffer.map(obj -> {
            return (InternalRow) obj;
        }, Buffer$.MODULE$.canBuildFrom());
        SparkContext context = rdd.context();
        quarantineInfo.flavor().quarantine(quarantineInfo.copy(SQLUtils$.MODULE$.internalCreateDataFrame(dataset.sparkSession(), context.parallelize(buffer2, context.parallelize$default$2(), ClassTag$.MODULE$.apply(InternalRow.class)), dataset.schema(), false), quarantineInfo.copy$default$2(), quarantineInfo.copy$default$3()));
    }

    private Piper$() {
        MODULE$ = this;
        LazyLogging.$init$(this);
        this.cachedRdds = ListBuffer$.MODULE$.apply(Nil$.MODULE$);
    }
}
