package io.eels.datastream;

import com.sksamuel.exts.collection.BlockingQueueConcurrentIterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.package$;

/* JADX INFO: Add missing generic type declarations: [T] */
/* compiled from: Publisher.scala */
/* loaded from: input_file:io/eels/datastream/Publisher$$anon$1.class */
public final class Publisher$$anon$1<T> implements Publisher<T> {
    private final Seq publishers$1;
    public final Object sentinel$1;
    public final ExecutorService executor$1;

    @Override // io.eels.datastream.Publisher
    public void subscribe(Subscriber<T> subscriber) {
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(DataStream$.MODULE$.DefaultBufferSize());
        AtomicInteger atomicInteger = new AtomicInteger(this.publishers$1.size());
        final ArrayBuffer empty = ArrayBuffer$.MODULE$.empty();
        Subscription subscription = new Subscription(this, empty) { // from class: io.eels.datastream.Publisher$$anon$1$$anon$2
            private final ArrayBuffer subscriptions$1;

            @Override // io.eels.datastream.Subscription
            public void cancel() {
                this.subscriptions$1.foreach(new Publisher$$anon$1$$anon$2$$anonfun$cancel$1(this));
            }

            {
                this.subscriptions$1 = empty;
            }
        };
        AtomicReference atomicReference = new AtomicReference(null);
        this.publishers$1.foreach(new Publisher$$anon$1$$anonfun$subscribe$1(this, linkedBlockingQueue, atomicInteger, empty, subscription, atomicReference));
        try {
            subscriber.subscribed(subscription);
            new BlockingQueueConcurrentIterator(linkedBlockingQueue, package$.MODULE$.Right().apply(this.sentinel$1)).takeWhile(new Publisher$$anon$1$$anonfun$subscribe$2(this, atomicReference)).foreach(new Publisher$$anon$1$$anonfun$subscribe$3(this, subscriber));
            if (atomicReference.get() == null) {
                subscriber.completed();
            }
        } catch (Throwable th) {
            Publisher$.MODULE$.logger().error("Error in merge subscriber", th);
            subscription.cancel();
            subscriber.error(th);
        }
        Publisher$.MODULE$.logger().debug("Merge subscriber has completed");
    }

    public final void io$eels$datastream$Publisher$$anon$$terminate$1(Throwable th, LinkedBlockingQueue linkedBlockingQueue, Subscription subscription, AtomicReference atomicReference) {
        Publisher$.MODULE$.logger().error(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Error in merge"})).s(Nil$.MODULE$), th);
        atomicReference.set(th);
        subscription.cancel();
        linkedBlockingQueue.clear();
        linkedBlockingQueue.put(package$.MODULE$.Right().apply(this.sentinel$1));
    }

    public Publisher$$anon$1(Seq seq, Object obj, ExecutorService executorService) {
        this.publishers$1 = seq;
        this.sentinel$1 = obj;
        this.executor$1 = executorService;
    }
}
