/*
 * Decompiled with CFR 0.152.
 */
package io.fabric8.kubernetes.clnt.v2_5.dsl.internal;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.v2_5.HasMetadata;
import io.fabric8.kubernetes.api.model.v2_5.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.v2_5.Status;
import io.fabric8.kubernetes.api.model.v2_5.WatchEvent;
import io.fabric8.kubernetes.clnt.v2_5.KubernetesClientException;
import io.fabric8.kubernetes.clnt.v2_5.Watch;
import io.fabric8.kubernetes.clnt.v2_5.Watcher;
import io.fabric8.kubernetes.clnt.v2_5.dsl.base.BaseOperation;
import io.fabric8.kubernetes.clnt.v2_5.dsl.base.OperationSupport;
import io.fabric8.kubernetes.clnt.v2_5.utils.Utils;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.HttpUrl;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.logging.HttpLoggingInterceptor;
import okio.BufferedSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WatchHTTPManager<T extends HasMetadata, L extends KubernetesResourceList<T>>
implements Watch {
    private static final Logger logger = LoggerFactory.getLogger(WatchHTTPManager.class);
    private static final ObjectMapper mapper = new ObjectMapper();
    private final BaseOperation<T, L, ?, ?> baseOperation;
    private final Watcher<T> watcher;
    private final AtomicBoolean forceClosed = new AtomicBoolean();
    private final AtomicReference<String> resourceVersion;
    private final int reconnectLimit;
    private final int reconnectInterval;
    private final AtomicBoolean reconnectPending = new AtomicBoolean(false);
    private static final int maxIntervalExponent = 5;
    private final URL requestUrl;
    private final AtomicInteger currentReconnectAttempt = new AtomicInteger(0);
    private OkHttpClient clonedClient;
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory(){

        @Override
        public Thread newThread(Runnable r) {
            Thread ret = new Thread(r, "Executor for Watch " + System.identityHashCode(WatchHTTPManager.this));
            ret.setDaemon(true);
            return ret;
        }
    });

    public WatchHTTPManager(OkHttpClient client, BaseOperation<T, L, ?, ?> baseOperation, String version, Watcher<T> watcher, int reconnectInterval, int reconnectLimit, long connectTimeout) throws MalformedURLException {
        if (version == null) {
            Object currentList = baseOperation.list();
            this.resourceVersion = new AtomicReference<String>(currentList.getMetadata().getResourceVersion());
        } else {
            this.resourceVersion = new AtomicReference<String>(version);
        }
        this.baseOperation = baseOperation;
        this.watcher = watcher;
        this.reconnectInterval = reconnectInterval;
        this.reconnectLimit = reconnectLimit;
        OkHttpClient clonedClient = client.newBuilder().connectTimeout(connectTimeout, TimeUnit.MILLISECONDS).readTimeout(0L, TimeUnit.MILLISECONDS).cache(null).build();
        for (Interceptor i : clonedClient.networkInterceptors()) {
            if (!(i instanceof HttpLoggingInterceptor)) continue;
            HttpLoggingInterceptor interceptor = (HttpLoggingInterceptor)i;
            interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
        }
        this.clonedClient = clonedClient;
        this.requestUrl = baseOperation.getNamespacedUrl();
        this.runWatch();
    }

    private final void runWatch() {
        logger.debug("Watching via HTTP GET ... {}", (Object)this);
        HttpUrl.Builder httpUrlBuilder = HttpUrl.get((URL)this.requestUrl).newBuilder();
        String labelQueryParam = this.baseOperation.getLabelQueryParam();
        if (Utils.isNotNullOrEmpty(labelQueryParam)) {
            httpUrlBuilder.addQueryParameter("labelSelector", labelQueryParam);
        }
        String fieldQueryString = this.baseOperation.getFieldQueryParam();
        String name = this.baseOperation.getName();
        if (name != null && name.length() > 0) {
            if (fieldQueryString.length() > 0) {
                fieldQueryString = fieldQueryString + ",";
            }
            fieldQueryString = fieldQueryString + "metadata.name=" + name;
        }
        if (Utils.isNotNullOrEmpty(fieldQueryString)) {
            httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString);
        }
        httpUrlBuilder.addQueryParameter("resourceVersion", this.resourceVersion.get()).addQueryParameter("watch", "true");
        final Request request = new Request.Builder().get().url(httpUrlBuilder.build()).addHeader("Origin", this.requestUrl.getProtocol() + "://" + this.requestUrl.getHost() + ":" + this.requestUrl.getPort()).build();
        this.clonedClient.newCall(request).enqueue(new Callback(){

            public void onFailure(Call call, IOException e) {
                logger.info("Watch connection failed. reason: {}", (Object)e.getMessage());
                WatchHTTPManager.this.scheduleReconnect();
            }

            public void onResponse(Call call, Response response) throws IOException {
                if (!response.isSuccessful()) {
                    throw OperationSupport.requestFailure(request, OperationSupport.createStatus(response.code(), response.message()));
                }
                try {
                    BufferedSource source = response.body().source();
                    while (!source.exhausted()) {
                        String message = source.readUtf8LineStrict();
                        WatchHTTPManager.this.onMessage(message);
                    }
                }
                catch (Exception e) {
                    logger.info("Watch terminated unexpectedly. reason: {}", (Object)e.getMessage());
                }
                if (response != null) {
                    response.body().close();
                }
                WatchHTTPManager.this.scheduleReconnect();
            }
        });
    }

    private void scheduleReconnect() {
        if (this.forceClosed.get()) {
            logger.warn("Ignoring error for already closed/closing connection");
            return;
        }
        if (this.currentReconnectAttempt.get() >= this.reconnectLimit && this.reconnectLimit >= 0) {
            this.watcher.onClose(new KubernetesClientException("Connection unexpectedly closed"));
            return;
        }
        logger.debug("Submitting reconnect task to the executor");
        this.executor.submit(new Runnable(){

            @Override
            public void run() {
                if (!WatchHTTPManager.this.reconnectPending.compareAndSet(false, true)) {
                    logger.debug("Reconnect already scheduled");
                    return;
                }
                try {
                    logger.debug("Scheduling reconnect task");
                    WatchHTTPManager.this.executor.schedule(new Runnable(){

                        @Override
                        public void run() {
                            try {
                                WatchHTTPManager.this.runWatch();
                                WatchHTTPManager.this.reconnectPending.set(false);
                            }
                            catch (Exception e) {
                                logger.error("Exception in reconnect", (Throwable)e);
                                WatchHTTPManager.this.close();
                                WatchHTTPManager.this.watcher.onClose(new KubernetesClientException("Unhandled exception in reconnect attempt", e));
                            }
                        }
                    }, WatchHTTPManager.this.nextReconnectInterval(), TimeUnit.MILLISECONDS);
                }
                catch (RejectedExecutionException e) {
                    logger.error("Exception in reconnect", (Throwable)e);
                    WatchHTTPManager.this.reconnectPending.set(false);
                }
            }
        });
    }

    public void onMessage(String messageSource) throws IOException {
        try {
            WatchEvent event = (WatchEvent)mapper.readValue(messageSource, WatchEvent.class);
            if (event.getObject() instanceof HasMetadata) {
                String newResourceVersion;
                HasMetadata obj = (HasMetadata)event.getObject();
                String currentResourceVersion = this.resourceVersion.get();
                if (currentResourceVersion.compareTo(newResourceVersion = obj.getMetadata().getResourceVersion()) < 0) {
                    this.resourceVersion.compareAndSet(currentResourceVersion, newResourceVersion);
                }
                Watcher.Action action = Watcher.Action.valueOf(event.getType());
                this.watcher.eventReceived(action, obj);
            } else if (event.getObject() instanceof Status) {
                Status status = (Status)event.getObject();
                if (status.getCode() == 410) {
                    this.close();
                    this.watcher.onClose(new KubernetesClientException(status));
                    return;
                }
                logger.error("Error received: {}", (Object)status.toString());
            } else {
                logger.error("Unknown message received: {}", (Object)messageSource);
            }
        }
        catch (IOException e) {
            logger.error("Could not deserialize watch event: {}", (Object)messageSource, (Object)e);
        }
        catch (ClassCastException e) {
            logger.error("Received wrong type of object for watch", (Throwable)e);
        }
        catch (IllegalArgumentException e) {
            logger.error("Invalid event type", (Throwable)e);
        }
    }

    private long nextReconnectInterval() {
        int exponentOfTwo = this.currentReconnectAttempt.getAndIncrement();
        if (exponentOfTwo > 5) {
            exponentOfTwo = 5;
        }
        long ret = this.reconnectInterval * (1 << exponentOfTwo);
        logger.info("Current reconnect backoff is " + ret + " milliseconds (T" + exponentOfTwo + ")");
        return ret;
    }

    @Override
    public void close() {
        logger.debug("Force closing the watch {}", (Object)this);
        this.forceClosed.set(true);
        if (!this.executor.isShutdown()) {
            try {
                this.executor.shutdown();
                if (!this.executor.awaitTermination(1L, TimeUnit.SECONDS)) {
                    logger.warn("Executor didn't terminate in time after shutdown in close(), killing it in: {}", (Object)this);
                    this.executor.shutdownNow();
                }
            }
            catch (Throwable t) {
                throw KubernetesClientException.launderThrowable(t);
            }
        }
    }
}

