/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.rsched.schedulers.k8s.uploader;

import com.google.gson.reflect.TypeToken;
import com.squareup.okhttp.Call;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesController;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesUtils;
import edu.iu.dsc.tws.rsched.schedulers.k8s.PodWatchUtils;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.models.V1Event;
import io.kubernetes.client.util.Watch;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.logging.Level;
import java.util.logging.Logger;

public class JobPackageTransferThread
extends Thread {
    private static final Logger LOG = Logger.getLogger(JobPackageTransferThread.class.getName());
    public static boolean watchBeforeUploading;
    public static final int MAX_WAIT_TIME_FOR_POD_START = 100;
    public static final long SLEEP_INTERVAL_BETWEEN_TRANSFER_ATTEMPTS = 200L;
    public static final long MAX_FILE_TRANSFER_TRY_COUNT = 100L;
    private String namespace;
    private String podName;
    private String[] copyCommand;
    private String jobPackageFile;
    private boolean transferred = false;
    private static boolean cancelFileTransfer;
    private Watch<V1Event> watcher = null;
    private Object waitObject = new Object();
    private static JobPackageTransferThread[] transferThreads;
    private static boolean submittingStatefulSets;

    public JobPackageTransferThread(String namespace, String podName, String jobPackageFile) {
        this.namespace = namespace;
        this.jobPackageFile = jobPackageFile;
        this.podName = podName;
        this.copyCommand = KubernetesUtils.createCopyCommand(jobPackageFile, namespace, podName);
    }

    public boolean packageTransferred() {
        return this.transferred;
    }

    public String getPodName() {
        return this.podName;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        if (watchBeforeUploading) {
            boolean podReady = this.watchPodToStarting();
            if (cancelFileTransfer) {
                return;
            }
            if (!podReady) {
                LOG.severe("Timeout limit has been reached. Pod has not started: " + this.podName);
                return;
            }
        } else {
            Object podReady = this.waitObject;
            synchronized (podReady) {
                try {
                    this.waitObject.wait();
                }
                catch (InterruptedException e) {
                    LOG.warning("Thread wait interrupted.");
                }
            }
        }
        int tryCount = 0;
        while (!this.transferred && (long)tryCount < 100L && !cancelFileTransfer) {
            this.transferred = KubernetesController.runProcess(this.copyCommand);
            if (this.transferred) {
                LOG.info("Job Package: " + this.jobPackageFile + " transferred to the pod: " + this.podName);
                continue;
            }
            if (++tryCount == 10 || (long)tryCount == 99L) {
                LOG.warning("Job Package: " + this.jobPackageFile + " could not be transferred to the pod: " + this.podName + ". Sleeping and will try again ... " + tryCount + "\nExecuted command: " + this.copyCommandAsString());
            }
            try {
                Thread.sleep(200L);
            }
            catch (InterruptedException e) {
                LOG.log(Level.WARNING, "Thread sleep interrupted.", e);
            }
        }
    }

    private String copyCommandAsString() {
        String copyStr = "";
        for (String cmd : this.copyCommand) {
            copyStr = copyStr + cmd + " ";
        }
        return copyStr;
    }

    private boolean watchPodToStarting() {
        if (PodWatchUtils.apiClient == null || PodWatchUtils.coreApi == null) {
            PodWatchUtils.createApiInstances();
        }
        String fieldSelector = "involvedObject.name=" + this.podName;
        String reason = "Started";
        try {
            this.watcher = Watch.createWatch((ApiClient)PodWatchUtils.apiClient, (Call)PodWatchUtils.coreApi.listNamespacedEventCall(this.namespace, null, null, fieldSelector, null, null, null, null, Integer.valueOf(100), Boolean.TRUE, null, null), (Type)new TypeToken<Watch.Response<V1Event>>(){}.getType());
        }
        catch (ApiException e) {
            LOG.log(Level.SEVERE, "Can not start event watcher for the namespace: " + this.namespace, e);
            return false;
        }
        boolean podStarted = false;
        int i = 0;
        for (Watch.Response item : this.watcher) {
            if (cancelFileTransfer) break;
            if (item.object != null && reason.equals(((V1Event)item.object).getReason())) {
                ++i;
            }
            if (item.object == null || !reason.equals(((V1Event)item.object).getReason()) || !submittingStatefulSets) continue;
            podStarted = true;
            LOG.fine("Received Started event for the pod: " + this.podName + ", Started Count: " + i);
            break;
        }
        try {
            this.watcher.close();
        }
        catch (IOException e) {
            LOG.log(Level.WARNING, "Exception when closing the watcher.", e);
        }
        return podStarted;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void wakeupThread() {
        Object object = this.waitObject;
        synchronized (object) {
            this.waitObject.notify();
        }
    }

    public static void startTransferThreads(String namespace, JobAPI.Job job, String jobPackageFile, boolean watchBefore) {
        watchBeforeUploading = watchBefore;
        ArrayList<String> podNames = KubernetesUtils.generatePodNames(job);
        podNames.add(KubernetesUtils.createJobMasterPodName(job.getJobName()));
        transferThreads = new JobPackageTransferThread[podNames.size()];
        for (int i = 0; i < podNames.size(); ++i) {
            JobPackageTransferThread.transferThreads[i] = new JobPackageTransferThread(namespace, podNames.get(i), jobPackageFile);
            transferThreads[i].start();
        }
    }

    public static void startTransferThreadsForScaledUpPods(String namespace, ArrayList<String> podNames, String jobPackageFile) {
        watchBeforeUploading = false;
        cancelFileTransfer = false;
        transferThreads = new JobPackageTransferThread[podNames.size()];
        for (int i = 0; i < podNames.size(); ++i) {
            JobPackageTransferThread.transferThreads[i] = new JobPackageTransferThread(namespace, podNames.get(i), jobPackageFile);
            transferThreads[i].start();
        }
    }

    public static void setSubmittingStatefulSets() {
        submittingStatefulSets = true;
    }

    public static boolean completeFileTransfers() {
        if (!watchBeforeUploading) {
            for (int i = 0; i < transferThreads.length; ++i) {
                transferThreads[i].wakeupThread();
            }
        }
        boolean allTransferred = true;
        for (int i = 0; i < transferThreads.length; ++i) {
            try {
                transferThreads[i].join();
                if (transferThreads[i].packageTransferred()) continue;
                LOG.log(Level.SEVERE, "Job Package is not transferred to the pod: " + transferThreads[i].getPodName());
                allTransferred = false;
                break;
            }
            catch (InterruptedException e) {
                LOG.log(Level.WARNING, "Thread sleep interrupted.", e);
            }
        }
        if (!allTransferred) {
            JobPackageTransferThread.cancelTransfers();
        }
        return allTransferred;
    }

    public static void cancelTransfers() {
        cancelFileTransfer = true;
    }

    static {
        cancelFileTransfer = false;
        submittingStatefulSets = false;
    }
}

