/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.rsched.worker;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.exceptions.JobFaultyException;
import edu.iu.dsc.tws.api.exceptions.TimeoutException;
import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
import edu.iu.dsc.tws.api.faulttolerance.FaultToleranceContext;
import edu.iu.dsc.tws.api.faulttolerance.JobProgress;
import edu.iu.dsc.tws.api.resource.IJobMasterFailureListener;
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
import edu.iu.dsc.tws.api.resource.IVolatileVolume;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.api.resource.IWorkerFailureListener;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.rsched.core.WorkerRuntime;
import edu.iu.dsc.tws.rsched.worker.JobProgressImpl;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Logger;

public class WorkerManager
implements IWorkerFailureListener,
IJobMasterFailureListener {
    private static final Logger LOG = Logger.getLogger(WorkerManager.class.getName());
    private IWorker managedWorker;
    private Config config;
    private int workerID;
    private JobAPI.Job job;
    private IWorkerController workerController;
    private IPersistentVolume persistentVolume;
    private IVolatileVolume volatileVolume;
    private boolean firstInitBarrierProceeded = false;
    private final int maxRetries;
    private Map<Integer, JobMasterAPI.WorkerInfo> restartedWorkers = new TreeMap<Integer, JobMasterAPI.WorkerInfo>();

    public WorkerManager(Config config, JobAPI.Job job, IWorkerController workerController, IPersistentVolume persistentVolume, IVolatileVolume volatileVolume, IWorker worker) {
        this.config = config;
        this.job = job;
        this.workerID = workerController.getWorkerInfo().getWorkerID();
        this.workerController = workerController;
        this.persistentVolume = persistentVolume;
        this.volatileVolume = volatileVolume;
        this.managedWorker = worker;
        this.maxRetries = FaultToleranceContext.maxReExecutes((Config)config);
        WorkerRuntime.addWorkerFailureListener(this);
        WorkerRuntime.addJMFailureListener(this);
        JobProgressImpl.init();
    }

    public boolean execute() {
        while (JobProgress.getWorkerExecuteCount() < this.maxRetries) {
            LOG.info("Waiting on the init barrier before starting IWorker: " + this.workerID + " with restartCount: " + this.workerController.workerRestartCount() + " and with re-executionCount: " + JobProgress.getWorkerExecuteCount());
            try {
                this.workerController.waitOnInitBarrier();
                this.firstInitBarrierProceeded = true;
            }
            catch (TimeoutException e) {
                throw new Twister2RuntimeException("Could not pass through the init barrier", (Throwable)e);
            }
            LOG.fine("Proceeded through INIT barrier. Starting Worker: " + this.workerID);
            JobProgressImpl.setJobStatus(JobProgress.JobStatus.EXECUTING);
            JobProgressImpl.increaseWorkerExecuteCount();
            JobProgressImpl.setRestartedWorkers(this.restartedWorkers.values());
            try {
                this.managedWorker.execute(this.config, this.job, this.workerController, this.persistentVolume, this.volatileVolume);
            }
            catch (JobFaultyException cue) {
                JobProgressImpl.setJobStatus(JobProgress.JobStatus.FAULTY);
                LOG.warning("thrown JobFaultyException. Some workers should have failed.");
            }
            if (!JobProgress.isJobHealthy()) continue;
            try {
                LOG.info("Worker completed, waiting for other workers to finish at the final barrier.");
                this.workerController.waitOnBarrier(Long.MAX_VALUE);
                LOG.info("Worker finished successfully");
                return true;
            }
            catch (TimeoutException e) {
                throw new Twister2RuntimeException("Could not pass through the final barrier", (Throwable)e);
            }
            catch (JobFaultyException e) {
                JobProgressImpl.setJobStatus(JobProgress.JobStatus.FAULTY);
                LOG.warning("thrown JobFaultyException. Some workers failed before finishing.");
            }
        }
        LOG.info(String.format("Re-executed IWorker %d times and failed, we are exiting", this.maxRetries));
        return false;
    }

    public void failed(int wID) {
        if (!this.firstInitBarrierProceeded) {
            LOG.fine("Worker failure event received before first INIT barrier. Failed worker: " + wID);
            return;
        }
        if (JobProgress.isJobHealthy()) {
            this.faultOccurred(wID);
        }
    }

    public void restarted(JobMasterAPI.WorkerInfo workerInfo) {
        if (!this.firstInitBarrierProceeded) {
            LOG.fine("Worker restarted event received before first INIT barrier. Restarted worker: " + workerInfo.getWorkerID());
            return;
        }
        if (JobProgress.isJobHealthy()) {
            this.faultOccurred(workerInfo.getWorkerID());
        }
        this.restartedWorkers.put(workerInfo.getWorkerID(), workerInfo);
    }

    private void faultOccurred(int wID) {
        JobProgressImpl.setJobStatus(JobProgress.JobStatus.FAULTY);
        LOG.warning("A fault occurred. Job moves into the FAULTY stage.");
        this.restartedWorkers.clear();
        JobProgressImpl.faultOccurred(wID);
    }

    public void jmFailed() {
    }

    public void jmRestarted(String jobMasterAddress) {
        if (!this.firstInitBarrierProceeded) {
            LOG.warning("Job Master restarted event received before the first INIT barrier. Ignoring");
            return;
        }
        if (JobProgress.isJobHealthy()) {
            this.faultOccurred(-1);
        }
    }
}

