/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.master.server;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.exceptions.Twister2Exception;
import edu.iu.dsc.tws.api.faulttolerance.FaultToleranceContext;
import edu.iu.dsc.tws.common.zk.WorkerWithState;
import edu.iu.dsc.tws.common.zk.ZKContext;
import edu.iu.dsc.tws.common.zk.ZKEphemStateManager;
import edu.iu.dsc.tws.common.zk.ZKEventsManager;
import edu.iu.dsc.tws.common.zk.ZKPersStateManager;
import edu.iu.dsc.tws.common.zk.ZKUtils;
import edu.iu.dsc.tws.master.server.WorkerMonitor;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import java.io.Closeable;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.utils.CloseableUtils;

/**
 * Job-master side controller for the ZooKeeper based worker coordination.
 *
 * <p>The controller connects to ZooKeeper, watches the job's ephemeral, persistent and barrier
 * directories with {@link PathChildrenCache} instances, forwards worker lifecycle changes to the
 * supplied {@link WorkerMonitor}, and publishes job events through {@link ZKEventsManager}.
 *
 * <p>Typical usage: construct, call {@link #initialize(JobMasterAPI.JobMasterState)} once, and
 * call {@link #close()} when the job master shuts down.
 */
public class ZKMasterController {
    public static final Logger LOG = Logger.getLogger(ZKMasterController.class.getName());

    /** Maximum time to wait for the job master znode to be created, in milliseconds. */
    private static final long JM_ZNODE_CREATE_TIMEOUT_MS = 10000L;

    /** Current number of workers in the job; updated by the scale up/down callbacks. */
    protected int numberOfWorkers;
    protected String jobName;
    protected Config config;
    /** Root znode path as returned by {@code ZKContext.rootNode(config)}. */
    protected String rootPath;
    /** Directory holding the persistent worker znodes of this job. */
    protected String persDir;
    /** Directory holding the ephemeral worker znodes of this job. */
    protected String ephemDir;
    /** Directory holding the barrier znodes of this job. */
    protected String barrierDir;
    private String jmAddress;
    protected CuratorFramework client;
    protected PathChildrenCache ephemChildrenCache;
    protected PathChildrenCache persChildrenCache;
    protected PathChildrenCache barrierChildrenCache;
    private PersistentNode masterEphemZNode;
    /** IDs of workers removed by the latest scale-down whose znode removal is still expected. */
    private List<Integer> scaledDownWorkers = new LinkedList<Integer>();
    private WorkerMonitor workerMonitor;

    /**
     * Creates the controller and derives the job's znode directories from the configuration.
     * No connection to ZooKeeper is made until {@link #initialize} is called.
     *
     * @param config          job configuration
     * @param jobName         name of the job, used to build the znode paths
     * @param numberOfWorkers initial number of workers in the job
     * @param jmAddress       address of this job master, used in log messages and events
     * @param workerMonitor   receiver of worker lifecycle notifications
     */
    public ZKMasterController(Config config, String jobName, int numberOfWorkers, String jmAddress, WorkerMonitor workerMonitor) {
        this.config = config;
        this.jobName = jobName;
        this.numberOfWorkers = numberOfWorkers;
        this.jmAddress = jmAddress;
        this.workerMonitor = workerMonitor;
        this.rootPath = ZKContext.rootNode(config);
        this.persDir = ZKUtils.persDir(this.rootPath, jobName);
        this.ephemDir = ZKUtils.ephemDir(this.rootPath, jobName);
        this.barrierDir = ZKUtils.barrierDir(this.rootPath, jobName);
    }

    /**
     * Connects to ZooKeeper and starts the ephemeral, persistent and barrier caches.
     *
     * @param initialState either {@code JM_STARTED} or {@code JM_RESTARTED}
     * @throws Twister2Exception if {@code initialState} is any other value, or if connecting to
     *                           ZooKeeper or starting a cache fails
     */
    public void initialize(JobMasterAPI.JobMasterState initialState) throws Twister2Exception {
        if (initialState != JobMasterAPI.JobMasterState.JM_STARTED
                && initialState != JobMasterAPI.JobMasterState.JM_RESTARTED) {
            throw new Twister2Exception("initialState has to be either JobMasterState.JM_STARTED or JobMasterState.JM_RESTARTED. Supplied value: " + initialState);
        }
        try {
            String zkServerAddresses = ZKContext.serverAddresses(this.config);
            int sessionTimeoutMs = FaultToleranceContext.sessionTimeout(this.config);
            this.client = ZKUtils.connectToServer(zkServerAddresses, sessionTimeoutMs);

            if (initialState == JobMasterAPI.JobMasterState.JM_RESTARTED) {
                this.initRestarting();
            } else {
                // Join/remove events of workers arrive through the ephemeral directory.
                this.ephemChildrenCache = new PathChildrenCache(this.client, this.ephemDir, true);
                this.addEphemChildrenCacheListener(this.ephemChildrenCache);
                this.ephemChildrenCache.start();

                // Worker status updates arrive through the persistent directory.
                this.persChildrenCache = new PathChildrenCache(this.client, this.persDir, true);
                this.addPersChildrenCacheListener(this.persChildrenCache);
                this.persChildrenCache.start();
            }

            this.barrierChildrenCache = new PathChildrenCache(this.client, this.barrierDir, true);
            this.addBarrierChildrenCacheListener(this.barrierChildrenCache);
            this.barrierChildrenCache.start();

            LOG.info("Job Master: " + this.jmAddress + " initialized successfully.");
        } catch (Twister2Exception e) {
            throw e;
        } catch (Exception e) {
            throw new Twister2Exception("Exception when initializing ZKMasterController.", e);
        }
    }

    /**
     * Start-up path for a restarted job master: builds both caches with their initial content,
     * hands the already-joined workers to the monitor, publishes the JobMasterRestarted event and,
     * when all workers are present but no matching AllJoined event exists, publishes AllJoined.
     *
     * @throws Exception if a cache cannot be started or the existing events cannot be read
     */
    private void initRestarting() throws Exception {
        LOG.info("Job Master restarting .... ");

        this.ephemChildrenCache = new PathChildrenCache(this.client, this.ephemDir, true);
        this.addEphemChildrenCacheListener(this.ephemChildrenCache);
        this.ephemChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

        List<ChildData> joinedWorkerZnodes = this.ephemChildrenCache.getCurrentData();
        LOG.info("Initially existing workers: " + joinedWorkerZnodes.size());

        this.persChildrenCache = new PathChildrenCache(this.client, this.persDir, true);
        this.addPersChildrenCacheListener(this.persChildrenCache);
        this.persChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

        LinkedList<WorkerWithState> joinedWorkers = new LinkedList<WorkerWithState>();
        for (ChildData child : joinedWorkerZnodes) {
            String fullPath = child.getPath();
            int workerID = ZKUtils.getWorkerIDFromEphemPath(fullPath);
            WorkerWithState workerWithState = this.getWorkerWithState(workerID);
            if (workerWithState == null) {
                LOG.severe("worker[" + fullPath + "] added, but its data can not be retrieved.");
            } else {
                joinedWorkers.add(workerWithState);
            }
        }

        this.publishJobMasterRestarted();

        boolean allJoined = this.workerMonitor.addJoinedWorkers(joinedWorkers);
        if (allJoined && !this.allJoinedPublished()) {
            LOG.info("Publishing AllJoined event when restarting, since it is not previously published.");
            this.publishAllJoined();
        }
    }

    /**
     * Checks the already published job events for an AllJoined event.
     *
     * <p>The first AllJoined event encountered decides the result: {@code true} when its worker
     * count equals the current {@link #numberOfWorkers}, {@code false} otherwise.
     * NOTE(review): later AllJoined events are never inspected; confirm this is intended for
     * jobs that have been scaled.
     *
     * @return whether an AllJoined event matching the current worker count was found first
     * @throws Exception if the events cannot be read
     */
    private boolean allJoinedPublished() throws Exception {
        TreeMap<?, JobMasterAPI.JobEvent> events =
                ZKEventsManager.getAllEvents(this.client, this.rootPath, this.jobName);
        for (JobMasterAPI.JobEvent event : events.values()) {
            if (event.hasAllJoined()) {
                return event.getAllJoined().getNumberOfWorkers() == this.numberOfWorkers;
            }
        }
        return false;
    }

    /**
     * Records the new worker count after the job has been scaled up.
     *
     * @param newNumberOfWorkers number of workers after scaling
     */
    public void jobScaledUp(int newNumberOfWorkers) {
        this.numberOfWorkers = newNumberOfWorkers;
    }

    /**
     * Records the new worker count after the job has been scaled down and remembers the IDs
     * {@code [newNumberOfWorkers, numberOfWorkers)} so that the removal of their znodes is not
     * reported as a failure.
     *
     * <p>NOTE(review): the pending list is replaced, not extended, so IDs from an earlier
     * scale-down that have not yet been removed are forgotten.
     *
     * @param newNumberOfWorkers number of workers after scaling
     */
    public void jobScaledDown(int newNumberOfWorkers) {
        this.scaledDownWorkers = new LinkedList<Integer>();
        for (int i = newNumberOfWorkers; i < this.numberOfWorkers; ++i) {
            this.scaledDownWorkers.add(i);
        }
        this.numberOfWorkers = newNumberOfWorkers;
    }

    /**
     * Creates the znode that represents this job master and waits up to
     * {@link #JM_ZNODE_CREATE_TIMEOUT_MS} for it to appear.
     *
     * <p>NOTE(review): nothing in this class calls this method.
     *
     * @param initialState state encoded into the znode body
     * @throws RuntimeException if the waiting thread is interrupted
     */
    private void createJMEphemZnode(JobMasterAPI.JobMasterState initialState) {
        String jmPath = ZKUtils.jmEphemPath(this.rootPath, this.jobName);
        byte[] jmZnodeBody = ZKUtils.encodeJobMasterZnode(this.jmAddress, initialState.getNumber());
        this.masterEphemZNode = ZKUtils.createPersistentEphemeralZnode(jmPath, jmZnodeBody);
        this.masterEphemZNode.start();

        boolean created;
        try {
            created = this.masterEphemZNode.waitForInitialCreate(JM_ZNODE_CREATE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            LOG.log(Level.SEVERE, "Could not create job master znode.", e);
            throw new RuntimeException("Could not create job master znode", e);
        }

        if (!created) {
            // The wait timed out; do not claim success or log a null path.
            LOG.warning("Job master znode could not be created within "
                    + JM_ZNODE_CREATE_TIMEOUT_MS + " ms: " + jmPath);
            return;
        }

        String fullPath = this.masterEphemZNode.getActualPath();
        LOG.info("An ephemeral znode is created for the job master: " + fullPath);
    }

    /**
     * Registers a listener that routes child additions and removals in the ephemeral directory
     * to {@link #workerZnodeAdded} and {@link #workerZnodeRemoved}.
     */
    private void addEphemChildrenCacheListener(PathChildrenCache cache) {
        PathChildrenCacheListener listener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework clientOfEvent, PathChildrenCacheEvent event) {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        ZKMasterController.this.workerZnodeAdded(event);
                        break;
                    case CHILD_REMOVED:
                        ZKMasterController.this.workerZnodeRemoved(event);
                        break;
                    default:
                        // other event types are not of interest here
                        break;
                }
            }
        };
        cache.getListenable().addListener(listener);
    }

    /**
     * Registers a listener that routes child updates in the persistent directory to
     * {@link #childZnodeUpdated}.
     */
    private void addPersChildrenCacheListener(PathChildrenCache cache) {
        PathChildrenCacheListener listener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework clientOfEvent, PathChildrenCacheEvent event) {
                switch (event.getType()) {
                    case CHILD_UPDATED:
                        ZKMasterController.this.childZnodeUpdated(event);
                        break;
                    default:
                        // other event types are not of interest here
                        break;
                }
            }
        };
        cache.getListenable().addListener(listener);
    }

    /**
     * Registers a listener that routes child additions in the barrier directory to
     * {@link #barrierZnodeAdded}.
     */
    private void addBarrierChildrenCacheListener(PathChildrenCache cache) {
        PathChildrenCacheListener listener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework clientOfEvent, PathChildrenCacheEvent event) {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        ZKMasterController.this.barrierZnodeAdded(event);
                        break;
                    default:
                        // other event types are not of interest here
                        break;
                }
            }
        };
        cache.getListenable().addListener(listener);
    }

    /**
     * Handles a new ephemeral worker znode: notifies the monitor according to the worker's
     * recorded state, publishes WorkerRestarted for restarted workers, and publishes AllJoined
     * when this addition completes the set of workers.
     */
    private void workerZnodeAdded(PathChildrenCacheEvent event) {
        // Remember the state before this worker is processed, to detect the transition.
        boolean initialAllJoined = this.workerMonitor.isAllJoined();

        String addedChildPath = event.getData().getPath();
        int workerID = ZKUtils.getWorkerIDFromEphemPath(addedChildPath);
        WorkerWithState workerWithState = this.getWorkerWithState(workerID);
        if (workerWithState == null) {
            LOG.severe("worker[" + workerID + "] added, but its data can not be retrieved.");
            return;
        }

        JobMasterAPI.WorkerState state = workerWithState.getState();
        if (state == JobMasterAPI.WorkerState.RESTARTED) {
            this.workerMonitor.restarted(workerWithState);
            this.publishWorkerRestarted(workerWithState);
        } else if (state == JobMasterAPI.WorkerState.STARTED) {
            this.workerMonitor.started(workerWithState);
        } else {
            LOG.warning("Following worker joined with initial state of " + state
                    + ". Something must be wrong. Ignoring this event. WorkerInfo: " + workerWithState.getInfo());
            return;
        }

        if (!initialAllJoined && this.workerMonitor.isAllJoined()) {
            this.publishAllJoined();
        }
    }

    /**
     * Looks up the persistent state of a worker, first in the persistent cache and then, when the
     * cache has no entry, directly from ZooKeeper.
     *
     * @param workerID ID of the worker
     * @return the worker with its state, or {@code null} if the direct read fails
     */
    private WorkerWithState getWorkerWithState(int workerID) {
        String workerPersPath = ZKUtils.workerPath(this.persDir, workerID);

        // The ephemeral cache is started before persChildrenCache is assigned, so a callback that
        // runs asynchronously to initialize() may reach this point while the field is still null.
        PathChildrenCache persCache = this.persChildrenCache;
        if (persCache != null) {
            ChildData znodeBody = persCache.getCurrentData(workerPersPath);
            if (znodeBody != null) {
                return WorkerWithState.decode(znodeBody.getData());
            }
        }

        try {
            return ZKPersStateManager.getWorkerWithState(this.client, workerPersPath);
        } catch (Twister2Exception e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
            return null;
        }
    }

    /**
     * Publishes a WorkerRestarted job event for the given worker. A publishing failure is logged
     * and not propagated.
     *
     * @param workerWithState the restarted worker
     */
    public void publishWorkerRestarted(WorkerWithState workerWithState) {
        JobMasterAPI.WorkerRestarted workerRestarted = JobMasterAPI.WorkerRestarted.newBuilder()
                .setWorkerInfo(workerWithState.getInfo())
                .build();
        JobMasterAPI.JobEvent jobEvent = JobMasterAPI.JobEvent.newBuilder()
                .setRestarted(workerRestarted)
                .build();
        try {
            ZKEventsManager.publishEvent(this.client, this.rootPath, this.jobName, jobEvent);
        } catch (Twister2Exception e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
        }
    }

    /**
     * Publishes a WorkerFailed job event for the given worker ID. A publishing failure is logged
     * and not propagated.
     *
     * @param failedID ID of the failed worker
     */
    public void publishWorkerFailed(int failedID) {
        JobMasterAPI.WorkerFailed workerFailed = JobMasterAPI.WorkerFailed.newBuilder()
                .setWorkerID(failedID)
                .build();
        JobMasterAPI.JobEvent jobEvent = JobMasterAPI.JobEvent.newBuilder()
                .setFailed(workerFailed)
                .build();
        try {
            ZKEventsManager.publishEvent(this.client, this.rootPath, this.jobName, jobEvent);
        } catch (Twister2Exception e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
        }
    }

    /**
     * Publishes an AllWorkersJoined job event built from the monitor's current worker list.
     * A publishing failure is logged and not propagated.
     */
    public void publishAllJoined() {
        List<JobMasterAPI.WorkerInfo> workers = this.workerMonitor.getWorkerInfoList();
        JobMasterAPI.AllWorkersJoined allWorkersJoined = JobMasterAPI.AllWorkersJoined.newBuilder()
                .addAllWorkerInfo(workers)
                .setNumberOfWorkers(workers.size())
                .build();
        JobMasterAPI.JobEvent jobEvent = JobMasterAPI.JobEvent.newBuilder()
                .setAllJoined(allWorkersJoined)
                .build();
        try {
            ZKEventsManager.publishEvent(this.client, this.rootPath, this.jobName, jobEvent);
        } catch (Twister2Exception e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
        }
    }

    /**
     * Publishes a JobMasterRestarted job event carrying the current worker count and this job
     * master's address. A publishing failure is logged and not propagated.
     */
    public void publishJobMasterRestarted() {
        JobMasterAPI.JobMasterRestarted jmRestarted = JobMasterAPI.JobMasterRestarted.newBuilder()
                .setNumberOfWorkers(this.numberOfWorkers)
                .setJmAddress(this.jmAddress)
                .build();
        JobMasterAPI.JobEvent jobEvent = JobMasterAPI.JobEvent.newBuilder()
                .setJmRestarted(jmRestarted)
                .build();
        try {
            ZKEventsManager.publishEvent(this.client, this.rootPath, this.jobName, jobEvent);
        } catch (Twister2Exception e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
        }
    }

    /**
     * Handles the removal of an ephemeral worker znode.
     *
     * <p>The removal is ignored when the worker was scaled down, has already completed, or when
     * the znode body carries the marker written by a restarting worker. Otherwise the worker is
     * treated as failed: the monitor is notified, its persistent status is set to FAILED and a
     * WorkerFailed event is published.
     */
    private void workerZnodeRemoved(PathChildrenCacheEvent event) {
        String workerPath = event.getData().getPath();
        int removedWorkerID = ZKUtils.getWorkerIDFromEphemPath(workerPath);

        if (this.scaledDownWorkers.contains(removedWorkerID)) {
            // Integer.valueOf selects remove(Object); a bare int would select remove(int index).
            this.scaledDownWorkers.remove(Integer.valueOf(removedWorkerID));
            LOG.info("Removed scaled down worker: " + removedWorkerID);
            return;
        }

        WorkerWithState workerWithState;
        try {
            workerWithState = ZKPersStateManager.getWorkerWithState(this.client, this.rootPath, this.jobName, removedWorkerID);
        } catch (Twister2Exception e) {
            LOG.log(Level.SEVERE, "worker[" + removedWorkerID + "] removed, but its data can not be retrieved.", e);
            return;
        }
        if (workerWithState == null) {
            LOG.severe("worker[" + removedWorkerID + "] removed, but its data can not be retrieved.");
            return;
        }

        String workerBodyText = ZKEphemStateManager.decodeWorkerZnodeBody(event.getData().getData());

        // A completed worker removes its znode as part of a normal exit.
        if (workerWithState.getState() == JobMasterAPI.WorkerState.COMPLETED) {
            return;
        }

        if ("DELETED_BY_RESTARTING_WORKER".equals(workerBodyText)) {
            LOG.info("Restarting worker deleted znode from previous run: " + workerPath);
            return;
        }

        LOG.info(String.format("Worker[%s] FAILED. Worker last status: %s", removedWorkerID, workerWithState.getState()));
        this.workerMonitor.failed(removedWorkerID);
        try {
            ZKPersStateManager.updateWorkerStatus(this.client, this.rootPath, this.jobName, workerWithState.getInfo(), JobMasterAPI.WorkerState.FAILED);
        } catch (Twister2Exception e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
        }
        this.publishWorkerFailed(workerWithState.getWorkerID());
    }

    /**
     * Handles an update of a persistent worker znode; notifies the monitor when the worker's new
     * state is COMPLETED.
     */
    private void childZnodeUpdated(PathChildrenCacheEvent event) {
        String childPath = event.getData().getPath();
        int workerID = ZKUtils.getWorkerIDFromPersPath(childPath);
        WorkerWithState workerWithState = WorkerWithState.decode(event.getData().getData());

        LOG.fine(String.format("Worker[%s] status changed to: %s ", workerID, workerWithState.getState()));

        if (workerWithState.getState() == JobMasterAPI.WorkerState.COMPLETED) {
            this.workerMonitor.completed(workerID);
        }
    }

    /**
     * Handles a new barrier znode; once the number of barrier znodes equals the number of
     * workers, publishes an AllArrivedOnBarrier job event. A publishing failure is logged and not
     * propagated.
     */
    private void barrierZnodeAdded(PathChildrenCacheEvent event) {
        if (this.barrierChildrenCache.getCurrentData().size() != this.numberOfWorkers) {
            return;
        }

        JobMasterAPI.AllArrivedOnBarrier allArrived = JobMasterAPI.AllArrivedOnBarrier.newBuilder()
                .setNumberOfWorkers(this.numberOfWorkers)
                .build();
        JobMasterAPI.JobEvent jobEvent = JobMasterAPI.JobEvent.newBuilder()
                .setAllArrived(allArrived)
                .build();
        try {
            ZKEventsManager.publishEvent(this.client, this.rootPath, this.jobName, jobEvent);
        } catch (Twister2Exception e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
        }
    }

    /**
     * Closes the three caches and, if it was created, the job master znode. Errors during closing
     * are suppressed by {@code CloseableUtils.closeQuietly}. The ZooKeeper client itself is not
     * closed here.
     */
    public void close() {
        CloseableUtils.closeQuietly(this.ephemChildrenCache);
        CloseableUtils.closeQuietly(this.persChildrenCache);
        CloseableUtils.closeQuietly(this.barrierChildrenCache);
        if (this.masterEphemZNode != null) {
            CloseableUtils.closeQuietly(this.masterEphemZNode);
        }
    }
}

