/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.rsched.bootstrap;

import com.google.protobuf.InvalidProtocolBufferException;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.exceptions.TimeoutException;
import edu.iu.dsc.tws.api.resource.ControllerContext;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.proto.utils.WorkerInfoUtils;
import edu.iu.dsc.tws.rsched.bootstrap.ZKContext;
import edu.iu.dsc.tws.rsched.bootstrap.ZKUtil;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;

public class ZKWorkerController
implements IWorkerController {
    /** Logger shared by all instances of this class. */
    public static final Logger LOG = Logger.getLogger(ZKWorkerController.class.getName());
    // IP part of the "ip:port" string handed to the constructor.
    private String workerIP;
    // Port part of the "ip:port" string handed to the constructor.
    private int workerPort;
    // Description of this worker; assigned by initialize(), either freshly built or recovered from the job znode.
    private JobMasterAPI.WorkerInfo workerInfo;
    // Expected number of workers in the job; used by getAllWorkers() and as the barrier modulus.
    private int numberOfWorkers;
    // Job name as given to the constructor; used to derive the ZooKeeper paths.
    private String jobName;
    // Node information passed through to WorkerInfoUtils.createWorkerInfo().
    private JobMasterAPI.NodeInfo nodeInfo;
    // Compute resource passed through to WorkerInfoUtils.createWorkerInfo().
    private JobAPI.ComputeResource computeResource;
    // Curator client; created and started in initialize(), closed in close().
    private CuratorFramework client;
    // Path of the job znode, computed by ZKUtil.constructJobPath() in the constructor.
    private String jobPath;
    // Despite its name, this holds the node created for this worker under jobPath (see createWorkerZnode()).
    private PersistentNode jobZNode;
    // Cache over the children of jobPath; backs getCurrentWorkers() and getNumberOfCurrentWorkers().
    private PathChildrenCache childrenCache;
    // Distributed counter incremented by createWorkerID() to hand out worker IDs.
    private DistributedAtomicInteger daiForWorkerID;
    // Distributed counter incremented once per waitOnBarrier() call.
    private DistributedAtomicInteger daiForBarrier;
    // Curator barrier set/removed by incrementBarrierDAI().
    private DistributedBarrier barrier;
    // Configuration handed to the constructor; read by the ZKUtil/ZKContext/ControllerContext helpers.
    private Config config;

    /**
     * Creates a controller for one worker of a job. No ZooKeeper connection is
     * opened here; that happens in {@code initialize()}.
     *
     * @param config          configuration used to derive the ZooKeeper paths and addresses
     * @param jobName         name of the job this worker belongs to
     * @param workerIpAndPort worker address in the form {@code ip:port}
     * @param numberOfWorkers number of workers expected in the job
     * @param nodeInfo        node information stored in this worker's WorkerInfo
     * @param computeResource compute resource stored in this worker's WorkerInfo
     */
    public ZKWorkerController(Config config, String jobName, String workerIpAndPort, int numberOfWorkers, JobMasterAPI.NodeInfo nodeInfo, JobAPI.ComputeResource computeResource) {
        this.config = config;
        this.jobName = jobName;
        this.nodeInfo = nodeInfo;
        this.computeResource = computeResource;
        this.numberOfWorkers = numberOfWorkers;
        this.jobPath = ZKUtil.constructJobPath(config, jobName);

        // The address arrives as a single "ip:port" token; split it once and keep both halves.
        String[] ipAndPort = workerIpAndPort.split(":");
        String ip = ipAndPort[0];
        int port = Integer.parseInt(ipAndPort[1]);
        this.workerIP = ip;
        this.workerPort = port;
    }

    /**
     * Connects to ZooKeeper and registers this worker, generating a new worker ID
     * from the distributed counter when the worker is not already recorded in the
     * job znode.
     *
     * @return {@code true} on success, {@code false} if any step threw an exception
     *         (the exception is logged at SEVERE level)
     */
    public boolean initialize() {
        return this.initializeWorker(false, -1);
    }

    /**
     * Connects to ZooKeeper and registers this worker with the given worker ID.
     * If the job znode already contains an entry with this worker's ip:port, that
     * previous entry is reused instead and {@code workerID} is ignored.
     *
     * @param workerID ID to use when the worker is not already recorded in the job znode
     * @return {@code true} on success, {@code false} if any step threw an exception
     *         (the exception is logged at SEVERE level)
     */
    public boolean initialize(int workerID) {
        return this.initializeWorker(true, workerID);
    }

    /**
     * Shared implementation of both {@code initialize} variants.
     *
     * @param useGivenID    when {@code true}, {@code givenWorkerID} is used for a new worker;
     *                      otherwise an ID is generated with {@code createWorkerID()}
     * @param givenWorkerID the caller supplied ID; only read when {@code useGivenID} is true
     * @return {@code true} on success, {@code false} on any exception
     */
    private boolean initializeWorker(boolean useGivenID, int givenWorkerID) {
        try {
            this.startClientAndRecipes();

            // If the job znode already exists, this worker may have registered before.
            JobMasterAPI.WorkerInfo previousInfo = null;
            if (this.client.checkExists().forPath(this.jobPath) != null) {
                List<JobMasterAPI.WorkerInfo> joinedWorkers = this.parseJobZNode();
                if (joinedWorkers == null) {
                    // parseJobZNode() already logged the cause; fail explicitly rather than via an NPE.
                    throw new IllegalStateException("Could not read the job znode content: " + this.jobPath);
                }
                previousInfo = this.getIfExists(joinedWorkers);
            }

            if (previousInfo != null) {
                this.workerInfo = previousInfo;
                this.createWorkerZnode();
                LOG.warning("Worker is coming from a failure. It is using the previous job znode data: " + this.workerInfo);
            } else {
                int workerID;
                if (useGivenID) {
                    workerID = givenWorkerID;
                } else {
                    workerID = this.createWorkerID();
                }
                this.workerInfo = WorkerInfoUtils.createWorkerInfo(workerID, this.workerIP, this.workerPort, this.nodeInfo, this.computeResource);
                this.createWorkerZnode();
                this.appendWorkerInfo();
            }

            this.childrenCache = new PathChildrenCache(this.client, this.jobPath, true);
            this.childrenCache.start();
            LOG.info("This worker: " + this.workerInfo + " initialized successfully.");
            return true;
        }
        catch (Exception e) {
            LOG.log(Level.SEVERE, "Exception when initializing ZKWorkerController", e);
            return false;
        }
    }

    /**
     * Creates and starts the Curator client, then builds the barrier and the two
     * distributed counters on top of it.
     */
    private void startClientAndRecipes() {
        String zkServerAddresses = ZKContext.zooKeeperServerAddresses(this.config);
        this.client = CuratorFrameworkFactory.newClient(zkServerAddresses, new ExponentialBackoffRetry(1000, 3));
        this.client.start();

        String barrierPath = ZKUtil.constructBarrierPath(this.config, this.jobName);
        this.barrier = new DistributedBarrier(this.client, barrierPath);

        String daiPathForWorkerID = ZKUtil.constructDaiPathForWorkerID(this.config, this.jobName);
        this.daiForWorkerID = new DistributedAtomicInteger(this.client, daiPathForWorkerID, new ExponentialBackoffRetry(1000, 3));

        String daiPathForBarrier = ZKUtil.constructDaiPathForBarrier(this.config, this.jobName);
        this.daiForBarrier = new DistributedAtomicInteger(this.client, daiPathForBarrier, new ExponentialBackoffRetry(1000, 3));
    }

    /**
     * Returns the WorkerInfo of this worker.
     *
     * @return the value assigned during {@code initialize()}, or {@code null} if it has not been assigned
     */
    public JobMasterAPI.WorkerInfo getWorkerInfo() {
        return workerInfo;
    }

    /**
     * Looks up a joined worker by its ID in the job znode content.
     *
     * @param id the worker ID to search for
     * @return the matching WorkerInfo, or {@code null} when no joined worker has that ID
     *         or the job znode could not be read
     */
    public JobMasterAPI.WorkerInfo getWorkerInfoForID(int id) {
        List<JobMasterAPI.WorkerInfo> workerList = this.getJoinedWorkers();
        if (workerList == null) {
            // getJoinedWorkers() returns null when the job znode could not be read.
            return null;
        }
        for (JobMasterAPI.WorkerInfo info : workerList) {
            if (info != null && info.getWorkerID() == id) {
                return info;
            }
        }
        return null;
    }

    /**
     * Returns the number of workers expected in the job.
     *
     * @return the value passed to the constructor
     */
    public int getNumberOfWorkers() {
        return numberOfWorkers;
    }

    /**
     * Generates a unique worker ID by incrementing the distributed counter and
     * returning the value the counter had before the increment.
     *
     * <p>Failed attempts are retried in a loop. The previous implementation retried
     * recursively but discarded the result of the retry, so any retry ended up
     * returning -1 while still consuming an ID.
     *
     * @return the generated worker ID
     * @throws IllegalStateException if no increment succeeded within the attempt limit
     */
    private int createWorkerID() {
        final int maxAttempts = 100;
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; ++attempt) {
            try {
                AtomicValue<Integer> incremented = this.daiForWorkerID.increment();
                if (incremented.succeeded()) {
                    int workerID = incremented.preValue();
                    LOG.fine("Unique WorkerID generated: " + workerID);
                    return workerID;
                }
            }
            catch (Exception e) {
                lastFailure = e;
                LOG.log(Level.WARNING, "Failed to generate a unique workerID. Will try again ...", e);
            }
        }
        throw new IllegalStateException("Could not generate a unique workerID after " + maxAttempts + " attempts.", lastFailure);
    }

    /**
     * Creates and starts the znode of this worker under the job path, with the
     * serialized WorkerInfo as its content, and waits up to 10 seconds for the
     * initial creation to complete.
     *
     * @throws RuntimeException if the znode cannot be created, the creation does not
     *                          complete within the wait time, or the thread is interrupted
     */
    private void createWorkerZnode() {
        try {
            String thisNodePath = ZKUtil.constructWorkerPath(this.jobPath, this.getWorkerIpAndPort(this.workerInfo));
            this.jobZNode = ZKUtil.createPersistentEphemeralZnode(this.client, thisNodePath, this.workerInfo.toByteArray());
            this.jobZNode.start();
            // waitForInitialCreate reports a timeout through its return value, not an exception.
            boolean created = this.jobZNode.waitForInitialCreate(10000L, TimeUnit.MILLISECONDS);
            if (!created) {
                throw new IllegalStateException("Timed out waiting for the worker znode to be created: " + thisNodePath);
            }
            String fullZnodePath = this.jobZNode.getActualPath();
            LOG.fine("An ephemeral znode is created for this worker: " + fullZnodePath);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the caller's thread.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException("Could not create znode for the worker: " + this.workerInfo, e);
        }
    }

    /**
     * Appends the encoded WorkerInfo of this worker to the content of the job znode.
     * The read-modify-write is done while holding the job lock, and the lock is
     * released in a {@code finally} block so a failed read or write cannot leave it held.
     *
     * @throws RuntimeException if the lock cannot be acquired or released, or the
     *                          job znode cannot be read or written
     */
    private void appendWorkerInfo() {
        String lockPath = ZKUtil.constructJobLockPath(this.config, this.jobName);
        InterProcessMutex lock = new InterProcessMutex(this.client, lockPath);
        try {
            lock.acquire();
            try {
                byte[] parentData = (byte[])this.client.getData().forPath(this.jobPath);
                byte[] encodedWorkerInfoBytes = ZKUtil.encodeWorkerInfo(this.workerInfo);
                byte[] allBytes = ZKUtil.addTwoByteArrays(parentData, encodedWorkerInfoBytes);
                this.client.setData().forPath(this.jobPath, allBytes);
            }
            finally {
                // Only reached after a successful acquire(), so release() is always balanced.
                lock.release();
            }
            LOG.info("Added own WorkerInfo and updated job znode content.");
        }
        catch (Exception e) {
            throw new RuntimeException("Could not update the job znode content for the worker: " + this.workerInfo, e);
        }
    }

    /**
     * Logs the size of the given worker list followed by one line per worker, as a
     * single INFO message.
     *
     * <p>Entries are appended through {@code StringBuilder.append(Object)}, so a
     * {@code null} entry is printed as "null" instead of causing a NullPointerException.
     *
     * @param workers the workers to log
     */
    public void printWorkers(List<JobMasterAPI.WorkerInfo> workers) {
        StringBuilder logBuffer = new StringBuilder();
        logBuffer.append("Number of workers in the job: ").append(workers.size()).append("\n");
        for (JobMasterAPI.WorkerInfo worker : workers) {
            logBuffer.append(worker).append("\n");
        }
        LOG.info(logBuffer.toString());
    }

    /**
     * Decodes the content of every child znode currently held in the children cache
     * into a WorkerInfo.
     *
     * <p>A child whose content cannot be decoded is logged and skipped; the returned
     * list never contains {@code null} entries.
     *
     * @return the decoded WorkerInfo objects, possibly empty
     */
    public List<JobMasterAPI.WorkerInfo> getCurrentWorkers() {
        List<JobMasterAPI.WorkerInfo> workers = new ArrayList<JobMasterAPI.WorkerInfo>();
        for (ChildData child : this.childrenCache.getCurrentData()) {
            try {
                JobMasterAPI.WorkerInfo wnInfo = ((JobMasterAPI.WorkerInfo.Builder)JobMasterAPI.WorkerInfo.newBuilder().mergeFrom(child.getData())).build();
                workers.add(wnInfo);
            }
            catch (InvalidProtocolBufferException e) {
                // Skip the entry instead of adding null, which callers would trip over.
                LOG.log(Level.SEVERE, "Could not decode child znode content as a WorkerInfo object", e);
            }
        }
        return workers;
    }

    /**
     * Returns how many child znodes the children cache currently holds for the job path.
     *
     * @return the size of the cache's current data list
     */
    public int getNumberOfCurrentWorkers() {
        List<ChildData> cachedChildren = this.childrenCache.getCurrentData();
        return cachedChildren.size();
    }

    /**
     * Returns the workers recorded in the content of the job znode.
     *
     * @return the decoded list, or {@code null} if the job znode could not be read or decoded
     */
    public List<JobMasterAPI.WorkerInfo> getJoinedWorkers() {
        List<JobMasterAPI.WorkerInfo> joined = this.parseJobZNode();
        return joined;
    }

    /**
     * Reads the content of the job znode and decodes it with
     * {@code ZKUtil.decodeWorkerInfos}.
     *
     * @return the decoded workers, or {@code null} if reading or decoding threw an
     *         exception (the exception is logged at SEVERE level)
     */
    private List<JobMasterAPI.WorkerInfo> parseJobZNode() {
        List<JobMasterAPI.WorkerInfo> decoded = null;
        try {
            byte[] rawContent = (byte[])this.client.getData().forPath(this.jobPath);
            decoded = ZKUtil.decodeWorkerInfos(rawContent);
        }
        catch (Exception e) {
            LOG.log(Level.SEVERE, "Could not get the job node data", e);
        }
        return decoded;
    }

    /**
     * Searches the given workers for one whose ip:port equals this worker's ip:port,
     * compared case-insensitively.
     *
     * @param workers the workers to search
     * @return the first matching WorkerInfo, or {@code null} if none matches
     */
    private JobMasterAPI.WorkerInfo getIfExists(List<JobMasterAPI.WorkerInfo> workers) {
        String ownAddress = this.workerIP + ":" + this.workerPort;
        JobMasterAPI.WorkerInfo match = null;
        for (JobMasterAPI.WorkerInfo candidate : workers) {
            if (ownAddress.equalsIgnoreCase(this.getWorkerIpAndPort(candidate))) {
                match = candidate;
                break;
            }
        }
        return match;
    }

    /**
     * Formats the address of a worker as {@code ip:port}.
     *
     * @param wInfo the worker whose IP and port are read
     * @return the worker IP, a colon, and the port
     */
    private String getWorkerIpAndPort(JobMasterAPI.WorkerInfo wInfo) {
        StringBuilder address = new StringBuilder();
        address.append(wInfo.getWorkerIP());
        address.append(":");
        address.append(wInfo.getPort());
        return address.toString();
    }

    /**
     * Counts the worker records in the content of the job znode without decoding them.
     * The content is walked as a sequence of records, each a 4-byte length read with
     * {@code ZKUtil.intFromBytes} followed by that many bytes.
     *
     * @return the number of records; 0 when the znode has no data; -1 when the znode
     *         cannot be read or its content is not a well-formed record sequence
     */
    private int countNumberOfJoinedWorkers() {
        byte[] parentData;
        try {
            parentData = (byte[])this.client.getData().forPath(this.jobPath);
        }
        catch (Exception e) {
            LOG.log(Level.SEVERE, "Could not get the job node data", e);
            return -1;
        }
        if (parentData == null) {
            return 0;
        }
        int lengthIndex = 0;
        int counter = 0;
        while (lengthIndex < parentData.length) {
            int remaining = parentData.length - lengthIndex;
            if (remaining < 4) {
                LOG.severe("Job znode content ends inside a record length field at offset " + lengthIndex);
                return -1;
            }
            int length = ZKUtil.intFromBytes(parentData, lengthIndex);
            // A negative or overlong length would move the index backwards or overflow it.
            if (length < 0 || length > remaining - 4) {
                LOG.severe("Job znode content has an invalid record length " + length + " at offset " + lengthIndex);
                return -1;
            }
            lengthIndex += 4 + length;
            ++counter;
        }
        return counter;
    }

    /**
     * Waits until the job znode records at least {@code numberOfWorkers} workers and
     * then returns them, polling every 50 ms.
     *
     * <p>The time limit is measured against elapsed wall-clock time, so time spent
     * reading from ZooKeeper counts towards it as well as time spent sleeping. An
     * interrupt during the wait does not abort it, but the thread's interrupt status
     * is restored before this method returns or throws.
     *
     * @return the joined workers as returned by {@code getJoinedWorkers()}
     * @throws TimeoutException if the expected number of workers has not joined within
     *                          the limit given by {@code ControllerContext.maxWaitTimeForAllToJoin}
     */
    public List<JobMasterAPI.WorkerInfo> getAllWorkers() throws TimeoutException {
        long timeLimit = ControllerContext.maxWaitTimeForAllToJoin((Config)this.config);
        long startNanos = System.nanoTime();
        boolean interrupted = false;
        try {
            while (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos) < timeLimit) {
                if (this.countNumberOfJoinedWorkers() >= this.numberOfWorkers) {
                    return this.getJoinedWorkers();
                }
                try {
                    Thread.sleep(50L);
                }
                catch (InterruptedException e) {
                    // Remember the interrupt and keep waiting; it is re-asserted on exit.
                    interrupted = true;
                    LOG.fine("Thread sleep interrupted. Will try again ...");
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        throw new TimeoutException("All workers have not joined the job on the specified time limit: " + timeLimit + "ms.");
    }

    /**
     * Strips the first 40 characters from a znode name.
     *
     * @param znodeName the full znode name
     * @return the part of the name after the first 40 characters, or {@code null} if
     *         the name is {@code null} or shorter than 40 characters
     */
    private String getZnodeName(String znodeName) {
        boolean longEnough = znodeName != null && znodeName.length() >= 40;
        return longEnough ? znodeName.substring(40) : null;
    }

    /**
     * Registers this worker's arrival at the barrier and waits for the others.
     *
     * <p>The work is done in two phases that share one budget of 100 attempts:
     * <ol>
     *   <li>increment the distributed barrier counter until one increment succeeds;</li>
     *   <li>if the new counter value is a multiple of {@code numberOfWorkers}, remove
     *       the barrier; otherwise set the barrier and wait on it.</li>
     * </ol>
     * A failure in phase 2 retries only phase 2. The previous implementation retried
     * from the start, incrementing the shared counter again for the same arrival.
     *
     * <p>NOTE(review): a worker can still set the barrier after the last arriving
     * worker has removed it, and then wait until the time limit; that ordering is
     * not addressed here.
     *
     * @param tryCount          number of attempts already used; callers pass 0
     * @param timeLimitMilliSec maximum time to wait on the barrier, in milliseconds
     * @return {@code true} if this worker removed the barrier or its wait on the barrier
     *         returned true; {@code false} if the wait timed out or the attempt budget ran out
     */
    private boolean incrementBarrierDAI(int tryCount, long timeLimitMilliSec) {
        final int maxTries = 100;
        int attempt = tryCount;

        // Phase 1: count this arrival exactly once.
        Integer arrivalCount = null;
        while (arrivalCount == null) {
            if (attempt >= maxTries) {
                return false;
            }
            try {
                AtomicValue<Integer> incremented = this.daiForBarrier.increment();
                if (incremented.succeeded()) {
                    arrivalCount = incremented.postValue();
                } else {
                    ++attempt;
                }
            }
            catch (Exception e) {
                LOG.log(Level.WARNING, "Failed to increment the DistributedAtomicInteger for Barrier. Will try again ...", e);
                ++attempt;
            }
        }
        LOG.fine("DistributedAtomicInteger for Barrier increased to: " + arrivalCount);

        // Phase 2: act on the barrier without touching the counter again.
        while (attempt < maxTries) {
            try {
                if (arrivalCount % this.numberOfWorkers == 0) {
                    this.barrier.removeBarrier();
                    return true;
                }
                this.barrier.setBarrier();
                return this.barrier.waitOnBarrier(timeLimitMilliSec, TimeUnit.MILLISECONDS);
            }
            catch (Exception e) {
                LOG.log(Level.WARNING, "Barrier operation failed after the arrival was counted. Will try again ...", e);
                ++attempt;
            }
        }
        return false;
    }

    /**
     * Blocks until all workers have arrived at the barrier.
     *
     * @throws TimeoutException if {@code incrementBarrierDAI} reports that the workers
     *                          did not all arrive within
     *                          {@code ControllerContext.maxWaitTimeOnBarrier}
     */
    public void waitOnBarrier() throws TimeoutException {
        if (this.incrementBarrierDAI(0, ControllerContext.maxWaitTimeOnBarrier((Config)this.config))) {
            return;
        }
        throw new TimeoutException("All workers have not arrived at the barrier on the time limit: " + ControllerContext.maxWaitTimeOnBarrier((Config)this.config) + "ms.");
    }

    /**
     * Releases the ZooKeeper resources held by this controller: this worker's znode,
     * the children cache and the client. If the children cache held exactly one child
     * when close was called, this worker is treated as the last one and the job znodes
     * are deleted.
     *
     * <p>Does nothing if the client was never created. Parts that were never created
     * (for example after a failed {@code initialize()}) are skipped, and the client is
     * closed in a {@code finally} block so it is released even when an earlier step fails.
     */
    public void close() {
        if (this.client == null) {
            return;
        }
        try {
            // Take the count before closing anything, while the cache is still live.
            int noOfChildren = -1;
            if (this.childrenCache != null) {
                noOfChildren = this.childrenCache.getCurrentData().size();
            }
            if (this.jobZNode != null) {
                this.jobZNode.close();
            }
            if (this.childrenCache != null) {
                CloseableUtils.closeQuietly((Closeable)this.childrenCache);
            }
            if (noOfChildren == 1) {
                LOG.log(Level.INFO, "This is the last worker to finish. Deleting the job znodes.");
                ZKUtil.deleteJobZNodes(this.config, this.client, this.jobName);
            }
        }
        catch (Exception e) {
            LOG.log(Level.SEVERE, "Exception when closing", e);
        }
        finally {
            CloseableUtils.closeQuietly((Closeable)this.client);
        }
    }
}

