/*
 * Decompiled with CFR 0.152.
 */
package org.microbean.kubernetes.controller;

import io.fabric8.kubernetes.api.model.HasMetadata;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.Immutable;
import net.jcip.annotations.ThreadSafe;
import org.microbean.kubernetes.controller.AbstractEvent;
import org.microbean.kubernetes.controller.Event;
import org.microbean.kubernetes.controller.ResourceTrackingEventQueueConsumer;
import org.microbean.kubernetes.controller.SynchronizationEvent;

@Immutable
@ThreadSafe
public final class EventDistributor<T extends HasMetadata>
extends ResourceTrackingEventQueueConsumer<T>
implements AutoCloseable {
    @GuardedBy(value="readLock && writeLock")
    private final Collection<Pump<T>> pumps;
    @GuardedBy(value="readLock && writeLock")
    private final Collection<Pump<T>> synchronizingPumps;
    private final Duration synchronizationInterval;
    private final Lock readLock;
    private final Lock writeLock;

    public EventDistributor(Map<Object, T> knownObjects) {
        this(knownObjects, null);
    }

    public EventDistributor(Map<Object, T> knownObjects, Duration synchronizationInterval) {
        super(knownObjects);
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        this.readLock = lock.readLock();
        this.writeLock = lock.writeLock();
        this.pumps = new ArrayList<Pump<T>>();
        this.synchronizingPumps = new ArrayList<Pump<T>>();
        this.synchronizationInterval = synchronizationInterval;
    }

    public final void addConsumer(Consumer<? super AbstractEvent<? extends T>> consumer) {
        this.addConsumer(consumer, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final void addConsumer(Consumer<? super AbstractEvent<? extends T>> consumer, Function<? super Throwable, Boolean> errorHandler) {
        if (consumer != null) {
            this.writeLock.lock();
            try {
                Pump pump = new Pump(this.synchronizationInterval, consumer, errorHandler);
                pump.start();
                this.pumps.add(pump);
                this.synchronizingPumps.add(pump);
            }
            finally {
                this.writeLock.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final void removeConsumer(Consumer<? super AbstractEvent<? extends T>> consumer) {
        if (consumer != null) {
            this.writeLock.lock();
            try {
                Iterator<Pump<T>> iterator = this.pumps.iterator();
                assert (iterator != null);
                while (iterator.hasNext()) {
                    Pump<T> pump = iterator.next();
                    if (pump == null || !consumer.equals(((Pump)pump).getEventConsumer())) continue;
                    pump.close();
                    iterator.remove();
                    break;
                }
            }
            finally {
                this.writeLock.unlock();
            }
        }
    }

    @Override
    public final void close() {
        this.writeLock.lock();
        try {
            this.pumps.stream().forEach(pump -> pump.close());
            this.synchronizingPumps.clear();
            this.pumps.clear();
        }
        finally {
            this.writeLock.unlock();
        }
    }

    public final boolean shouldSynchronize() {
        boolean returnValue = false;
        this.writeLock.lock();
        try {
            this.synchronizingPumps.clear();
            Instant now = Instant.now();
            this.pumps.stream().filter(pump -> ((Pump)pump).shouldSynchronize(now)).forEach(pump -> {
                this.synchronizingPumps.add((Pump<T>)pump);
                ((Pump)pump).determineNextSynchronizationInterval(now);
            });
            returnValue = !this.synchronizingPumps.isEmpty();
        }
        finally {
            this.writeLock.unlock();
        }
        return returnValue;
    }

    @Override
    protected final void accept(AbstractEvent<? extends T> event) {
        if (event != null) {
            if (event instanceof SynchronizationEvent) {
                this.accept((SynchronizationEvent)event);
            } else if (event instanceof Event) {
                this.accept((Event)event);
            } else assert (false) : "Unexpected event type: " + event.getClass();
        }
    }

    @Override
    private final void accept(SynchronizationEvent<? extends T> event) {
        this.readLock.lock();
        try {
            if (!this.synchronizingPumps.isEmpty()) {
                this.synchronizingPumps.stream().forEach(pump -> pump.accept(event));
            }
        }
        finally {
            this.readLock.unlock();
        }
    }

    @Override
    private final void accept(Event<? extends T> event) {
        this.readLock.lock();
        try {
            if (!this.pumps.isEmpty()) {
                this.pumps.stream().forEach(pump -> pump.accept(event));
            }
        }
        finally {
            this.readLock.unlock();
        }
    }

    private static final class Pump<T extends HasMetadata>
    implements Consumer<AbstractEvent<? extends T>>,
    AutoCloseable {
        private final Logger logger;
        private final Consumer<? super AbstractEvent<? extends T>> eventConsumer;
        private final Function<? super Throwable, Boolean> errorHandler;
        private volatile boolean closing;
        private volatile Instant nextSynchronizationInstant;
        private volatile Duration synchronizationInterval;
        @GuardedBy(value="this")
        private ScheduledExecutorService executor;
        @GuardedBy(value="this")
        private Future<?> task;
        private volatile Future<?> errorHandlingTask;
        final BlockingQueue<AbstractEvent<? extends T>> queue;

        private Pump(Duration synchronizationInterval, Consumer<? super AbstractEvent<? extends T>> eventConsumer) {
            this(synchronizationInterval, eventConsumer, null);
        }

        private Pump(Duration synchronizationInterval, Consumer<? super AbstractEvent<? extends T>> eventConsumer, Function<? super Throwable, Boolean> errorHandler) {
            String cn = this.getClass().getName();
            this.logger = Logger.getLogger(cn);
            assert (this.logger != null);
            String mn = "<init>";
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.entering(cn, "<init>", new Object[]{synchronizationInterval, eventConsumer, errorHandler});
            }
            this.queue = new LinkedBlockingQueue<AbstractEvent<? extends T>>();
            this.eventConsumer = Objects.requireNonNull(eventConsumer);
            this.errorHandler = errorHandler == null ? t -> {
                if (this.logger.isLoggable(Level.SEVERE)) {
                    this.logger.logp(Level.SEVERE, this.getClass().getName(), "<pumpTask>", t.getMessage(), (Throwable)t);
                }
                return true;
            } : errorHandler;
            this.setSynchronizationInterval(synchronizationInterval);
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.exiting(cn, "<init>");
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private final void start() {
            String cn = this.getClass().getName();
            String mn = "start";
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.entering(cn, "start");
            }
            Pump pump = this;
            synchronized (pump) {
                if (this.executor == null) {
                    assert (this.task == null);
                    assert (this.errorHandlingTask == null);
                    this.executor = this.createScheduledThreadPoolExecutor();
                    if (this.executor == null) {
                        throw new IllegalStateException("createScheduledThreadPoolExecutor() == null");
                    }
                    this.task = this.executor.scheduleWithFixedDelay(() -> {
                        while (!Thread.currentThread().isInterrupted()) {
                            try {
                                this.getEventConsumer().accept(this.queue.take());
                            }
                            catch (InterruptedException interruptedException) {
                                Thread.currentThread().interrupt();
                            }
                            catch (RuntimeException runtimeException) {
                                if (this.errorHandler.apply(runtimeException).booleanValue()) continue;
                                throw runtimeException;
                            }
                            catch (Error error) {
                                if (this.errorHandler.apply(error).booleanValue()) continue;
                                throw error;
                            }
                        }
                    }, 0L, 1L, TimeUnit.SECONDS);
                    assert (this.task != null);
                    this.errorHandlingTask = this.executor.submit(() -> {
                        try {
                            while (!Thread.currentThread().isInterrupted()) {
                                this.task.get();
                            }
                        }
                        catch (CancellationException ok) {
                            this.task.cancel(true);
                        }
                        catch (ExecutionException executionException) {
                            this.task.cancel(true);
                            Future<?> errorHandlingTask = this.errorHandlingTask;
                            if (errorHandlingTask != null) {
                                errorHandlingTask.cancel(true);
                            }
                            this.errorHandler.apply(executionException.getCause());
                        }
                        catch (InterruptedException interruptedException) {
                            Thread.currentThread().interrupt();
                        }
                        if (Thread.currentThread().isInterrupted()) {
                            this.task.cancel(true);
                            Future<?> errorHandlingTask = this.errorHandlingTask;
                            if (errorHandlingTask != null) {
                                errorHandlingTask.cancel(true);
                            }
                        }
                    });
                }
            }
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.entering(cn, "start");
            }
        }

        private final ScheduledExecutorService createScheduledThreadPoolExecutor() {
            ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, new PumpThreadFactory());
            executor.setRemoveOnCancelPolicy(true);
            return executor;
        }

        private final Consumer<? super AbstractEvent<? extends T>> getEventConsumer() {
            return this.eventConsumer;
        }

        @Override
        public final void accept(AbstractEvent<? extends T> event) {
            String cn = this.getClass().getName();
            String mn = "accept";
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.entering(cn, "accept", event);
            }
            if (this.closing) {
                throw new IllegalStateException();
            }
            if (event != null) {
                boolean added = this.queue.add(event);
                assert (added);
            }
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.exiting(cn, "accept");
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public final void close() {
            String cn = this.getClass().getName();
            String mn = "close";
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.entering(cn, "close");
            }
            Pump pump = this;
            synchronized (pump) {
                if (!this.closing) {
                    try {
                        assert (this.executor != null);
                        assert (this.task != null);
                        assert (this.errorHandlingTask != null);
                        this.closing = true;
                        this.executor.shutdown();
                        this.task.cancel(true);
                        this.task = null;
                        this.errorHandlingTask.cancel(true);
                        this.errorHandlingTask = null;
                        try {
                            if (!this.executor.awaitTermination(60L, TimeUnit.SECONDS)) {
                                this.executor.shutdownNow();
                                if (!this.executor.awaitTermination(60L, TimeUnit.SECONDS) && this.logger.isLoggable(Level.WARNING)) {
                                    this.logger.logp(Level.WARNING, cn, "close", "this.executor.awaitTermination() failed");
                                }
                            }
                        }
                        catch (InterruptedException interruptedException) {
                            this.executor.shutdownNow();
                            Thread.currentThread().interrupt();
                        }
                        this.executor = null;
                    }
                    finally {
                        this.closing = false;
                    }
                }
            }
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.exiting(cn, "close");
            }
        }

        private final boolean shouldSynchronize(Instant now) {
            boolean returnValue;
            String cn = this.getClass().getName();
            String mn = "shouldSynchronize";
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.entering(cn, "shouldSynchronize", now);
            }
            if (this.closing) {
                returnValue = false;
            } else {
                Duration interval = this.getSynchronizationInterval();
                if (interval == null || interval.isZero()) {
                    returnValue = false;
                } else if (now == null) {
                    returnValue = Instant.now().compareTo(this.nextSynchronizationInstant) >= 0;
                } else {
                    boolean bl = returnValue = now.compareTo(this.nextSynchronizationInstant) >= 0;
                }
            }
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.exiting(cn, "shouldSynchronize", returnValue);
            }
            return returnValue;
        }

        private final void determineNextSynchronizationInterval(Instant now) {
            Duration synchronizationInterval;
            String cn = this.getClass().getName();
            String mn = "determineNextSynchronizationInterval";
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.entering(cn, "determineNextSynchronizationInterval", now);
            }
            this.nextSynchronizationInstant = (synchronizationInterval = this.getSynchronizationInterval()) == null ? (now == null ? Instant.now() : now) : (now == null ? Instant.now().plus(synchronizationInterval) : now.plus(synchronizationInterval));
            if (this.logger.isLoggable(Level.FINER)) {
                this.logger.entering(cn, "determineNextSynchronizationInterval");
            }
        }

        public final void setSynchronizationInterval(Duration synchronizationInterval) {
            this.synchronizationInterval = synchronizationInterval;
        }

        public final Duration getSynchronizationInterval() {
            return this.synchronizationInterval;
        }

        private static final class PumpThreadFactory
        implements ThreadFactory {
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);

            private PumpThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                this.group = s == null ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
            }

            @Override
            public final Thread newThread(Runnable runnable) {
                Thread returnValue = new Thread(this.group, runnable, "event-pump-thread-" + this.threadNumber.getAndIncrement(), 0L);
                if (returnValue.isDaemon()) {
                    returnValue.setDaemon(false);
                }
                if (returnValue.getPriority() != 5) {
                    returnValue.setPriority(5);
                }
                return returnValue;
            }
        }
    }
}

