/*
 * Decompiled with CFR 0.152.
 */
package io.fabric8.kubernetes.clnt.v3_1.dsl.internal;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.v3_1.HasMetadata;
import io.fabric8.kubernetes.api.model.v3_1.KubernetesResource;
import io.fabric8.kubernetes.api.model.v3_1.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.v3_1.Status;
import io.fabric8.kubernetes.api.model.v3_1.WatchEvent;
import io.fabric8.kubernetes.clnt.v3_1.KubernetesClientException;
import io.fabric8.kubernetes.clnt.v3_1.Watch;
import io.fabric8.kubernetes.clnt.v3_1.Watcher;
import io.fabric8.kubernetes.clnt.v3_1.dsl.base.BaseOperation;
import io.fabric8.kubernetes.clnt.v3_1.dsl.base.OperationSupport;
import io.fabric8.kubernetes.clnt.v3_1.dsl.internal.WatchHTTPManager;
import io.fabric8.kubernetes.clnt.v3_1.utils.Utils;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Watch} implementation that opens a websocket against the watch URL of a
 * {@link BaseOperation} and forwards every received event to a {@link Watcher}.
 *
 * <p>The websocket is opened from the constructor. When the socket fails or is closed
 * without {@link #close()} having been requested, a reconnect is scheduled on a private
 * single-threaded daemon executor. The delay doubles with every attempt up to
 * {@code reconnectInterval * 2^maxIntervalExponent}; once {@code reconnectLimit} attempts
 * have been made the watcher is closed with an exception. A negative
 * {@code reconnectLimit} disables that limit.
 *
 * @param <T> resource type delivered to the watcher
 * @param <L> list type of the watched resource
 */
public class WatchConnectionManager<T extends HasMetadata, L extends KubernetesResourceList<T>>
implements Watch {
    private static final Logger logger = LoggerFactory.getLogger(WatchConnectionManager.class);
    // Not read anywhere in this class; kept so the class layout is unchanged.
    private static final ObjectMapper mapper = new ObjectMapper();
    /** Largest exponent of two applied to {@code reconnectInterval} when backing off. */
    private static final int maxIntervalExponent = 5;
    /** HTTP status that is reported as a failure when received on the websocket upgrade. */
    private static final int HTTP_OK = 200;
    /** HTTP "Gone" status; a {@link Status} event carrying it terminates the watch. */
    private static final int HTTP_GONE = 410;

    /** Set once the watcher's {@code onClose} has been fired; later socket events are ignored. */
    private final AtomicBoolean forceClosed = new AtomicBoolean();
    /** Resource version sent as query parameter on (re)connect; updated from received events. */
    private final AtomicReference<String> resourceVersion;
    private final BaseOperation<T, L, ?, ?> baseOperation;
    private final Watcher<T> watcher;
    private final int reconnectLimit;
    private final int reconnectInterval;
    private final long websocketTimeout;
    /** Number of reconnects attempted since the socket was last opened successfully. */
    private final AtomicInteger currentReconnectAttempt = new AtomicInteger(0);
    /** The currently open websocket, or {@code null} while disconnected. */
    private final AtomicReference<WebSocket> webSocketRef = new AtomicReference<WebSocket>();
    private final ScheduledExecutorService executor;
    /** Becomes {@code true} the first time a websocket is opened successfully. */
    private final AtomicBoolean started = new AtomicBoolean(false);
    /** Guards against scheduling more than one reconnect at a time. */
    private final AtomicBoolean reconnectPending = new AtomicBoolean(false);
    /** Holds {@code true} after a successful open, or the exception of a failed start. */
    private final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
    private final URL requestUrl;
    // Last websocket returned by newWebSocket(); only written, never read, in this class.
    private WebSocket webSocket;
    private final OkHttpClient clonedClient;

    /**
     * Creates the manager and immediately starts connecting the websocket.
     *
     * @param client            HTTP client to derive the websocket client from; a copy with
     *                          {@code websocketTimeout} as read timeout is used
     * @param baseOperation     operation that supplies the URL, selectors and resource name
     * @param version           resource version to start watching from, may be {@code null}
     * @param watcher           receiver of events and of the final close notification
     * @param reconnectInterval base delay between reconnect attempts, in milliseconds
     * @param reconnectLimit    maximum number of reconnect attempts; negative means unlimited
     * @param websocketTimeout  read timeout of the websocket client, in milliseconds
     * @throws MalformedURLException declared for {@code baseOperation.getNamespacedUrl()}
     */
    public WatchConnectionManager(OkHttpClient client, BaseOperation<T, L, ?, ?> baseOperation, String version, Watcher<T> watcher, int reconnectInterval, int reconnectLimit, long websocketTimeout) throws MalformedURLException {
        this.resourceVersion = new AtomicReference<String>(version);
        this.baseOperation = baseOperation;
        this.watcher = watcher;
        this.reconnectInterval = reconnectInterval;
        this.reconnectLimit = reconnectLimit;
        this.websocketTimeout = websocketTimeout;
        this.clonedClient = client.newBuilder().readTimeout(this.websocketTimeout, TimeUnit.MILLISECONDS).build();
        this.requestUrl = baseOperation.getNamespacedUrl();
        this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread ret = new Thread(r, "Executor for Watch " + System.identityHashCode(WatchConnectionManager.this));
                // Daemon thread so a forgotten watch never keeps the JVM alive.
                ret.setDaemon(true);
                return ret;
            }
        });
        this.runWatch();
    }

    /**
     * Builds the websocket request for the watch: the namespaced URL plus label selector,
     * field selector / resource name, resource version and {@code watch=true}.
     */
    private Request buildWatchRequest() {
        HttpUrl.Builder httpUrlBuilder = HttpUrl.get(this.requestUrl).newBuilder();
        String labelQueryParam = this.baseOperation.getLabelQueryParam();
        if (Utils.isNotNullOrEmpty(labelQueryParam)) {
            httpUrlBuilder.addQueryParameter("labelSelector", labelQueryParam);
        }
        String fieldQueryString = this.baseOperation.getFieldQueryParam();
        if (fieldQueryString == null) {
            // Treat a missing selector like an empty one so the name can be appended safely.
            fieldQueryString = "";
        }
        String name = this.baseOperation.getName();
        if (name != null && name.length() > 0) {
            if (this.baseOperation.isApiGroup()) {
                httpUrlBuilder.addPathSegment(name);
            } else {
                if (fieldQueryString.length() > 0) {
                    fieldQueryString = fieldQueryString + ",";
                }
                fieldQueryString = fieldQueryString + "metadata.name=" + name;
            }
        }
        if (Utils.isNotNullOrEmpty(fieldQueryString)) {
            if (this.baseOperation.isApiGroup()) {
                logger.warn("Ignoring field selector {} on watch URI {} as fieldSelector is not yet supported on API Groups APIs", fieldQueryString, this.requestUrl);
            } else {
                httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString);
            }
        }
        // Read once so the null check and the value used cannot disagree.
        String version = this.resourceVersion.get();
        if (version != null) {
            httpUrlBuilder.addQueryParameter("resourceVersion", version);
        }
        httpUrlBuilder.addQueryParameter("watch", "true");

        StringBuilder origin = new StringBuilder();
        origin.append(this.requestUrl.getProtocol()).append("://").append(this.requestUrl.getHost());
        int port = this.requestUrl.getPort();
        if (port != -1) {
            // URL.getPort() is -1 when the URL carries no explicit port; "host:-1" is not a valid origin.
            origin.append(':').append(port);
        }
        return new Request.Builder().get().url(httpUrlBuilder.build()).addHeader("Origin", origin.toString()).build();
    }

    /** Records the resource version of the most recent event; {@code null} values are ignored. */
    private void updateResourceVersion(String newResourceVersion) {
        // Versions are stored as-is rather than compared: an ordering comparison on the
        // strings is lexicographic and would reject e.g. "1000" after "999".
        if (newResourceVersion != null) {
            this.resourceVersion.set(newResourceVersion);
        }
    }

    /** Returns whether a non-negative reconnect limit has been reached. */
    private boolean reconnectLimitReached() {
        return this.reconnectLimit >= 0 && this.currentReconnectAttempt.get() >= this.reconnectLimit;
    }

    /** Closes the body of {@code response} if both are present. */
    private static void closeBody(Response response) {
        if (response != null && response.body() != null) {
            response.body().close();
        }
    }

    /**
     * Opens a new websocket for the watch request and installs the listener that dispatches
     * events to the watcher and triggers reconnects.
     */
    private final void runWatch() {
        logger.debug("Connecting websocket ... {}", this);
        Request request = this.buildWatchRequest();
        this.webSocket = this.clonedClient.newWebSocket(request, new WebSocketListener(){

            public void onOpen(WebSocket webSocket, Response response) {
                WatchConnectionManager.closeBody(response);
                logger.debug("WebSocket successfully opened");
                WatchConnectionManager.this.webSocketRef.set(webSocket);
                WatchConnectionManager.this.currentReconnectAttempt.set(0);
                WatchConnectionManager.this.started.set(true);
                // Publish readiness, replacing whatever an earlier attempt left behind.
                WatchConnectionManager.this.queue.clear();
                WatchConnectionManager.this.queue.add(true);
            }

            public void onFailure(WebSocket webSocket, Throwable t, Response response) {
                if (WatchConnectionManager.this.forceClosed.get()) {
                    logger.debug("Ignoring onFailure for already closed/closing websocket", t);
                    WatchConnectionManager.closeBody(response);
                    return;
                }
                if (response != null && response.code() == HTTP_OK) {
                    // A plain 200 instead of a protocol upgrade: report it and do not reconnect.
                    WatchConnectionManager.this.queue.clear();
                    WatchConnectionManager.this.queue.offer(new KubernetesClientException("Received 200 on websocket", response.code(), null));
                    WatchConnectionManager.closeBody(response);
                    return;
                }
                if (response != null) {
                    Status status = OperationSupport.createStatus(response);
                    WatchConnectionManager.closeBody(response);
                    logger.warn("Exec Failure: HTTP {}, Status: {} - {}", new Object[]{response.code(), status.getCode(), status.getMessage(), t});
                    if (!WatchConnectionManager.this.started.get()) {
                        WatchConnectionManager.this.queue.clear();
                        WatchConnectionManager.this.queue.offer(new KubernetesClientException(status));
                    }
                } else {
                    logger.warn("Exec Failure", t);
                    if (!WatchConnectionManager.this.started.get()) {
                        WatchConnectionManager.this.queue.clear();
                        WatchConnectionManager.this.queue.offer(new KubernetesClientException("Failed to start websocket", t));
                    }
                }
                if (WatchConnectionManager.this.reconnectLimitReached()) {
                    WatchConnectionManager.this.closeEvent(new KubernetesClientException("Connection failure", t));
                    return;
                }
                WatchConnectionManager.this.scheduleReconnect();
            }

            public void onMessage(WebSocket webSocket, ByteString bytes) {
                this.onMessage(webSocket, bytes.utf8());
            }

            public void onMessage(WebSocket webSocket, String message) {
                try {
                    WatchEvent event = WatchHTTPManager.readWatchEvent(message);
                    KubernetesResource object = event.getObject();
                    if (object instanceof HasMetadata) {
                        // Unchecked: a mismatching type surfaces as ClassCastException below.
                        @SuppressWarnings("unchecked")
                        T obj = (T)((HasMetadata)object);
                        WatchConnectionManager.this.updateResourceVersion(obj.getMetadata() == null ? null : obj.getMetadata().getResourceVersion());
                        Watcher.Action action = Watcher.Action.valueOf(event.getType());
                        WatchConnectionManager.this.watcher.eventReceived(action, obj);
                    } else if (object instanceof KubernetesResourceList) {
                        @SuppressWarnings("unchecked")
                        KubernetesResourceList<T> list = (KubernetesResourceList<T>)((Object)object);
                        WatchConnectionManager.this.updateResourceVersion(list.getMetadata() == null ? null : list.getMetadata().getResourceVersion());
                        Watcher.Action action = Watcher.Action.valueOf(event.getType());
                        List<T> items = list.getItems();
                        if (items != null) {
                            for (T item : items) {
                                WatchConnectionManager.this.watcher.eventReceived(action, item);
                            }
                        }
                    } else if (object instanceof Status) {
                        Status status = (Status)object;
                        Integer code = status.getCode();
                        if (code != null && code == HTTP_GONE) {
                            // Drop the reference first so close() does not try to close this socket.
                            WatchConnectionManager.this.webSocketRef.set(null);
                            WatchConnectionManager.this.closeEvent(new KubernetesClientException(status));
                            WatchConnectionManager.this.close();
                            return;
                        }
                        logger.error("Error received: {}", status.toString());
                    } else {
                        logger.error("Unknown message received: {}", message);
                    }
                }
                catch (IOException e) {
                    logger.error("Could not deserialize watch event: {}", message, e);
                }
                catch (ClassCastException e) {
                    logger.error("Received wrong type of object for watch", e);
                }
                catch (IllegalArgumentException e) {
                    // Thrown by Watcher.Action.valueOf for an event type that is not an Action.
                    logger.error("Invalid event type", e);
                }
            }

            public void onClosing(WebSocket webSocket, int code, String reason) {
                // Answer the peer's close frame with the same code and reason.
                webSocket.close(code, reason);
            }

            public void onClosed(WebSocket webSocket, int code, String reason) {
                logger.debug("WebSocket close received. code: {}, reason: {}", code, reason);
                if (WatchConnectionManager.this.forceClosed.get()) {
                    logger.debug("Ignoring onClose for already closed/closing websocket");
                    return;
                }
                if (WatchConnectionManager.this.reconnectLimitReached()) {
                    WatchConnectionManager.this.closeEvent(new KubernetesClientException("Connection unexpectedly closed"));
                    return;
                }
                WatchConnectionManager.this.scheduleReconnect();
            }
        });
    }

    /**
     * Queues a reconnect on the executor. Only one reconnect can be pending at a time; the
     * actual attempt runs after the delay returned by {@link #nextReconnectInterval()}.
     */
    private void scheduleReconnect() {
        logger.debug("Submitting reconnect task to the executor");
        try {
            this.executor.submit(new NamedRunnable("scheduleReconnect"){

                @Override
                public void execute() {
                    if (!WatchConnectionManager.this.reconnectPending.compareAndSet(false, true)) {
                        logger.debug("Reconnect already scheduled");
                        return;
                    }
                    WatchConnectionManager.this.webSocketRef.set(null);
                    try {
                        logger.debug("Scheduling reconnect task");
                        WatchConnectionManager.this.executor.schedule(new NamedRunnable("reconnectAttempt"){

                            @Override
                            public void execute() {
                                try {
                                    WatchConnectionManager.this.runWatch();
                                    WatchConnectionManager.this.reconnectPending.set(false);
                                }
                                catch (Exception e) {
                                    logger.error("Exception in reconnect", e);
                                    WatchConnectionManager.this.webSocketRef.set(null);
                                    WatchConnectionManager.this.closeEvent(new KubernetesClientException("Unhandled exception in reconnect attempt", e));
                                    // NOTE(review): this runs on the executor thread, so close() waits on
                                    // its own executor until the termination timeout elapses.
                                    WatchConnectionManager.this.close();
                                }
                            }
                        }, WatchConnectionManager.this.nextReconnectInterval(), TimeUnit.MILLISECONDS);
                    }
                    catch (RejectedExecutionException e) {
                        // Executor already shut down: nothing will run, so clear the pending flag.
                        WatchConnectionManager.this.reconnectPending.set(false);
                    }
                }
            });
        }
        catch (RejectedExecutionException e) {
            // close() shut the executor down between the caller's check and this submit.
            logger.debug("Executor is shut down, not scheduling a reconnect", e);
        }
    }

    /**
     * Waits up to ten seconds on the readiness queue, which receives {@code true} once the
     * websocket is open or the exception that prevented the first connection. The handling
     * of the queued value is delegated to {@code Utils.waitUntilReady}.
     */
    public void waitUntilReady() {
        Utils.waitUntilReady(this.queue, 10L, TimeUnit.SECONDS);
    }

    /**
     * Closes the watch: notifies the watcher (once, with a {@code null} cause), closes the
     * current websocket if there is one and shuts the executor down, waiting up to one
     * second before forcing termination.
     */
    @Override
    public void close() {
        logger.debug("Force closing the watch {}", this);
        this.closeEvent(null);
        this.closeWebSocket(this.webSocketRef.getAndSet(null));
        if (!this.executor.isShutdown()) {
            try {
                this.executor.shutdown();
                if (!this.executor.awaitTermination(1L, TimeUnit.SECONDS)) {
                    logger.warn("Executor didn't terminate in time after shutdown in close(), killing it in: {}", this);
                    this.executor.shutdownNow();
                }
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to the caller before reporting the failure.
                Thread.currentThread().interrupt();
                throw KubernetesClientException.launderThrowable(e);
            }
            catch (Throwable t) {
                throw KubernetesClientException.launderThrowable(t);
            }
        }
    }

    /**
     * Marks the watch as closed and fires the watcher's {@code onClose} exactly once.
     *
     * @param cause reason for closing, or {@code null} for a requested close
     */
    private void closeEvent(KubernetesClientException cause) {
        if (this.forceClosed.getAndSet(true)) {
            logger.debug("Ignoring duplicate firing of onClose event");
            return;
        }
        this.watcher.onClose(cause);
    }

    /** Closes {@code ws} with code 1000 if it is not {@code null}, logging any failure. */
    private void closeWebSocket(WebSocket ws) {
        if (ws == null) {
            return;
        }
        logger.debug("Closing websocket {}", ws);
        try {
            if (!ws.close(1000, null)) {
                logger.warn("Failed to close websocket");
            }
        }
        catch (IllegalStateException e) {
            logger.error("Called close on already closed websocket: {} {}", e.getClass(), e.getMessage());
        }
    }

    /**
     * Returns the delay before the next reconnect attempt and counts that attempt.
     *
     * @return {@code reconnectInterval * 2^n} milliseconds, where {@code n} is the number of
     *         attempts made so far, capped at {@code maxIntervalExponent}
     */
    private long nextReconnectInterval() {
        int exponentOfTwo = this.currentReconnectAttempt.getAndIncrement();
        if (exponentOfTwo > maxIntervalExponent) {
            exponentOfTwo = maxIntervalExponent;
        }
        // Widen before multiplying so a large interval cannot overflow int arithmetic.
        long ret = (long)this.reconnectInterval * (1L << exponentOfTwo);
        logger.info("Current reconnect backoff is {} milliseconds (T{})", ret, exponentOfTwo);
        return ret;
    }

    /**
     * Runnable that prefixes the name of the executing thread with its own name while
     * {@link #execute()} runs and restores the previous thread name afterwards.
     */
    private static abstract class NamedRunnable
    implements Runnable {
        private final String name;

        /**
         * @param name prefix for the thread name, must not be {@code null}
         */
        public NamedRunnable(String name) {
            this.name = Objects.requireNonNull(name);
        }

        /** Renames the current thread, silently giving up if that is not permitted. */
        private void tryToSetName(String value) {
            try {
                Thread.currentThread().setName(value);
            }
            catch (SecurityException ignored) {
                // Renaming is purely diagnostic; the task must run regardless.
            }
        }

        @Override
        public final void run() {
            String oldName = Thread.currentThread().getName();
            this.tryToSetName(this.name + "|" + oldName);
            try {
                this.execute();
            }
            finally {
                this.tryToSetName(oldName);
            }
        }

        /** The work to perform while the thread carries the prefixed name. */
        protected abstract void execute();
    }
}

