/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.rsched.uploaders.k8s;

import com.google.gson.reflect.TypeToken;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.config.SchedulerContext;
import edu.iu.dsc.tws.api.scheduler.IUploader;
import edu.iu.dsc.tws.api.scheduler.UploaderException;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesContext;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesController;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesUtils;
import edu.iu.dsc.tws.rsched.schedulers.k8s.client.JobEndListener;
import edu.iu.dsc.tws.rsched.schedulers.k8s.client.JobEndWatcher;
import edu.iu.dsc.tws.rsched.uploaders.k8s.UploaderToPod;
import edu.iu.dsc.tws.rsched.utils.FileUtils;
import edu.iu.dsc.tws.rsched.utils.JobUtils;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.util.Watch;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URI;
import java.util.ArrayList;
import java.util.logging.Level;
import java.util.logging.Logger;
import okhttp3.Call;

public class DirectUploader
extends Thread
implements IUploader,
JobEndListener {
    private static final Logger LOG = Logger.getLogger(DirectUploader.class.getName());
    private CoreV1Api coreApi;
    private ApiClient apiClient;
    private Config config;
    private String namespace;
    private String jobID;
    private String tempJobDir;
    private String localJobPackageFile;
    private ArrayList<UploaderToPod> uploaders = new ArrayList();
    private Watch<V1Pod> watcher;
    private boolean stopUploader = false;
    private JobEndWatcher jobEndWatcher;

    public void initialize(Config cnfg, String jbID) {
        this.config = cnfg;
        this.namespace = KubernetesContext.namespace(this.config);
        this.jobID = jbID;
    }

    public URI uploadPackage(String sourceLocation) throws UploaderException {
        this.tempJobDir = sourceLocation;
        this.localJobPackageFile = sourceLocation + File.separator + SchedulerContext.jobPackageFileName((Config)this.config);
        KubernetesController.init(this.namespace);
        this.apiClient = KubernetesController.getApiClient();
        this.coreApi = KubernetesController.createCoreV1Api();
        this.start();
        this.jobEndWatcher = JobEndWatcher.init(this.namespace, this.jobID);
        this.jobEndWatcher.addJobEndListener(this::jobEnded);
        return null;
    }

    private void printLog() {
        String logMsg = System.lineSeparator() + System.lineSeparator();
        logMsg = logMsg + "Twister2 Client will upload the job package directly to the job pods.\n";
        logMsg = logMsg + "Twister2 Client needs to run until the job completes. \n";
        logMsg = logMsg + "###########   Please do not kill the Twister2 Client   ###########\n";
        logMsg = logMsg + System.lineSeparator();
        LOG.info(logMsg);
    }

    @Override
    public void run() {
        this.printLog();
        String jobPodsLabel = KubernetesUtils.jobLabelSelector(this.jobID);
        String targetFile = "/twister2-memory-dir/" + JobUtils.createJobPackageFileName(this.jobID);
        Integer timeoutSeconds = Integer.MAX_VALUE;
        try {
            this.watcher = Watch.createWatch((ApiClient)this.apiClient, (Call)this.coreApi.listNamespacedPodCall(this.namespace, null, null, null, null, jobPodsLabel, null, null, timeoutSeconds, Boolean.TRUE, null), (Type)new TypeToken<Watch.Response<V1Pod>>(){}.getType());
        }
        catch (ApiException e) {
            String logMessage = "Exception when watching the pods to get the IPs: \nexCode: " + e.getCode() + "\nresponseBody: " + e.getResponseBody();
            LOG.log(Level.SEVERE, logMessage, e);
            throw new RuntimeException(e);
        }
        try {
            for (Watch.Response item : this.watcher) {
                if (!this.stopUploader) {
                    if (item.object == null || !((V1Pod)item.object).getMetadata().getName().startsWith(this.jobID) || !KubernetesUtils.isPodRunning((V1Pod)item.object)) continue;
                    String podName = ((V1Pod)item.object).getMetadata().getName();
                    UploaderToPod uploader = new UploaderToPod(this.namespace, podName, this.localJobPackageFile, targetFile);
                    uploader.start();
                    this.uploaders.add(uploader);
                    continue;
                }
                break;
            }
        }
        catch (RuntimeException e) {
            if (this.stopUploader) {
                LOG.fine("Uploader is stopped.");
                return;
            }
            throw e;
        }
        this.closeWatcher();
    }

    public boolean complete() {
        return true;
    }

    private void closeWatcher() {
        if (this.watcher == null) {
            return;
        }
        try {
            this.watcher.close();
        }
        catch (IOException e) {
            LOG.log(Level.SEVERE, "Exception closing watcher.", e);
        }
        this.watcher = null;
    }

    public void stopUploader() {
        this.stopUploader = true;
        this.closeWatcher();
        for (UploaderToPod uploader : this.uploaders) {
            uploader.cancelTransfer();
        }
    }

    @Override
    public void jobEnded() {
        this.stopUploader();
        if (FileUtils.deleteDir(this.tempJobDir)) {
            LOG.log(Level.INFO, "CLEANED TEMPORARY DIRECTORY......:" + this.tempJobDir);
        }
    }

    public boolean undo() {
        this.stopUploader();
        if (this.jobEndWatcher != null) {
            this.jobEndWatcher.stopWatcher();
        }
        return true;
    }

    public void close() {
    }
}

