/*
 * Decompiled with CFR 0.152.
 */
package io.fabric8.kubernetes.clnt.v6_8.informers.impl.cache;

import io.fabric8.kubernetes.api.model.v6_8.HasMetadata;
import io.fabric8.kubernetes.api.model.v6_8.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.v6_8.ListOptionsBuilder;
import io.fabric8.kubernetes.clnt.v6_8.KubernetesClientException;
import io.fabric8.kubernetes.clnt.v6_8.Watch;
import io.fabric8.kubernetes.clnt.v6_8.Watcher;
import io.fabric8.kubernetes.clnt.v6_8.WatcherException;
import io.fabric8.kubernetes.clnt.v6_8.dsl.internal.AbstractWatchManager;
import io.fabric8.kubernetes.clnt.v6_8.informers.ExceptionHandler;
import io.fabric8.kubernetes.clnt.v6_8.informers.impl.ListerWatcher;
import io.fabric8.kubernetes.clnt.v6_8.informers.impl.cache.SyncableStore;
import io.fabric8.kubernetes.clnt.v6_8.utils.ExponentialBackoffIntervalCalculator;
import io.fabric8.kubernetes.clnt.v6_8.utils.Utils;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T>> {
    private static final Logger log = LoggerFactory.getLogger(Reflector.class);
    private static long MIN_TIMEOUT = TimeUnit.MINUTES.toSeconds(5L);
    private volatile String lastSyncResourceVersion;
    private final ListerWatcher<T, L> listerWatcher;
    private final SyncableStore<T> store;
    private final ReflectorWatcher watcher;
    private volatile boolean watching;
    private volatile CompletableFuture<AbstractWatchManager<T>> watchFuture;
    private volatile CompletableFuture<?> reconnectFuture;
    private final CompletableFuture<Void> startFuture = new CompletableFuture();
    private final CompletableFuture<Void> stopFuture = new CompletableFuture();
    private final ExponentialBackoffIntervalCalculator retryIntervalCalculator;
    private final Executor executor;
    private volatile ExceptionHandler handler = (b, t) -> b && !(t instanceof WatcherException);
    private long minTimeout = MIN_TIMEOUT;
    private CompletableFuture<Void> timeoutFuture;
    private boolean cachedListing = true;

    public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store) {
        this(listerWatcher, store, Runnable::run);
    }

    public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store, Executor executor) {
        this.listerWatcher = listerWatcher;
        this.store = store;
        this.watcher = new ReflectorWatcher();
        this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(listerWatcher.getWatchReconnectInterval(), -1);
        this.executor = executor;
    }

    public CompletableFuture<Void> start() {
        this.listSyncAndWatch();
        return this.startFuture;
    }

    public CompletableFuture<Void> getStartFuture() {
        return this.startFuture;
    }

    public void stop() {
        this.startFuture.completeExceptionally(new KubernetesClientException("informer manually stopped before starting"));
        CompletableFuture<?> future = this.reconnectFuture;
        if (future != null) {
            future.cancel(true);
        }
        this.stopWatcher();
        this.stopFuture.complete(null);
    }

    private synchronized void stopWatcher() {
        Optional.ofNullable(this.watchFuture).ifPresent(theFuture -> {
            this.watchFuture = null;
            theFuture.whenComplete((w, t) -> {
                if (w != null) {
                    this.stopWatch((Watch)w);
                }
            });
        });
        if (this.timeoutFuture != null) {
            this.timeoutFuture.cancel(true);
        }
    }

    public CompletableFuture<Void> listSyncAndWatch() {
        if (this.isStopped()) {
            return CompletableFuture.completedFuture(null);
        }
        ConcurrentSkipListSet<String> nextKeys = new ConcurrentSkipListSet<String>();
        CompletionStage theFuture = ((CompletableFuture)this.processList(nextKeys, null).thenCompose(result -> {
            String latestResourceVersion;
            this.store.retainAll(nextKeys);
            this.lastSyncResourceVersion = latestResourceVersion = result.getMetadata().getResourceVersion();
            log.debug("Listing items ({}) for {} at v{}", new Object[]{nextKeys.size(), this, latestResourceVersion});
            return this.startWatcher(latestResourceVersion);
        })).thenAccept(w -> {
            if (w != null) {
                if (!this.isStopped()) {
                    if (log.isDebugEnabled()) {
                        log.debug("Watch started for {}", (Object)this);
                    }
                    this.watching = true;
                } else {
                    this.stopWatch((Watch)w);
                }
            }
        });
        ((CompletableFuture)theFuture).whenComplete((v, t) -> {
            if (t != null) {
                this.onException("listSyncAndWatch", (Throwable)t);
            } else {
                this.startFuture.complete(null);
                this.retryIntervalCalculator.resetReconnectAttempts();
            }
        });
        return theFuture;
    }

    private void onException(String operation, Throwable t) {
        if (this.handler.retryAfterException(this.startFuture.isDone() && !this.startFuture.isCompletedExceptionally(), t)) {
            log.warn("{} failed for {}, will retry", new Object[]{operation, this, t});
            this.reconnect();
        } else {
            log.error("{} failed for {}, will stop", new Object[]{operation, this, t});
            this.startFuture.completeExceptionally(t);
            this.stopFuture.completeExceptionally(t);
        }
    }

    protected void reconnect() {
        if (this.isStopped()) {
            return;
        }
        this.reconnectFuture = Utils.schedule((Executor)this.executor, this::listSyncAndWatch, (long)this.retryIntervalCalculator.nextReconnectInterval(), (TimeUnit)TimeUnit.MILLISECONDS);
    }

    private CompletableFuture<L> processList(Set<String> nextKeys, String continueVal) {
        CompletableFuture<L> futureResult = this.listerWatcher.submitList(((ListOptionsBuilder)((ListOptionsBuilder)((ListOptionsBuilder)new ListOptionsBuilder().withResourceVersion(this.isCachedListing(continueVal) ? "0" : null)).withLimit(this.listerWatcher.getLimit())).withContinue(continueVal)).build());
        return futureResult.thenCompose(result -> {
            result.getItems().forEach(i -> {
                String key = this.store.getKey(i);
                nextKeys.add(key);
            });
            this.store.update(result.getItems());
            String nextContinueVal = result.getMetadata().getContinue();
            if (Utils.isNotNullOrEmpty((String)nextContinueVal)) {
                return this.processList(nextKeys, nextContinueVal);
            }
            return CompletableFuture.completedFuture(result);
        });
    }

    private boolean isCachedListing(String continueVal) {
        return this.cachedListing && this.listerWatcher.getLimit() == null && this.lastSyncResourceVersion == null && continueVal == null;
    }

    private void stopWatch(Watch w) {
        log.debug("Stopping watcher for {} at v{}", (Object)this, (Object)this.lastSyncResourceVersion);
        w.close();
        this.watchStopped();
    }

    private synchronized CompletableFuture<? extends Watch> startWatcher(String latestResourceVersion) {
        if (this.isStopped()) {
            return CompletableFuture.completedFuture(null);
        }
        log.debug("Starting watcher for {} at v{}", (Object)this, (Object)latestResourceVersion);
        CompletableFuture future = this.listerWatcher.submitWatch(((ListOptionsBuilder)((ListOptionsBuilder)new ListOptionsBuilder().withResourceVersion(latestResourceVersion)).withTimeoutSeconds(this.minTimeout * 2L)).build(), this.watcher);
        LongSupplier timeout = () -> (long)((Math.random() + 1.0) * (double)this.minTimeout);
        if (this.timeoutFuture != null) {
            this.timeoutFuture.cancel(true);
        }
        this.timeoutFuture = new CompletableFuture();
        Utils.scheduleWithVariableRate(this.timeoutFuture, (Executor)this.executor, () -> future.thenAccept(AbstractWatchManager::closeRequest), (long)timeout.getAsLong(), (LongSupplier)timeout, (TimeUnit)TimeUnit.SECONDS);
        this.watchFuture = future;
        return this.watchFuture;
    }

    public void setMinTimeout(long minTimeout) {
        this.minTimeout = minTimeout;
    }

    private synchronized void watchStopped() {
        this.watching = false;
    }

    public String getLastSyncResourceVersion() {
        return this.lastSyncResourceVersion;
    }

    public boolean isStopped() {
        return this.stopFuture.isDone();
    }

    public boolean isWatching() {
        return this.watching;
    }

    ReflectorWatcher getWatcher() {
        return this.watcher;
    }

    public String toString() {
        return this.listerWatcher.getApiEndpointPath();
    }

    public CompletableFuture<Void> getStopFuture() {
        return this.stopFuture;
    }

    public void setExceptionHandler(ExceptionHandler handler) {
        this.handler = handler;
    }

    public void usingInitialState() {
        this.cachedListing = false;
    }

    class ReflectorWatcher
    implements Watcher<T> {
        ReflectorWatcher() {
        }

        public void eventReceived(Watcher.Action action, T resource) {
            if (action == null) {
                throw new KubernetesClientException("Unrecognized event for " + Reflector.this);
            }
            if (resource == null) {
                throw new KubernetesClientException("Unrecognized resource for " + Reflector.this);
            }
            if (log.isDebugEnabled()) {
                log.debug("Event received {} {} resourceVersion v{} for {}", new Object[]{action.name(), resource.getKind(), resource.getMetadata().getResourceVersion(), Reflector.this});
            }
            switch (action) {
                case ERROR: {
                    throw new KubernetesClientException("ERROR event");
                }
                case ADDED: {
                    Reflector.this.store.add(resource);
                    break;
                }
                case MODIFIED: {
                    Reflector.this.store.update(resource);
                    break;
                }
                case DELETED: {
                    Reflector.this.store.delete(resource);
                }
            }
            Reflector.this.lastSyncResourceVersion = resource.getMetadata().getResourceVersion();
        }

        public void onClose(WatcherException exception) {
            Reflector.this.watchStopped();
            if (exception.isHttpGone()) {
                if (log.isDebugEnabled()) {
                    log.debug("Watch restarting due to http gone for {}", (Object)Reflector.this);
                }
                Reflector.this.reconnect();
            } else {
                Reflector.this.onException("watch", (Throwable)exception);
            }
        }

        public void onClose() {
            Reflector.this.watchStopped();
            log.debug("Watch gracefully closed for {}", (Object)Reflector.this);
        }

        public boolean reconnecting() {
            return true;
        }
    }
}

