/*
 * Decompiled with CFR 0.152.
 */
package io.fabric8.kubernetes.clnt.v5_3.dsl.internal;

import io.fabric8.kubernetes.api.model.v5_3.HasMetadata;
import io.fabric8.kubernetes.api.model.v5_3.KubernetesResource;
import io.fabric8.kubernetes.api.model.v5_3.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.v5_3.ListOptions;
import io.fabric8.kubernetes.api.model.v5_3.Status;
import io.fabric8.kubernetes.api.model.v5_3.WatchEvent;
import io.fabric8.kubernetes.clnt.v5_3.KubernetesClientException;
import io.fabric8.kubernetes.clnt.v5_3.Watcher;
import io.fabric8.kubernetes.clnt.v5_3.WatcherException;
import io.fabric8.kubernetes.clnt.v5_3.dsl.base.BaseOperation;
import io.fabric8.kubernetes.clnt.v5_3.dsl.base.OperationSupport;
import io.fabric8.kubernetes.clnt.v5_3.dsl.internal.AbstractWatchManager;
import io.fabric8.kubernetes.clnt.v5_3.dsl.internal.BaseOperationRequestBuilder;
import io.fabric8.kubernetes.clnt.v5_3.utils.Serialization;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.logging.HttpLoggingInterceptor;
import okio.BufferedSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Watch manager that follows resource changes over a plain HTTP request whose
 * response body is consumed line by line, each line being handed to
 * {@link #readWatchEvent(String)} and dispatched to the {@link Watcher}.
 *
 * <p>When the response body ends or the call fails, a reconnect is scheduled
 * through the parent {@link AbstractWatchManager}.
 *
 * @param <T> type of the watched resource
 * @param <L> list type of the watched resource
 */
public class WatchHTTPManager<T extends HasMetadata, L extends KubernetesResourceList<T>>
extends AbstractWatchManager<T> {
    private static final Logger logger = LoggerFactory.getLogger(WatchHTTPManager.class);

    /** Value forwarded as {@code maxIntervalExponent} when the caller does not supply one. */
    private static final int DEFAULT_MAX_INTERVAL_EXPONENT = 5;

    /** Event type assigned to messages that do not carry one. */
    private static final String DEFAULT_EVENT_TYPE = "MODIFIED";

    /**
     * Creates the manager with the default maximum interval exponent and starts the watch.
     *
     * @param client            HTTP client to derive the watch client from
     * @param baseOperation     operation used to build the watch request
     * @param listOptions       list options forwarded to the request builder and the parent manager
     * @param watcher           receiver of watch events
     * @param reconnectInterval forwarded to the parent manager
     * @param reconnectLimit    forwarded to the parent manager
     * @param connectTimeout    connect timeout in milliseconds
     * @throws MalformedURLException declared by the delegated constructor
     */
    public WatchHTTPManager(OkHttpClient client, BaseOperation<T, L, ?> baseOperation, ListOptions listOptions, Watcher<T> watcher, int reconnectInterval, int reconnectLimit, long connectTimeout) throws MalformedURLException {
        this(client, baseOperation, listOptions, watcher, reconnectInterval, reconnectLimit, connectTimeout, DEFAULT_MAX_INTERVAL_EXPONENT);
    }

    /**
     * Creates the manager, installs an HTTP runner and immediately starts the watch.
     *
     * @param client              HTTP client to derive the watch client from
     * @param baseOperation       operation used to build the watch request
     * @param listOptions         list options forwarded to the request builder and the parent manager
     * @param watcher             receiver of watch events
     * @param reconnectInterval   forwarded to the parent manager
     * @param reconnectLimit      forwarded to the parent manager
     * @param connectTimeout      connect timeout in milliseconds
     * @param maxIntervalExponent forwarded to the parent manager
     *                            (presumably caps the reconnect backoff - confirm in AbstractWatchManager)
     * @throws MalformedURLException declared for callers; presumably raised while building the watch request
     */
    public WatchHTTPManager(OkHttpClient client, BaseOperation<T, L, ?> baseOperation, ListOptions listOptions, Watcher<T> watcher, int reconnectInterval, int reconnectLimit, final long connectTimeout, int maxIntervalExponent) throws MalformedURLException {
        super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, new BaseOperationRequestBuilder<T, L>(baseOperation, listOptions));
        initRunner(new HTTPClientRunner<T>(client, this) {

            @Override
            OkHttpClient cloneAndCustomize(OkHttpClient client) {
                // Read timeout 0 and no cache: the response body is consumed as a long-lived stream.
                OkHttpClient clonedClient = client.newBuilder()
                        .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS)
                        .readTimeout(0L, TimeUnit.MILLISECONDS)
                        .cache(null)
                        .build();
                // Lower any HTTP logging interceptor to BASIC level.
                // NOTE(review): presumably so the streamed body is not consumed by the logger - confirm.
                for (Interceptor interceptor : clonedClient.networkInterceptors()) {
                    if (interceptor instanceof HttpLoggingInterceptor) {
                        ((HttpLoggingInterceptor) interceptor).setLevel(HttpLoggingInterceptor.Level.BASIC);
                    }
                }
                return clonedClient;
            }
        });
        runWatch();
    }

    /**
     * Parses one line of the watch stream into a {@link WatchEvent}.
     *
     * <p>If the line does not unmarshal to an event carrying an object, it is
     * unmarshalled again as a bare {@link KubernetesResource} and wrapped in (or
     * attached to) an event. Events without a type are given the type
     * {@code "MODIFIED"}.
     *
     * @param messageSource raw text of a single watch message
     * @return the parsed event, with a non-null type
     */
    protected static WatchEvent readWatchEvent(String messageSource) {
        WatchEvent event = Serialization.unmarshal(messageSource, WatchEvent.class);
        KubernetesResource object = event != null ? event.getObject() : null;
        if (object == null) {
            // The payload may be the resource itself rather than an event envelope.
            object = Serialization.unmarshal(messageSource, KubernetesResource.class);
            if (event == null) {
                event = new WatchEvent(object, DEFAULT_EVENT_TYPE);
            } else {
                event.setObject(object);
            }
        }
        if (event.getType() == null) {
            event.setType(DEFAULT_EVENT_TYPE);
        }
        return event;
    }

    /**
     * Runner that issues the watch request asynchronously, feeds every line of
     * the response body to {@link #onMessage(String)} and schedules a reconnect
     * once the body ends or the call fails.
     *
     * @param <T> type of the watched resource
     */
    private static abstract class HTTPClientRunner<T extends HasMetadata>
    extends AbstractWatchManager.ClientRunner {
        /** HTTP status code that terminates the watch instead of reporting an error event. */
        private static final int HTTP_GONE = 410;

        private final AbstractWatchManager<T> manager;
        /** Set while a reconnect is scheduled but has not run yet, so only one is queued at a time. */
        private final AtomicBoolean reconnectPending = new AtomicBoolean(false);

        public HTTPClientRunner(OkHttpClient client, AbstractWatchManager<T> manager) {
            super(client);
            this.manager = manager;
        }

        /**
         * Enqueues the request; the response is processed on the HTTP client's callback.
         *
         * @param request the watch request to execute
         */
        @Override
        void run(Request request) {
            client().newCall(request).enqueue(new Callback() {

                @Override
                public void onFailure(Call call, IOException e) {
                    logger.info("Watch connection failed. reason: {}", e.getMessage());
                    // Inside the anonymous Callback, `this` is the callback; target the runner explicitly.
                    HTTPClientRunner.this.scheduleReconnect(true);
                }

                @Override
                public void onResponse(Call call, Response response) throws IOException {
                    boolean shouldBackoff = true;
                    try {
                        if (!response.isSuccessful()) {
                            // NOTE(review): the body of a non-2xx response is still read below and a clean
                            // read reconnects without backoff - confirm this is intended.
                            HTTPClientRunner.this.onStatus(OperationSupport.createStatus(response.code(), response.message()));
                        }
                        try {
                            BufferedSource source = response.body().source();
                            while (!source.exhausted()) {
                                HTTPClientRunner.this.onMessage(source.readUtf8LineStrict());
                            }
                            // The body ended without an error: reconnect without backoff.
                            shouldBackoff = false;
                        } catch (Exception e) {
                            logger.info("Watch terminated unexpectedly. reason: {}", e.getMessage());
                        }
                    } finally {
                        // Always release the connection, even if status handling throws.
                        response.body().close();
                    }
                    HTTPClientRunner.this.scheduleReconnect(shouldBackoff);
                }
            });
        }

        /**
         * Schedules a new watch attempt unless the manager is closed or may not reconnect.
         *
         * @param shouldBackoff {@code true} to delay by the manager's next reconnect
         *                      interval, {@code false} to reconnect with no delay
         */
        private void scheduleReconnect(boolean shouldBackoff) {
            if (manager.isForceClosed()) {
                logger.warn("Ignoring error for already closed/closing connection");
                return;
            }
            if (manager.cannotReconnect()) {
                manager.onClose(new WatcherException("Connection unexpectedly closed"));
                return;
            }
            logger.debug("Submitting reconnect task to the executor");
            manager.submit(() -> {
                if (!reconnectPending.compareAndSet(false, true)) {
                    logger.debug("Reconnect already scheduled");
                    return;
                }
                try {
                    logger.debug("Scheduling reconnect task");
                    long delay = shouldBackoff ? manager.nextReconnectInterval() : 0L;
                    manager.schedule(() -> {
                        try {
                            manager.runWatch();
                            reconnectPending.set(false);
                        } catch (Exception e) {
                            logger.error("Exception in reconnect", e);
                            close();
                            manager.onClose(new WatcherException("Unhandled exception in reconnect attempt", e));
                        }
                    }, delay, TimeUnit.MILLISECONDS);
                } catch (RejectedExecutionException e) {
                    // Rejection is only worth reporting when the manager was not closed on purpose.
                    if (!manager.isForceClosed()) {
                        logger.error("Exception in reconnect", e);
                    }
                    reconnectPending.set(false);
                }
            });
        }

        /**
         * Parses one watch message and dispatches it: single resources and list
         * items are delivered to the manager as events, {@link Status} objects are
         * routed to the status handler, anything else is logged.
         *
         * @param messageSource raw text of a single watch message
         */
        public void onMessage(String messageSource) {
            try {
                WatchEvent event = WatchHTTPManager.readWatchEvent(messageSource);
                KubernetesResource object = event.getObject();
                if (object instanceof HasMetadata) {
                    // Unchecked: the stream is expected to carry the watched resource type.
                    @SuppressWarnings("unchecked")
                    T resource = (T) object;
                    manager.updateResourceVersion(resource.getMetadata().getResourceVersion());
                    Watcher.Action action = Watcher.Action.valueOf(event.getType());
                    manager.eventReceived(action, resource);
                } else if (object instanceof KubernetesResourceList) {
                    // Unchecked: list items are expected to be of the watched resource type.
                    @SuppressWarnings("unchecked")
                    KubernetesResourceList<T> list = (KubernetesResourceList<T>) object;
                    manager.updateResourceVersion(list.getMetadata().getResourceVersion());
                    Watcher.Action action = Watcher.Action.valueOf(event.getType());
                    List<T> items = list.getItems();
                    if (items != null) {
                        for (T item : items) {
                            manager.eventReceived(action, item);
                        }
                    }
                } else if (object instanceof Status) {
                    onStatus((Status) object);
                } else {
                    logger.error("Unknown message received: {}", messageSource);
                }
            } catch (ClassCastException e) {
                logger.error("Received wrong type of object for watch", e);
            } catch (IllegalArgumentException e) {
                // Thrown by Watcher.Action.valueOf for an unrecognised event type.
                logger.error("Invalid event type", e);
            }
        }

        /**
         * Handles a status reported by the server. Code 410 closes the runner and
         * the manager; any other status (including one without a code) is
         * delivered to the watcher as an {@code ERROR} event and logged.
         *
         * @param status status received from the server or synthesised from the HTTP response
         */
        private void onStatus(Status status) {
            // Null-safe comparison: a status without a code is treated as an ordinary error.
            if (Integer.valueOf(HTTP_GONE).equals(status.getCode())) {
                close();
                manager.onClose(new WatcherException(status.getMessage(), new KubernetesClientException(status)));
                return;
            }
            manager.eventReceived(Watcher.Action.ERROR, null);
            logger.error("Error received: {}", status);
        }
    }
}

