/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.master.server;

import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import edu.iu.dsc.tws.api.driver.IDriver;
import edu.iu.dsc.tws.api.net.request.MessageHandler;
import edu.iu.dsc.tws.api.net.request.RequestID;
import edu.iu.dsc.tws.common.net.tcp.request.RRServer;
import edu.iu.dsc.tws.common.zk.WorkerWithState;
import edu.iu.dsc.tws.master.dashclient.DashboardClient;
import edu.iu.dsc.tws.master.dashclient.models.JobState;
import edu.iu.dsc.tws.master.server.JobMaster;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
 * Keeps the registry of workers that belong to a job, keyed by worker ID, and reacts to
 * worker life-cycle events (started, restarted, completed, failed) and to scaling events.
 *
 * <p>Depending on the event, it updates the tracked {@link JobState}, notifies the dashboard
 * client and the driver when they are present (both may be {@code null}), and asks the
 * {@link JobMaster} to complete the job. It also relays messages between the driver and the
 * workers through the {@link RRServer}.
 *
 * <p>NOTE(review): the worker registry is a {@link ConcurrentSkipListMap}, but the remaining
 * mutable fields are plain fields. Confirm that callers serialize access to this class.
 */
public class WorkerMonitor implements MessageHandler {
    private static final Logger LOG = Logger.getLogger(WorkerMonitor.class.getName());

    private final JobMaster jobMaster;
    private final RRServer rrServer;
    /** Dashboard client; {@code null} when no dashboard is used. */
    private final DashboardClient dashClient;
    /** Driver to be informed about worker events; may be {@code null}. */
    private final IDriver driver;
    private final boolean faultTolerant;
    private JobState jobState;
    private boolean allJoined = false;
    private boolean publishedAllJoinedToDriver = false;
    /** Number of workers the job is expected to have; changes on scale up/down. */
    private int numberOfWorkers;
    /** Registered workers keyed (and therefore ordered) by worker ID. */
    private final ConcurrentSkipListMap<Integer, WorkerWithState> workers;

    /**
     * Creates a monitor for the given job. The job state starts as {@link JobState#STARTING}
     * and the worker registry starts empty.
     *
     * @param jobMaster the job master used to complete the job and to reach its controllers
     * @param rrServer the server used to send messages and responses to workers
     * @param dashClient the dashboard client, or {@code null} if there is none
     * @param job the job description; only its number of workers is read here
     * @param driver the driver to inform, or {@code null} if there is none
     * @param faultTolerant whether a failed or restarted worker is tolerated
     */
    public WorkerMonitor(JobMaster jobMaster, RRServer rrServer, DashboardClient dashClient, JobAPI.Job job, IDriver driver, boolean faultTolerant) {
        this.jobMaster = jobMaster;
        this.rrServer = rrServer;
        this.dashClient = dashClient;
        this.driver = driver;
        this.numberOfWorkers = job.getNumberOfWorkers();
        this.faultTolerant = faultTolerant;
        this.jobState = JobState.STARTING;
        this.workers = new ConcurrentSkipListMap<>();
    }

    /** @return the number of workers the job is currently expected to have */
    public int getNumberOfWorkers() {
        return this.numberOfWorkers;
    }

    /** @return the number of workers currently in the registry */
    public int getWorkersListSize() {
        return this.workers.size();
    }

    /** @return the registered workers, as a view backed by the internal registry */
    public Collection<WorkerWithState> getWorkerList() {
        return this.workers.values();
    }

    /**
     * Looks up the info of a registered worker.
     *
     * @param id the worker ID
     * @return the worker info, or {@code null} if no worker with that ID is registered
     */
    public JobMasterAPI.WorkerInfo getWorkerInfoForID(int id) {
        WorkerWithState workerWithState = this.workers.get(id);
        return workerWithState == null ? null : workerWithState.getInfo();
    }

    /** @return a new list with the IDs of all registered workers in ascending order */
    public List<Integer> getWorkerIDs() {
        return this.workers.values().stream()
            .map(WorkerWithState::getWorkerID)
            .sorted()
            .collect(Collectors.toList());
    }

    /** @return a new list with the info objects of all registered workers */
    public List<JobMasterAPI.WorkerInfo> getWorkerInfoList() {
        return this.workers.values().stream()
            .map(WorkerWithState::getInfo)
            .collect(Collectors.toList());
    }

    /**
     * @param workerID the worker ID to look for
     * @return {@code true} if a worker with that ID is registered
     */
    public boolean existWorker(int workerID) {
        return this.workers.containsKey(workerID);
    }

    /** @return the current value of the all-joined flag */
    public boolean isAllJoined() {
        return this.allJoined;
    }

    /**
     * Handles a message received from a worker. Only {@link JobMasterAPI.WorkerMessage} is
     * processed; any other message type is logged at SEVERE level and dropped.
     *
     * @param id the request ID used to send the response
     * @param workerId the ID of the sending worker (not used here)
     * @param message the received message
     */
    public void onMessage(RequestID id, int workerId, Message message) {
        if (message instanceof JobMasterAPI.WorkerMessage) {
            // Supplier form: the log text is only built when FINE is enabled.
            LOG.fine(() -> "WorkerMessage received: " + message);
            this.workerMessageReceived(id, (JobMasterAPI.WorkerMessage) message);
        } else {
            LOG.log(Level.SEVERE, "Un-known message received: " + message);
        }
    }

    /**
     * Registers a worker that started for the first time.
     *
     * @param workerWithState the joining worker
     * @return {@code null} on success, or a failure message if a worker with the same ID is
     *     already registered (in which case nothing is changed)
     */
    public String started(WorkerWithState workerWithState) {
        this.warnIfIDOutOfRange(workerWithState);
        if (this.existWorker(workerWithState.getWorkerID())) {
            String failMessage = "There is an already registered worker with workerID: " + workerWithState.getWorkerID();
            LOG.severe(failMessage);
            return failMessage;
        }
        this.workers.put(workerWithState.getWorkerID(), workerWithState);
        LOG.info("Worker: " + workerWithState.getWorkerID() + " joined the job.");
        this.registerWithDashboard(workerWithState);
        this.handleAllJoined();
        return null;
    }

    /**
     * Registers a worker that is coming back from a failure.
     *
     * <p>In a non-fault-tolerant job this marks the job as failed and asks the job master to
     * complete it. Otherwise the registry entry for the worker is replaced (or created, with a
     * warning, if the worker was not registered before).
     *
     * @param workerWithState the rejoining worker
     * @return {@code null} on success, or a failure message when the job is not fault tolerant
     */
    public String restarted(WorkerWithState workerWithState) {
        this.warnIfIDOutOfRange(workerWithState);
        this.registerWithDashboard(workerWithState);
        if (!this.faultTolerant) {
            this.jobState = JobState.FAILED;
            String failMessage = String.format("worker[%s] is coming from failure in NON-FAULT TOLERANT job. Terminating the job.", workerWithState.getWorkerID());
            LOG.info(failMessage);
            this.jobMaster.completeJob(JobState.FAILED);
            return failMessage;
        }
        if (!this.existWorker(workerWithState.getWorkerID())) {
            LOG.warning(String.format("The worker[%s] that has not joined the job yet, tries to rejoin. Ignoring this event.", workerWithState.getWorkerID()));
        }
        // The entry is written in either case, so an unknown worker ends up registered too.
        this.workers.put(workerWithState.getWorkerID(), workerWithState);
        LOG.info("WorkerID: " + workerWithState.getWorkerID() + " rejoined from failure.");
        this.handleAllJoined();
        return null;
    }

    /** Logs a warning when the worker's ID is not below the expected number of workers. */
    private void warnIfIDOutOfRange(WorkerWithState workerWithState) {
        if (workerWithState.getWorkerID() >= this.numberOfWorkers) {
            LOG.warning(String.format("A worker joined but its workerID[%s] is higher than numberOfWorkers[%s]. If this is not because of scaling up, it seems problematic. Joined worker: %s", workerWithState.getWorkerID(), this.numberOfWorkers, workerWithState.getInfo()));
        }
    }

    /** Reports the worker and its state to the dashboard, if a dashboard client is set. */
    private void registerWithDashboard(WorkerWithState workerWithState) {
        if (this.dashClient != null) {
            this.dashClient.registerWorker(workerWithState.getInfo(), workerWithState.getState());
        }
    }

    /**
     * Sets the all-joined flag the first time all expected workers are found joined, informs
     * the driver, and moves the job from STARTING to STARTED.
     */
    private void handleAllJoined() {
        if (this.allJoined || !this.allWorkersJoined()) {
            return;
        }
        this.allJoined = true;
        this.informDriverForAllJoined();
        if (this.jobState == JobState.STARTING) {
            this.jobState = JobState.STARTED;
            if (this.dashClient != null) {
                this.dashClient.jobStateChange(JobState.STARTED);
            }
        }
    }

    /**
     * Marks a worker as completed. When all workers are completed, the job is marked as
     * completed and the job master is asked to complete it.
     *
     * <p>A completion event for a worker that is not registered is logged and ignored, in the
     * same way as {@link #failed(int)} treats unknown workers.
     *
     * @param workerID the ID of the completed worker
     */
    public void completed(int workerID) {
        WorkerWithState completedWorker = this.workers.get(workerID);
        if (completedWorker == null) {
            LOG.warning("The worker[" + workerID + "] that has not joined the job completed. Ignoring this event.");
            return;
        }
        completedWorker.setState(JobMasterAPI.WorkerState.COMPLETED);
        LOG.info("Worker:" + workerID + " COMPLETED.");
        if (this.dashClient != null) {
            this.dashClient.workerStateChange(workerID, JobMasterAPI.WorkerState.COMPLETED);
        }
        if (this.allWorkersCompleted()) {
            this.jobState = JobState.COMPLETED;
            LOG.info("All " + this.numberOfWorkers + " workers COMPLETED. Terminating the job.");
            this.jobMaster.completeJob(JobState.COMPLETED);
        }
    }

    /**
     * Marks a worker as failed. In a non-fault-tolerant job this also marks the job as failed
     * and asks the job master to complete it. A failure event for a worker that is not
     * registered is logged and ignored.
     *
     * @param workerID the ID of the failed worker
     */
    public void failed(int workerID) {
        WorkerWithState failedWorker = this.workers.get(workerID);
        if (failedWorker == null) {
            LOG.warning("The worker[" + workerID + "] that hos not joined the job failed. Ignoring this event.");
            return;
        }
        failedWorker.setState(JobMasterAPI.WorkerState.FAILED);
        LOG.info("Worker: " + workerID + " FAILED.");
        if (this.dashClient != null) {
            this.dashClient.workerStateChange(workerID, JobMasterAPI.WorkerState.FAILED);
        }
        if (!this.faultTolerant) {
            this.jobState = JobState.FAILED;
            LOG.info("A worker failed in a NON-FAULT TOLERANT job. Terminating the job.");
            this.jobMaster.completeJob(JobState.FAILED);
        }
    }

    /**
     * Shrinks the job by removing the workers with the highest IDs.
     *
     * <p>The expected worker count is reduced first; the removed IDs are then
     * {@code [newCount, newCount + instancesRemoved)}. Each removed worker is dropped from the
     * registry and its channel is removed from the server. Afterwards the ZK master controller
     * is notified if present, otherwise the worker handler if present, and finally the
     * dashboard if present.
     *
     * @param instancesRemoved the number of workers removed
     */
    public void workersScaledDown(int instancesRemoved) {
        this.numberOfWorkers -= instancesRemoved;
        LinkedList<Integer> killedWorkers = new LinkedList<Integer>();
        StringBuilder logLine = new StringBuilder("Deleted worker IDs by scaling down: ");
        for (int i = 0; i < instancesRemoved; ++i) {
            int killedID = this.numberOfWorkers + i;
            killedWorkers.add(killedID);
            this.workers.remove(killedID);
            this.rrServer.removeWorkerChannel(killedID);
            logLine.append(killedID).append(", ");
        }
        LOG.info(logLine.toString());
        if (this.jobMaster.getZkMasterController() != null) {
            this.jobMaster.getZkMasterController().jobScaledDown(this.numberOfWorkers);
        } else if (this.jobMaster.getWorkerHandler() != null) {
            this.jobMaster.getWorkerHandler().workersScaledDown(instancesRemoved);
        }
        if (this.dashClient != null) {
            // The dashboard receives the change as a negative delta.
            this.dashClient.scaledWorkers(-instancesRemoved, this.numberOfWorkers, killedWorkers);
        }
    }

    /**
     * Grows the job by the given number of workers.
     *
     * <p>The all-joined flags are reset, the expected worker count is increased, and the ZK
     * master controller and the worker handler are notified when present. If all workers of
     * the enlarged job are already found joined, the all-joined event is published right away
     * (through the ZK master controller if present, otherwise through the worker handler if
     * present) and the driver is informed. Finally the dashboard is notified if present.
     *
     * @param instancesAdded the number of workers added
     */
    public void workersScaledUp(int instancesAdded) {
        this.allJoined = false;
        this.publishedAllJoinedToDriver = false;
        this.numberOfWorkers += instancesAdded;
        if (this.jobMaster.getZkMasterController() != null) {
            this.jobMaster.getZkMasterController().jobScaledUp(this.numberOfWorkers);
        }
        if (this.jobMaster.getWorkerHandler() != null) {
            this.jobMaster.getWorkerHandler().workersScaledUp(instancesAdded);
        }
        if (this.allWorkersJoined()) {
            this.allJoined = true;
            if (this.jobMaster.getZkMasterController() != null) {
                this.jobMaster.getZkMasterController().publishAllJoined();
            } else if (this.jobMaster.getWorkerHandler() != null) {
                // Same null guard as the worker-handler notification above.
                this.jobMaster.getWorkerHandler().sendWorkersJoinedMessage();
            }
            this.informDriverForAllJoined();
        }
        if (this.dashClient != null) {
            this.dashClient.scaledWorkers(instancesAdded, this.numberOfWorkers, new LinkedList<Integer>());
        }
    }

    /**
     * Adds workers that had joined earlier to the registry, replacing any entries with the
     * same IDs. If all expected workers are then found joined, the all-joined flag is set and
     * the job state becomes STARTED.
     *
     * @param joinedWorkers the workers to add
     * @return {@code true} if all expected workers are joined after the addition
     */
    public boolean addJoinedWorkers(List<WorkerWithState> joinedWorkers) {
        for (WorkerWithState joined : joinedWorkers) {
            this.workers.put(joined.getWorkerID(), joined);
        }
        // allWorkersJoined() already verifies the registry size against numberOfWorkers.
        if (!this.allWorkersJoined()) {
            return false;
        }
        this.allJoined = true;
        this.jobState = JobState.STARTED;
        LOG.info("All workers have already joined, before the job master restarted.");
        return true;
    }

    /**
     * Informs the driver that all workers joined, at most once per all-joined period.
     *
     * <p>Nothing happens unless the all-joined flag is set, a driver exists, the event has not
     * been published to the driver yet, and the worker handler reports all workers connected.
     */
    public void informDriverForAllJoined() {
        if (this.allJoined && this.driver != null && !this.publishedAllJoinedToDriver && this.jobMaster.getWorkerHandler().isAllConnected()) {
            this.driver.allWorkersJoined(this.getWorkerInfoList());
            this.publishedAllJoinedToDriver = true;
        }
    }

    /**
     * Sends a driver message to every registered worker.
     *
     * <p>Nothing is sent unless all expected workers are registered and running. Sending stops
     * at the first worker for which the server does not accept the message, so earlier workers
     * may already have been sent the message when {@code false} is returned.
     *
     * @param message the payload to wrap and send
     * @return {@code true} if the message was accepted by the server for every worker
     */
    public boolean broadcastMessage(Message message) {
        JobMasterAPI.DriverMessage driverMessage = this.toDriverMessage(message);
        if (!this.allWorkersRunning()) {
            LOG.warning("Could not send the broadcast message to all workers, since not all are currenty running.");
            return false;
        }
        for (int workerID : this.workers.keySet()) {
            if (!this.rrServer.sendMessage(driverMessage, workerID)) {
                LOG.warning("Broadcast message can not be sent to workerID: " + workerID);
                return false;
            }
        }
        return true;
    }

    /**
     * Sends a driver message to the listed workers.
     *
     * <p>All listed workers are validated first: each must be registered and in STARTED or
     * RESTARTED state, otherwise nothing is sent. Sending then stops at the first worker for
     * which the server does not accept the message.
     *
     * @param message the payload to wrap and send
     * @param workerList the IDs of the target workers
     * @return {@code true} if the message was accepted by the server for every listed worker
     */
    public boolean sendMessageToWorkerList(Message message, List<Integer> workerList) {
        JobMasterAPI.DriverMessage driverMessage = this.toDriverMessage(message);
        for (int workerID : workerList) {
            WorkerWithState worker = this.workers.get(workerID);
            if (worker == null) {
                LOG.warning("There is no worker in JobMaster with workerID: " + workerID);
                return false;
            }
            JobMasterAPI.WorkerState state = worker.getState();
            if (state != JobMasterAPI.WorkerState.RESTARTED && state != JobMasterAPI.WorkerState.STARTED) {
                LOG.warning("Can not deliver the message to workerID[" + workerID + "]. Its status: " + worker.getState());
                return false;
            }
        }
        for (int workerID : workerList) {
            if (!this.rrServer.sendMessage(driverMessage, workerID)) {
                LOG.warning("Message can not be sent to workerID: " + workerID + " It is not sending the message to remaining workers in the list.");
                return false;
            }
        }
        return true;
    }

    /** Packs the payload into an {@code Any} and wraps its bytes in a driver message. */
    private JobMasterAPI.DriverMessage toDriverMessage(Message message) {
        return JobMasterAPI.DriverMessage.newBuilder()
            .setData(Any.pack(message).toByteString())
            .build();
    }

    /**
     * Forwards a worker message to the driver (if any) and answers the request.
     *
     * <p>If the payload cannot be parsed into an {@code Any}, a failure response is sent
     * instead. When there is no driver, the message is only acknowledged.
     */
    private void workerMessageReceived(RequestID id, JobMasterAPI.WorkerMessage workerMessage) {
        if (this.driver != null) {
            try {
                Any any = Any.parseFrom(workerMessage.getData());
                this.driver.workerMessageReceived(any, workerMessage.getWorkerID());
            } catch (InvalidProtocolBufferException e) {
                LOG.log(Level.SEVERE, "Can not parse received protocol buffer message to Any", e);
                JobMasterAPI.WorkerMessageResponse failResponse = JobMasterAPI.WorkerMessageResponse.newBuilder()
                    .setSucceeded(false)
                    .setReason("Can not parse received protocol buffer message to Any")
                    .build();
                this.rrServer.sendResponse(id, failResponse);
                LOG.warning("WorkerMessageResponse sent to the driver: \n" + failResponse);
                return;
            }
        }
        JobMasterAPI.WorkerMessageResponse successResponse = JobMasterAPI.WorkerMessageResponse.newBuilder()
            .setSucceeded(true)
            .build();
        this.rrServer.sendResponse(id, successResponse);
        LOG.fine(() -> "WorkerMessageResponse sent to the driver: \n" + successResponse);
    }

    /**
     * Checks whether the registry holds exactly the expected number of workers and its highest
     * ID is {@code numberOfWorkers - 1}.
     *
     * <p>An empty registry never qualifies. This also avoids calling {@code lastKey()} on an
     * empty map, which would throw {@code NoSuchElementException}.
     */
    private boolean allExpectedWorkersRegistered() {
        if (this.workers.isEmpty() || this.workers.size() != this.numberOfWorkers) {
            return false;
        }
        return this.workers.lastKey() == this.numberOfWorkers - 1;
    }

    /**
     * @return {@code true} if all expected workers are registered and each of them reports
     *     {@code startedOrCompleted()}
     */
    public boolean allWorkersJoined() {
        return this.allExpectedWorkersRegistered()
            && this.workers.values().stream().allMatch(WorkerWithState::startedOrCompleted);
    }

    /**
     * @return {@code true} if all expected workers are registered and each of them reports
     *     {@code running()}
     */
    private boolean allWorkersRunning() {
        return this.allExpectedWorkersRegistered()
            && this.workers.values().stream().allMatch(WorkerWithState::running);
    }

    /**
     * @return {@code true} if the registry size equals the expected number of workers and each
     *     registered worker reports {@code completed()}
     */
    private boolean allWorkersCompleted() {
        return this.numberOfWorkers == this.workers.size()
            && this.workers.values().stream().allMatch(WorkerWithState::completed);
    }
}

