package io.atleon.core;

import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/atleon/core/AcknowledgingPublisher.class */
public final class AcknowledgingPublisher<T> implements Publisher<Alo<T>> {
    private final Publisher<? extends T> source;
    private final Runnable acknowledger;
    private final Consumer<? super Throwable> nacknowledger;
    private final AloFactory<T> factory;
    private final AtomicBoolean subscribedOnce = new AtomicBoolean(false);

    /* loaded from: input_file:io/atleon/core/AcknowledgingPublisher$AcknowledgingSubscriber.class */
    private static final class AcknowledgingSubscriber<T> implements Subscriber<T> {
        private final AtomicReference<State> state = new AtomicReference<>(State.ACTIVE);
        private final Collection<Reference<T>> unacknowledged = Collections.newSetFromMap(new IdentityHashMap());
        private final Runnable acknowledger;
        private final Consumer<? super Throwable> nacknowledger;
        private final AloFactory<T> factory;
        private final Subscriber<? super Alo<T>> subscriber;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:io/atleon/core/AcknowledgingPublisher$AcknowledgingSubscriber$State.class */
        public enum State {
            ACTIVE,
            IN_FLIGHT,
            EXECUTED
        }

        public AcknowledgingSubscriber(Runnable runnable, Consumer<? super Throwable> consumer, AloFactory<T> aloFactory, Subscriber<? super Alo<T>> subscriber) {
            this.acknowledger = runnable;
            this.nacknowledger = consumer;
            this.factory = aloFactory;
            this.subscriber = subscriber;
        }

        public void onSubscribe(Subscription subscription) {
            Subscriber<? super Alo<T>> subscriber = this.subscriber;
            Objects.requireNonNull(subscription);
            subscriber.onSubscribe(new ComposedSubscription((v1) -> {
                r3.request(v1);
            }, decorateCancellation(subscription)));
        }

        public void onNext(T t) {
            WeakReference weakReference = new WeakReference(Objects.requireNonNull(t, "Empty value emitted - Adhering to ReactiveStreams rule 2.13"));
            synchronized (this.unacknowledged) {
                if (this.state.get() == State.ACTIVE) {
                    this.unacknowledged.add(weakReference);
                }
            }
            this.subscriber.onNext(wrap(t, weakReference));
        }

        public void onError(Throwable th) {
            maybeExecuteNacknowledger(th);
            this.subscriber.onError(th);
        }

        public void onComplete() {
            if (this.state.compareAndSet(State.ACTIVE, State.IN_FLIGHT)) {
                maybeExecuteAcknowledger();
            }
            this.subscriber.onComplete();
        }

        private Runnable decorateCancellation(Subscription subscription) {
            return () -> {
                subscription.cancel();
                if (this.state.compareAndSet(State.ACTIVE, State.IN_FLIGHT)) {
                    maybeExecuteAcknowledger();
                }
            };
        }

        private Alo<T> wrap(T t, Reference<T> reference) {
            return this.factory.create(t, () -> {
                synchronized (this.unacknowledged) {
                    if (this.unacknowledged.remove(reference)) {
                        maybeExecuteAcknowledger();
                    }
                }
            }, th -> {
                synchronized (this.unacknowledged) {
                    if (this.unacknowledged.contains(reference)) {
                        maybeExecuteNacknowledger(th);
                    }
                }
            });
        }

        private void maybeExecuteAcknowledger() {
            synchronized (this.unacknowledged) {
                if (this.unacknowledged.isEmpty() && this.state.compareAndSet(State.IN_FLIGHT, State.EXECUTED)) {
                    this.acknowledger.run();
                }
            }
        }

        private void maybeExecuteNacknowledger(Throwable th) {
            synchronized (this.unacknowledged) {
                if (this.state.compareAndSet(State.ACTIVE, State.EXECUTED) || this.state.compareAndSet(State.IN_FLIGHT, State.EXECUTED)) {
                    this.unacknowledged.clear();
                    this.nacknowledger.accept(th);
                }
            }
        }
    }

    private AcknowledgingPublisher(Publisher<? extends T> publisher, Runnable runnable, Consumer<? super Throwable> consumer, AloFactory<T> aloFactory) {
        this.source = publisher;
        this.acknowledger = runnable;
        this.nacknowledger = consumer;
        this.factory = aloFactory;
    }

    public static <T> Publisher<Alo<T>> fromAloPublisher(Alo<Publisher<T>> alo) {
        return new AcknowledgingPublisher(alo.get(), alo.getAcknowledger(), alo.getNacknowledger(), alo.propagator());
    }

    public void subscribe(Subscriber<? super Alo<T>> subscriber) {
        if (!this.subscribedOnce.compareAndSet(false, true)) {
            throw new IllegalStateException("AcknowledgingPublisher may only be subscribed to once");
        }
        this.source.subscribe(new AcknowledgingSubscriber(this.acknowledger, this.nacknowledger, this.factory, subscriber));
    }
}
