/*
 * Decompiled with CFR 0.152.
 */
package org.microbean.kubernetes.controller;

import io.fabric8.kubernetes.api.model.HasMetadata;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.Immutable;
import net.jcip.annotations.ThreadSafe;
import org.microbean.kubernetes.controller.AbstractEvent;
import org.microbean.kubernetes.controller.Event;
import org.microbean.kubernetes.controller.ResourceTrackingEventQueueConsumer;
import org.microbean.kubernetes.controller.SynchronizationEvent;

/**
 * Fans events out to any number of registered {@link Consumer}s.
 *
 * <p>Each registered consumer is wrapped in a private {@code Pump} that owns
 * an unbounded queue and a single-threaded executor; events handed to this
 * distributor are enqueued on every relevant pump and delivered to the
 * wrapped consumer on that pump's own thread.</p>
 *
 * <p>This class is thread-safe but <em>not</em> immutable: the set of
 * registered consumers changes via {@link #addConsumer(Consumer)},
 * {@link #removeConsumer(Consumer)} and {@link #close()}.  The two internal
 * collections are protected by a read/write lock pair: dispatching takes the
 * read lock, structural changes take the write lock.</p>
 *
 * @param <T> the kind of Kubernetes resource carried by the events
 */
@ThreadSafe
public final class EventDistributor<T extends HasMetadata>
extends ResourceTrackingEventQueueConsumer<T>
implements AutoCloseable {
    /** All registered pumps; receives every {@link Event}. */
    @GuardedBy(value="readLock && writeLock")
    private final Collection<Pump<T>> distributors;
    /**
     * The subset of pumps that should receive {@link SynchronizationEvent}s;
     * recomputed by {@link #shouldSynchronize()}.
     */
    @GuardedBy(value="readLock && writeLock")
    private final Collection<Pump<T>> synchronizingDistributors;
    /** Interval handed to every new pump; {@code null} disables interval-based synchronization. */
    private final Duration synchronizationInterval;
    private final Lock readLock;
    private final Lock writeLock;

    /**
     * Creates a distributor whose pumps have no synchronization interval, so
     * {@link #shouldSynchronize()} never selects any of them.
     *
     * @param knownObjects passed unchanged to the superclass constructor
     */
    public EventDistributor(Map<Object, T> knownObjects) {
        this(knownObjects, null);
    }

    /**
     * Creates a distributor whose pumps use the supplied synchronization
     * interval.
     *
     * @param knownObjects passed unchanged to the superclass constructor
     * @param synchronizationInterval the interval given to every pump created
     * by {@link #addConsumer(Consumer)}; {@code null} or a zero duration means
     * {@link #shouldSynchronize()} never selects a pump
     */
    public EventDistributor(Map<Object, T> knownObjects, Duration synchronizationInterval) {
        super(knownObjects);
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        this.readLock = lock.readLock();
        this.writeLock = lock.writeLock();
        this.distributors = new ArrayList<>();
        this.synchronizingDistributors = new ArrayList<>();
        this.synchronizationInterval = synchronizationInterval;
    }

    /**
     * Registers a consumer.  A new pump (with its own thread) is created for
     * it and added to both the regular and the synchronizing collections, so
     * it receives synchronization events until the next call to
     * {@link #shouldSynchronize()} recomputes that set.
     *
     * @param consumer the consumer to register; {@code null} is ignored
     */
    public final void addConsumer(Consumer<? super AbstractEvent<? extends T>> consumer) {
        if (consumer == null) {
            return;
        }
        this.writeLock.lock();
        try {
            Pump<T> distributor = new Pump<>(this.synchronizationInterval, consumer);
            this.distributors.add(distributor);
            this.synchronizingDistributors.add(distributor);
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Unregisters the first pump whose consumer {@linkplain Object#equals(Object)
     * equals} the supplied one, removes it from the synchronizing set as well,
     * and closes it so that its executor thread is released.
     *
     * @param consumer the consumer to unregister; {@code null} is ignored
     */
    public final void removeConsumer(Consumer<? super AbstractEvent<? extends T>> consumer) {
        if (consumer == null) {
            return;
        }
        Pump<T> removed = null;
        this.writeLock.lock();
        try {
            Iterator<Pump<T>> iterator = this.distributors.iterator();
            while (iterator.hasNext()) {
                Pump<T> distributor = iterator.next();
                if (consumer.equals(distributor.getEventConsumer())) {
                    iterator.remove();
                    removed = distributor;
                    break;
                }
            }
            if (removed != null) {
                // Pump does not override equals(), so this removes exactly that instance.
                this.synchronizingDistributors.remove(removed);
            }
        } finally {
            this.writeLock.unlock();
        }
        if (removed != null) {
            // Closed outside the lock: Pump.close() may wait on executor termination.
            removed.close();
        }
    }

    /**
     * Unregisters and closes every pump.
     */
    @Override
    public final void close() {
        Collection<Pump<T>> toClose;
        this.writeLock.lock();
        try {
            toClose = new ArrayList<>(this.distributors);
            this.synchronizingDistributors.clear();
            this.distributors.clear();
        } finally {
            this.writeLock.unlock();
        }
        // Pump.close() can block while awaiting termination, so it runs after
        // the lock is released; closing in parallel lets those waits overlap.
        toClose.parallelStream().forEach(Pump::close);
    }

    /**
     * Recomputes the set of pumps that are due for synchronization and
     * schedules the next synchronization instant for each selected pump.
     *
     * @return {@code true} if at least one pump is due
     */
    public final boolean shouldSynchronize() {
        this.writeLock.lock();
        try {
            this.synchronizingDistributors.clear();
            Instant now = Instant.now();
            // Sequential on purpose: the target collection is a plain ArrayList
            // and must not be appended to from several threads at once.
            for (Pump<T> distributor : this.distributors) {
                if (distributor.shouldSynchronize(now)) {
                    this.synchronizingDistributors.add(distributor);
                    distributor.determineNextSynchronizationInterval(now);
                }
            }
            return !this.synchronizingDistributors.isEmpty();
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Routes the event by its runtime type: {@link SynchronizationEvent}s go to
     * the synchronizing pumps only, {@link Event}s go to all pumps.
     *
     * @param event the event to distribute; {@code null} is ignored
     */
    @Override
    protected final void accept(AbstractEvent<? extends T> event) {
        if (event == null) {
            return;
        }
        if (event instanceof SynchronizationEvent) {
            this.accept((SynchronizationEvent<? extends T>) event);
        } else if (event instanceof Event) {
            this.accept((Event<? extends T>) event);
        } else {
            assert false : "Unexpected event type: " + event.getClass();
        }
    }

    /** Enqueues a synchronization event on every pump currently due for synchronization. */
    private void accept(SynchronizationEvent<? extends T> event) {
        this.readLock.lock();
        try {
            for (Pump<T> distributor : this.synchronizingDistributors) {
                distributor.accept(event);
            }
        } finally {
            this.readLock.unlock();
        }
    }

    /** Enqueues a regular event on every registered pump. */
    private void accept(Event<? extends T> event) {
        this.readLock.lock();
        try {
            for (Pump<T> distributor : this.distributors) {
                distributor.accept(event);
            }
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Decouples event production from consumption for a single consumer:
     * accepted events are queued and drained to the wrapped consumer on a
     * dedicated single-threaded executor.
     *
     * @param <T> the kind of Kubernetes resource carried by the events
     */
    private static final class Pump<T extends HasMetadata>
    implements Consumer<AbstractEvent<? extends T>>,
    AutoCloseable {
        /** Set once {@link #close()} has started; further {@link #accept} calls are rejected. */
        private volatile boolean closing;
        /** When this pump is next due; {@code null} until first scheduled. */
        private volatile Instant nextSynchronizationInstant;
        private volatile Duration synchronizationInterval;
        final BlockingQueue<AbstractEvent<? extends T>> queue;
        private final ScheduledExecutorService executor;
        private final Future<?> task;
        private final Consumer<? super AbstractEvent<? extends T>> eventConsumer;

        /**
         * Creates the pump and immediately starts its draining task.
         *
         * @param synchronizationInterval how often this pump is due for
         * synchronization; may be {@code null}
         * @param eventConsumer the consumer to deliver events to
         * @throws NullPointerException if {@code eventConsumer} is {@code null}
         */
        private Pump(Duration synchronizationInterval, Consumer<? super AbstractEvent<? extends T>> eventConsumer) {
            Objects.requireNonNull(eventConsumer);
            this.eventConsumer = eventConsumer;
            this.queue = new LinkedBlockingQueue<>();
            this.executor = this.createScheduledThreadPoolExecutor();
            this.setSynchronizationInterval(synchronizationInterval);
            // Drain loop: blocks on the queue until interrupted.  It is scheduled
            // with a one-second fixed delay, so it is re-run should the loop
            // return normally.
            // NOTE: a RuntimeException thrown by the consumer ends this task, and
            // a scheduled executor suppresses further runs of a task that threw.
            this.task = this.executor.scheduleWithFixedDelay(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        this.eventConsumer.accept(this.queue.take());
                    }
                } catch (InterruptedException interruptedException) {
                    Thread.currentThread().interrupt();
                }
            }, 0L, 1L, TimeUnit.SECONDS);
        }

        /** Creates the single-threaded executor that runs the drain loop. */
        private ScheduledExecutorService createScheduledThreadPoolExecutor() {
            ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
            pool.setRemoveOnCancelPolicy(true);
            return pool;
        }

        private Consumer<? super AbstractEvent<? extends T>> getEventConsumer() {
            return this.eventConsumer;
        }

        /**
         * Queues the event for asynchronous delivery.
         *
         * @param event the event to queue; {@code null} is ignored
         * @throws IllegalStateException if this pump is closing or closed
         */
        @Override
        public final void accept(AbstractEvent<? extends T> event) {
            if (this.closing) {
                throw new IllegalStateException();
            }
            if (event != null) {
                boolean added = this.queue.add(event);
                assert added;
            }
        }

        /**
         * Stops the drain task and shuts the executor down, waiting up to 60
         * seconds for termination, then forcing shutdown and waiting up to 60
         * seconds more.  If the calling thread is interrupted while waiting,
         * shutdown is forced and the interrupt status is restored.
         */
        @Override
        public final void close() {
            this.closing = true;
            this.executor.shutdown();
            // Interrupts the worker, which is normally blocked in queue.take().
            this.task.cancel(true);
            try {
                if (!this.executor.awaitTermination(60L, TimeUnit.SECONDS)) {
                    this.executor.shutdownNow();
                    this.executor.awaitTermination(60L, TimeUnit.SECONDS);
                }
            } catch (InterruptedException interruptedException) {
                this.executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Reports whether this pump is due for synchronization.
         *
         * @param now the reference instant; {@code null} means the current instant
         * @return {@code false} if the interval is {@code null} or zero;
         * otherwise {@code true} if no next instant has been scheduled yet or
         * {@code now} is at or after it
         */
        private boolean shouldSynchronize(Instant now) {
            Duration interval = this.getSynchronizationInterval();
            if (interval == null || interval.isZero()) {
                return false;
            }
            Instant next = this.nextSynchronizationInstant;
            if (next == null) {
                // Never scheduled: treat as due rather than dereferencing null.
                return true;
            }
            Instant reference = now == null ? Instant.now() : now;
            return reference.compareTo(next) >= 0;
        }

        /**
         * Schedules the next synchronization instant as {@code now} plus the
         * current interval, or clears it when there is no interval.
         *
         * @param now the reference instant; {@code null} means the current instant
         */
        private void determineNextSynchronizationInterval(Instant now) {
            Duration interval = this.getSynchronizationInterval();
            if (interval == null) {
                this.nextSynchronizationInstant = null;
                return;
            }
            Instant reference = now == null ? Instant.now() : now;
            this.nextSynchronizationInstant = reference.plus(interval);
        }

        public final void setSynchronizationInterval(Duration synchronizationInterval) {
            this.synchronizationInterval = synchronizationInterval;
        }

        public final Duration getSynchronizationInterval() {
            return this.synchronizationInterval;
        }
    }
}

