/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.counter.impl;

import java.net.SocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import org.infinispan.client.hotrod.counter.impl.CounterOperationFactory;
import org.infinispan.client.hotrod.counter.impl.HotRodCounterEvent;
import org.infinispan.client.hotrod.counter.operation.AddListenerOperation;
import org.infinispan.client.hotrod.counter.operation.RemoveListenerOperation;
import org.infinispan.client.hotrod.event.impl.ClientListenerNotifier;
import org.infinispan.client.hotrod.event.impl.CounterEventDispatcher;
import org.infinispan.client.hotrod.impl.Util;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.util.concurrent.NonReentrantLock;
import org.infinispan.counter.api.CounterListener;
import org.infinispan.counter.api.Handle;

public class NotificationManager {
    // Class logger. The trace flag is sampled once, when the class is initialized.
    private static final Log log = LogFactory.getLog(NotificationManager.class, Log.class);
    private static final boolean trace = log.isTraceEnabled();
    // Shared, already-completed future carrying status 0; failover() returns it when it has nothing to do.
    private static final CompletableFuture<Short> NO_ERROR_FUTURE = CompletableFuture.completedFuture((short)0);
    // Identifier sent with every add/remove listener operation; filled with 16 random bytes in the constructor.
    private final byte[] listenerId;
    // Receives the CounterEventDispatcher instances created by this manager.
    private final ClientListenerNotifier notifier;
    // Builds the add/remove listener operations that are executed against the server.
    private final CounterOperationFactory factory;
    // Counter name -> consumers registered for that counter. An entry is created by the first
    // registration for a counter and removed when its last consumer is removed.
    private final ConcurrentMap<String, List<Consumer<HotRodCounterEvent>>> clientListeners = new ConcurrentHashMap<String, List<Consumer<HotRodCounterEvent>>>();
    // Taken by addListener() (only when no dispatcher exists yet), by failover() and by stop().
    // NOTE(review): failover() releases it from inside a completion callback, so it presumably
    // must not be tied to the acquiring thread - confirm against NonReentrantLock.
    private final Lock lock = new NonReentrantLock();
    // Current event dispatcher; null until a registration creates one, and reset to null at the start of failover().
    private volatile CounterEventDispatcher dispatcher;

    /**
     * Creates a manager bound to the given notifier and operation factory.
     * <p>
     * Every instance gets its own listener id: 16 bytes drawn from {@link ThreadLocalRandom}.
     *
     * @param notifier receives the event dispatchers this manager creates
     * @param factory  builds the add/remove listener operations
     */
    NotificationManager(ClientListenerNotifier notifier, CounterOperationFactory factory) {
        byte[] randomId = new byte[16];
        ThreadLocalRandom.current().nextBytes(randomId);
        this.listenerId = randomId;
        this.factory = factory;
        this.notifier = notifier;
    }

    /**
     * Registers {@code listener} for the counter named {@code counterName}.
     * <p>
     * If a dispatcher is already present, the registration goes straight to that dispatcher's
     * address without taking the lock. Otherwise the lock is acquired, the dispatcher field is
     * read again, and the registration is performed with the dispatcher's address, or with
     * {@code null} when there is still no dispatcher.
     *
     * @param counterName name of the counter to listen to
     * @param listener    the user listener
     * @param <T>         concrete listener type
     * @return a handle that exposes the listener and can be used to remove it
     */
    public <T extends CounterListener> Handle<T> addListener(String counterName, T listener) {
        if (trace) {
            log.tracef("Add listener for counter '%s'", counterName);
        }
        CounterEventDispatcher current = this.dispatcher;
        if (current == null) {
            log.debugf("ALock %s", this.lock);
            this.lock.lock();
            try {
                // Read the field again now that the lock is held.
                current = this.dispatcher;
                SocketAddress target = null;
                if (current != null) {
                    target = current.address();
                }
                return this.registerListener(counterName, listener, target);
            }
            finally {
                this.lock.unlock();
                log.debugf("AUnLock %s", this.lock);
            }
        }
        return this.registerListener(counterName, listener, current.address());
    }

    /**
     * Wraps {@code listener} in a handle and adds it to the consumers of {@code counterName}.
     * <p>
     * Only the first registration for a counter executes an add-listener operation against the
     * server (and blocks until it completes). If that operation reports that its channel is to
     * be used and no {@code address} was supplied, a new event dispatcher is created on the
     * operation's channel, published in {@link #dispatcher} and started through the notifier.
     *
     * @param counterName name of the counter
     * @param listener    the user listener
     * @param address     address of the existing dispatcher, or {@code null} when there is none
     * @return the handle wrapping {@code listener}
     */
    private <T extends CounterListener> Handle<T> registerListener(String counterName, T listener, SocketAddress address) {
        HandleImpl<T> handle = new HandleImpl<>(this, counterName, listener);
        List<Consumer<HotRodCounterEvent>> consumers = this.clientListeners.computeIfAbsent(counterName, name -> {
            AddListenerOperation op = this.factory.newAddListenerOperation(counterName, this.listenerId, address);
            boolean useChannel = ((Boolean)Util.await(op.execute())).booleanValue();
            if (useChannel && address == null) {
                // Keep the new dispatcher in a local: the volatile field may be reset by
                // failover() at any time, and the notifier must never be handed null.
                CounterEventDispatcher created = new CounterEventDispatcher(this.listenerId, this.clientListeners, op.getChannel().remoteAddress(), this::failover, op::cleanup);
                this.dispatcher = created;
                this.notifier.addDispatcher(created);
                this.notifier.startClientListener(this.listenerId);
            }
            return new CopyOnWriteArrayList<>();
        });
        consumers.add(handle);
        return handle;
    }

    /**
     * Removes {@code handle} from the consumers of {@code counterName}.
     * <p>
     * When the last consumer of a counter is removed, the counter's entry is dropped from the
     * map and, if a dispatcher is currently known, a remove-listener operation is executed
     * against the dispatcher's address (blocking until it completes). A negative answer from
     * the server is only logged.
     *
     * @param counterName name of the counter
     * @param handle      the handle to remove
     */
    private void removeListener(String counterName, HandleImpl<?> handle) {
        if (trace) {
            log.tracef("Remove listener for counter '%s'", counterName);
        }
        this.clientListeners.computeIfPresent(counterName, (name, list) -> {
            list.remove(handle);
            if (!list.isEmpty()) {
                return list;
            }
            // Read the volatile field exactly once: failover() may reset it to null between a
            // null check and a later dereference.
            CounterEventDispatcher current = this.dispatcher;
            if (current != null) {
                RemoveListenerOperation op = this.factory.newRemoveListenerOperation(counterName, this.listenerId, current.address());
                if (!((Boolean)Util.await(op.execute())).booleanValue()) {
                    log.debugf("Failed to remove counter listener %s on server side", counterName);
                }
            }
            // Returning null removes the now-empty entry from the map.
            return null;
        });
    }

    /**
     * Re-registers every known counter listener after the current dispatcher has been lost.
     * <p>
     * The dispatcher field is cleared first. If no counters are registered, or another caller
     * has already installed a dispatcher by the time the lock is obtained, an already-completed
     * future is returned. Otherwise the first counter is re-registered without a target
     * address; once that succeeds, the remaining counters are re-registered against the
     * resulting dispatcher's address.
     * <p>
     * The lock is acquired here and released inside the completion callback of the first
     * operation (or immediately, if that operation cannot even be started).
     *
     * @return a future completed with {@code 0} once all re-registrations have answered, or
     *         completed exceptionally on the first failure
     */
    private CompletableFuture<Short> failover() {
        this.dispatcher = null;
        Iterator<String> iterator = this.clientListeners.keySet().iterator();
        if (!iterator.hasNext()) {
            return NO_ERROR_FUTURE;
        }
        CompletableFuture<Short> cf = new CompletableFuture<>();
        String firstCounterName = iterator.next();
        AddListenerOperation op = this.factory.newAddListenerOperation(firstCounterName, this.listenerId, null);
        log.debugf("Lock %s", this.lock);
        this.lock.lock();
        if (this.dispatcher != null) {
            // Somebody else already created a dispatcher while we were waiting for the lock.
            this.lock.unlock();
            log.debugf("UnLock %s", this.lock);
            return NO_ERROR_FUTURE;
        }
        CompletableFuture<Boolean> firstAttempt;
        try {
            firstAttempt = op.execute();
        }
        catch (RuntimeException | Error e) {
            // No callback has been attached yet, so nothing else would ever release the lock.
            this.lock.unlock();
            log.debugf("UnLock %s", this.lock);
            throw e;
        }
        firstAttempt.whenComplete((useChannel, throwable) -> {
            if (throwable != null) {
                this.lock.unlock();
                log.debugf(throwable, "Failed to failover counter listener %s", firstCounterName);
                cf.completeExceptionally(throwable);
                return;
            }
            SocketAddress address;
            // Starts at 1 so the loop below holds one "permit" of its own; cf can therefore
            // not complete before every remaining operation has been issued.
            AtomicInteger pending = new AtomicInteger(1);
            try {
                CounterEventDispatcher current;
                if (useChannel) {
                    log.debugf("Creating new counter event dispatcher on %s", op.getChannel());
                    current = new CounterEventDispatcher(this.listenerId, this.clientListeners, op.getChannel().remoteAddress(), this::failover, op::cleanup);
                    this.dispatcher = current;
                    this.notifier.addDispatcher(current);
                    this.notifier.startClientListener(this.listenerId);
                } else {
                    current = this.dispatcher;
                }
                // Throws NullPointerException when no dispatcher is available; handled below.
                address = current.address();
            }
            catch (Throwable t) {
                cf.completeExceptionally(t);
                return;
            }
            finally {
                this.lock.unlock();
                log.debugf("UnLock %s", this.lock);
            }
            while (iterator.hasNext()) {
                String counterName = iterator.next();
                // Account for this operation before it is started, so that its completion
                // callback cannot drive the count to zero ahead of the others.
                pending.incrementAndGet();
                this.factory.newAddListenerOperation(counterName, this.listenerId, address).execute().whenComplete((useChannel2, throwable2) -> {
                    if (throwable2 != null) {
                        log.debugf(throwable2, "Failed to failover counter listener %s", counterName);
                        cf.completeExceptionally(throwable2);
                    } else {
                        if (useChannel2) {
                            cf.completeExceptionally(new IllegalStateException("Unexpected to use another channel for the same counter"));
                        }
                        if (pending.decrementAndGet() == 0) {
                            cf.complete((short)0);
                        }
                    }
                });
            }
            // Release the loop's own permit.
            if (pending.decrementAndGet() == 0) {
                cf.complete((short)0);
            }
        });
        return cf;
    }

    /**
     * Removes all counter listeners and clears the local registrations.
     * <p>
     * While holding the lock, a remove-listener operation is executed for every registered
     * counter against the current dispatcher's address, and the call blocks until all of them
     * have completed. If no dispatcher is currently known (for example after a failover that
     * has not produced a new one), the server-side removal is skipped and only the local
     * registrations are cleared, mirroring what removing a single listener does in that case.
     */
    public void stop() {
        log.debugf("Stopping %s (%s)", this, this.lock);
        this.lock.lock();
        try {
            // Single read of the volatile field; it may be null.
            CounterEventDispatcher current = this.dispatcher;
            if (current != null) {
                SocketAddress address = current.address();
                CompletableFuture<?>[] futures = this.clientListeners.keySet().stream()
                        .map(counterName -> this.factory.newRemoveListenerOperation(counterName, this.listenerId, address).execute())
                        .toArray(CompletableFuture[]::new);
                Util.await(CompletableFuture.allOf(futures));
            }
            this.clientListeners.clear();
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Handle returned to users for a registered counter listener. It doubles as the consumer
     * stored in the manager's listener map: incoming events are forwarded to the user listener.
     *
     * @param <T> concrete listener type
     */
    private static class HandleImpl<T extends CounterListener>
    implements Handle<T>,
    Consumer<HotRodCounterEvent> {
        // Manager that created this handle; needed to unregister on remove().
        private final NotificationManager manager;
        private final T listener;
        private final String counterName;

        /**
         * @param manager     the manager this handle is registered with
         * @param counterName name of the counter being listened to
         * @param listener    the user listener
         */
        private HandleImpl(NotificationManager manager, String counterName, T listener) {
            this.manager = manager;
            this.counterName = counterName;
            this.listener = listener;
        }

        /** Returns the user listener wrapped by this handle. */
        @Override
        public T getCounterListener() {
            return this.listener;
        }

        /** Unregisters this handle from the manager that created it. */
        @Override
        public void remove() {
            this.manager.removeListener(this.counterName, this);
        }

        /**
         * Forwards {@code event} to the user listener. Anything the listener throws is logged
         * at debug level and not propagated to the caller.
         */
        @Override
        public void accept(HotRodCounterEvent event) {
            try {
                this.listener.onUpdate(event);
            }
            catch (Throwable t) {
                log.debug("Exception in user listener", t);
            }
        }
    }
}

