package io.eels.datastream;

import io.eels.Part;
import io.eels.Row;
import io.eels.Row$;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import scala.MatchError;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Seq;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: DataStreamSource.scala */
/* loaded from: input_file:io/eels/datastream/DataStreamSource$$anonfun$subscribe$1.class */
/**
 * Decompiled form of a Scala-compiler-generated function object (a closure taken from
 * {@code DataStreamSource.subscribe}, per the class name and the "compiled from" header).
 *
 * <p>Each invocation receives a {@code (Part, int)} tuple and submits a {@link Runnable} to
 * {@code ExecutorInstances$.MODULE$.io()}. That runnable subscribes an anonymous
 * {@code Subscriber<Seq<Row>>} (labelled {@code PartSubscriber$1} by the decompiler) to the part.
 * The subscriber forwards every batch it receives onto the shared queue and, once the shared
 * {@code finished} counter equals {@code parts.size()}, enqueues {@code Row$.MODULE$.Sentinel()}.
 *
 * <p>NOTE(review): this text is decompiler output, not hand-written Java. Several constructs
 * (anonymous classes with constructor arguments, {@code $}-mangled names, {@code throw null})
 * mirror bytecode and are not expected to compile as-is. The mangled member names are presumably
 * referenced by other generated classes, so they should not be renamed.
 */
public final class DataStreamSource$$anonfun$subscribe$1 extends AbstractFunction1<Tuple2<Part, Object>, BoxedUnit> implements Serializable {
    public static final long serialVersionUID = 0;
    // Enclosing DataStreamSource instance; only used here to reach its logger().
    private final /* synthetic */ DataStreamSource $outer;
    // Values captured by the closure. All four are handed unchanged to every subscriber created in apply().
    // queue$1: receives each Seq<Row> batch and the Sentinel marker.
    public final LinkedBlockingQueue queue$1;
    // finished$1: incremented once per completed() / error() callback.
    public final AtomicLong finished$1;
    // parts$1: only its size() is read here, as the target value for finished$1.
    public final Seq parts$1;
    // running$1: read in next(); when false the subscriber cancels its part.
    public final AtomicBoolean running$1;

    /**
     * Schedules the subscription for one part on the io executor.
     *
     * @param tuple2 pair of the part and an int (presumably the part's index — confirm against
     *               DataStreamSource.scala); the int is only used as the part's name in log messages
     * @throws MatchError if {@code tuple2} is null
     */
    public final void apply(Tuple2<Part, Object> tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        final Part part = (Part) tuple2._1();
        final int _2$mcI$sp = tuple2._2$mcI$sp();
        ExecutorInstances$.MODULE$.io().execute(new Runnable(this, part, _2$mcI$sp) { // from class: io.eels.datastream.DataStreamSource$$anonfun$subscribe$1$$anon$2
            private final /* synthetic */ DataStreamSource$$anonfun$subscribe$1 $outer;
            private final Part part$1;
            private final int k$1;

            /**
             * Subscribes a new subscriber to the part. Any Throwable escaping
             * {@code subscribe} is logged and followed by a Sentinel on the queue.
             */
            @Override // java.lang.Runnable
            public void run() {
                try {
                    Part part2 = this.part$1;
                    final DataStreamSource io$eels$datastream$DataStreamSource$$anonfun$$$outer = this.$outer.io$eels$datastream$DataStreamSource$$anonfun$$$outer();
                    // The int paired with the part becomes the subscriber's name in log messages.
                    final String obj = BoxesRunTime.boxToInteger(this.k$1).toString();
                    final LinkedBlockingQueue linkedBlockingQueue = this.$outer.queue$1;
                    final AtomicLong atomicLong = this.$outer.finished$1;
                    final Seq seq = this.$outer.parts$1;
                    final AtomicBoolean atomicBoolean = this.$outer.running$1;
                    part2.subscribe(new Subscriber<Seq<Row>>(io$eels$datastream$DataStreamSource$$anonfun$$$outer, obj, linkedBlockingQueue, atomicLong, seq, atomicBoolean) { // from class: io.eels.datastream.DataStreamSource$PartSubscriber$1
                        private final String name;
                        // Handle received in starting(); null until then (see the initializer below).
                        private Cancellable cancellable;
                        public final /* synthetic */ DataStreamSource $outer;
                        private final LinkedBlockingQueue queue$1;
                        private final AtomicLong finished$1;
                        private final Seq parts$1;
                        private final AtomicBoolean running$1;

                        // Scala-style getter for the mutable cancellable field.
                        public Cancellable cancellable() {
                            return this.cancellable;
                        }

                        // Scala-style setter for the mutable cancellable field.
                        public void cancellable_$eq(Cancellable cancellable) {
                            this.cancellable = cancellable;
                        }

                        /** Logs the start of the part and stores the handle later used to cancel it. */
                        @Override // io.eels.datastream.Subscriber
                        public void starting(Cancellable cancellable) {
                            io$eels$datastream$DataStreamSource$PartSubscriber$$$outer().logger().debug(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Starting reads for part ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.name})));
                            cancellable_$eq(cancellable);
                        }

                        /**
                         * Logs completion and bumps the shared counter; the callback that brings
                         * the counter to {@code parts.size()} enqueues the Sentinel.
                         */
                        @Override // io.eels.datastream.Subscriber
                        public void completed() {
                            io$eels$datastream$DataStreamSource$PartSubscriber$$$outer().logger().debug(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Part ", " has finished"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.name})));
                            if (this.finished$1.incrementAndGet() == this.parts$1.size()) {
                                this.queue$1.put(Row$.MODULE$.Sentinel());
                            }
                        }

                        /**
                         * Logs the failure, cancels this part, then counts it as finished using
                         * the same counter/Sentinel rule as {@code completed()}.
                         */
                        @Override // io.eels.datastream.Subscriber
                        public void error(Throwable th) {
                            io$eels$datastream$DataStreamSource$PartSubscriber$$$outer().logger().error(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Error reading part ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.name})), th);
                            // NOTE(review): cancellable() is null if starting() was never called, which
                            // would throw a NullPointerException here before the counter is incremented.
                            // Confirm that Part implementations always call starting() first.
                            cancellable().cancel();
                            if (this.finished$1.incrementAndGet() == this.parts$1.size()) {
                                this.queue$1.put(Row$.MODULE$.Sentinel());
                            }
                        }

                        /**
                         * Enqueues the batch, then cancels the part if the shared running flag is
                         * false. The batch is enqueued before the flag is checked, so a batch that
                         * arrives after the flag was cleared is still put on the queue.
                         */
                        @Override // io.eels.datastream.Subscriber
                        public void next(Seq<Row> seq2) {
                            this.queue$1.put(seq2);
                            if (this.running$1.get()) {
                                return;
                            }
                            // NOTE(review): the batch is passed as an extra logger argument although the
                            // message text built here has no visible placeholder for it — confirm this is intended.
                            io$eels$datastream$DataStreamSource$PartSubscriber$$$outer().logger().debug(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Cancelling part ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.name})), new Object[]{seq2});
                            // NOTE(review): same unguarded dereference of cancellable() as in error().
                            cancellable().cancel();
                        }

                        // Synthetic accessor for the enclosing DataStreamSource (used to reach its logger).
                        public /* synthetic */ DataStreamSource io$eels$datastream$DataStreamSource$PartSubscriber$$$outer() {
                            return this.$outer;
                        }

                        // Decompiler rendering of the subscriber's constructor: copies the six
                        // arguments into fields and starts with no cancellable.
                        {
                            this.name = obj;
                            // `throw null` is how the decompiler shows the compiler-emitted null check on the outer reference.
                            if (io$eels$datastream$DataStreamSource$$anonfun$$$outer == null) {
                                throw null;
                            }
                            this.$outer = io$eels$datastream$DataStreamSource$$anonfun$$$outer;
                            this.queue$1 = linkedBlockingQueue;
                            this.finished$1 = atomicLong;
                            this.parts$1 = seq;
                            this.running$1 = atomicBoolean;
                            this.cancellable = null;
                        }
                    });
                } catch (Throwable th) {
                    this.$outer.io$eels$datastream$DataStreamSource$$anonfun$$$outer().logger().error(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Error subscribing to part ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(this.k$1)})), th);
                    // NOTE(review): unlike completed()/error(), this path enqueues the Sentinel
                    // without consulting finished$1 — confirm whether ending the stream on the
                    // first subscribe failure is intended while other parts may still be running.
                    this.$outer.queue$1.put(Row$.MODULE$.Sentinel());
                }
            }

            // Decompiler rendering of the Runnable's constructor.
            {
                // NOTE(review): `this == null` / `$outer = this` look like decompiler artifacts.
                // The field is typed as the enclosing function class, so the bytecode presumably
                // stores the first constructor argument (the function object) — verify against the class file.
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.part$1 = part;
                this.k$1 = _2$mcI$sp;
            }
        });
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    // Synthetic accessor exposing the enclosing DataStreamSource to the nested Runnable.
    public /* synthetic */ DataStreamSource io$eels$datastream$DataStreamSource$$anonfun$$$outer() {
        return this.$outer;
    }

    // Erased bridge for Function1.apply: delegates to the typed overload and returns Unit.
    public final /* bridge */ /* synthetic */ Object apply(Object obj) {
        apply((Tuple2<Part, Object>) obj);
        return BoxedUnit.UNIT;
    }

    /**
     * Captures the enclosing source and the four shared values used by every subscriber.
     * A null {@code dataStreamSource} is rejected ({@code throw null} is the decompiler's
     * rendering of the compiler-emitted null check).
     */
    public DataStreamSource$$anonfun$subscribe$1(DataStreamSource dataStreamSource, LinkedBlockingQueue linkedBlockingQueue, AtomicLong atomicLong, Seq seq, AtomicBoolean atomicBoolean) {
        if (dataStreamSource == null) {
            throw null;
        }
        this.$outer = dataStreamSource;
        this.queue$1 = linkedBlockingQueue;
        this.finished$1 = atomicLong;
        this.parts$1 = seq;
        this.running$1 = atomicBoolean;
    }
}
