/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.master.barrier;

import com.google.common.primitives.Longs;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.exceptions.Twister2Exception;
import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
import edu.iu.dsc.tws.api.faulttolerance.FaultToleranceContext;
import edu.iu.dsc.tws.common.zk.ZKBarrierManager;
import edu.iu.dsc.tws.common.zk.ZKContext;
import edu.iu.dsc.tws.common.zk.ZKEventsManager;
import edu.iu.dsc.tws.common.zk.ZKUtils;
import edu.iu.dsc.tws.master.barrier.BarrierMonitor;
import edu.iu.dsc.tws.master.barrier.BarrierResponder;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import java.io.Closeable;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.CloseableUtils;

public class ZKBarrierHandler
implements BarrierResponder {
    private static final Logger LOG = Logger.getLogger(ZKBarrierHandler.class.getName());
    private BarrierMonitor barrierMonitor;
    private Config config;
    private String jobID;
    private int numberOfWorkers;
    private CuratorFramework client;
    private PathChildrenCache defaultBarrierCache;
    private PathChildrenCache initBarrierCache;
    private String rootPath;

    public ZKBarrierHandler(BarrierMonitor barrierMonitor, Config config, String jobID, int numberOfWorkers) {
        this.barrierMonitor = barrierMonitor;
        this.config = config;
        this.jobID = jobID;
        this.numberOfWorkers = numberOfWorkers;
        this.rootPath = ZKContext.rootNode((Config)config);
    }

    public void initialize(JobMasterAPI.JobMasterState initialState) throws Twister2Exception {
        if (initialState != JobMasterAPI.JobMasterState.JM_STARTED && initialState != JobMasterAPI.JobMasterState.JM_RESTARTED) {
            throw new Twister2Exception("initialState has to be either JobMasterState.JM_STARTED or JobMasterState.JM_RESTARTED. Supplied value: " + initialState);
        }
        try {
            String zkServerAddresses = ZKContext.serverAddresses((Config)this.config);
            int sessionTimeoutMs = FaultToleranceContext.sessionTimeout((Config)this.config);
            this.client = ZKUtils.connectToServer((String)zkServerAddresses, (int)sessionTimeoutMs);
            if (initialState == JobMasterAPI.JobMasterState.JM_RESTARTED) {
                String defaultBarrierDir = ZKUtils.defaultBarrierDir((String)this.rootPath, (String)this.jobID);
                this.defaultBarrierCache = new PathChildrenCache(this.client, defaultBarrierDir, true);
                this.addBarrierChildrenCacheListener(this.defaultBarrierCache, JobMasterAPI.BarrierType.DEFAULT);
                this.defaultBarrierCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                TreeSet<Integer> existingWorkers = new TreeSet<Integer>();
                long timeout = this.getInitialWorkersAtBarrier(this.defaultBarrierCache, existingWorkers);
                if (!existingWorkers.isEmpty()) {
                    this.barrierMonitor.initDefaultAfterRestart(existingWorkers, timeout, this.numberOfWorkers);
                    LOG.info("Existing workers at default barrier: " + existingWorkers.size());
                    existingWorkers.clear();
                }
                String initBarrierDir = ZKUtils.initBarrierDir((String)this.rootPath, (String)this.jobID);
                this.initBarrierCache = new PathChildrenCache(this.client, initBarrierDir, true);
                this.addBarrierChildrenCacheListener(this.initBarrierCache, JobMasterAPI.BarrierType.INIT);
                this.initBarrierCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                timeout = this.getInitialWorkersAtBarrier(this.initBarrierCache, existingWorkers);
                if (!existingWorkers.isEmpty()) {
                    this.barrierMonitor.initInitAfterRestart(existingWorkers, timeout, this.numberOfWorkers);
                    LOG.info("Existing workers at init barrier: " + existingWorkers);
                }
            } else {
                String defaultBarrierDir = ZKUtils.defaultBarrierDir((String)this.rootPath, (String)this.jobID);
                this.defaultBarrierCache = new PathChildrenCache(this.client, defaultBarrierDir, true);
                this.addBarrierChildrenCacheListener(this.defaultBarrierCache, JobMasterAPI.BarrierType.DEFAULT);
                this.defaultBarrierCache.start();
                String initBarrierDir = ZKUtils.initBarrierDir((String)this.rootPath, (String)this.jobID);
                this.initBarrierCache = new PathChildrenCache(this.client, initBarrierDir, true);
                this.addBarrierChildrenCacheListener(this.initBarrierCache, JobMasterAPI.BarrierType.INIT);
                this.initBarrierCache.start();
            }
        }
        catch (Twister2Exception e) {
            throw e;
        }
        catch (Exception e) {
            throw new Twister2Exception("Exception when initializing ZKMasterController.", (Throwable)e);
        }
    }

    private long getInitialWorkersAtBarrier(PathChildrenCache childrenCache, Set<Integer> workersAtBarrier) {
        long timeout = 0L;
        List existingWorkerZnodes = childrenCache.getCurrentData();
        for (ChildData child : existingWorkerZnodes) {
            String fullPath = child.getPath();
            int workerID = ZKUtils.getWorkerIDFromPersPath((String)fullPath);
            workersAtBarrier.add(workerID);
            if (timeout != 0L) continue;
            try {
                ZKBarrierManager.readWorkerTimeout((CuratorFramework)this.client, (String)fullPath);
            }
            catch (Twister2Exception e) {
                throw new Twister2RuntimeException((Throwable)e);
            }
        }
        return timeout;
    }

    private void addBarrierChildrenCacheListener(PathChildrenCache cache, final JobMasterAPI.BarrierType barrierType) {
        PathChildrenCacheListener listener = new PathChildrenCacheListener(){

            public void childEvent(CuratorFramework clientOfEvent, PathChildrenCacheEvent event) {
                String childPath = event.getData().getPath();
                int workerID = ZKUtils.getWorkerIDFromPersPath((String)childPath);
                long timeout = Longs.fromByteArray((byte[])event.getData().getData());
                switch (event.getType()) {
                    case CHILD_ADDED: {
                        if (barrierType == JobMasterAPI.BarrierType.DEFAULT) {
                            ZKBarrierHandler.this.barrierMonitor.arrivedAtDefault(workerID, timeout);
                            break;
                        }
                        if (barrierType != JobMasterAPI.BarrierType.INIT) break;
                        ZKBarrierHandler.this.barrierMonitor.arrivedAtInit(workerID, timeout);
                        break;
                    }
                    case CHILD_REMOVED: {
                        if (barrierType == JobMasterAPI.BarrierType.DEFAULT) {
                            ZKBarrierHandler.this.barrierMonitor.removedFromDefault(workerID);
                            break;
                        }
                        if (barrierType != JobMasterAPI.BarrierType.INIT) break;
                        ZKBarrierHandler.this.barrierMonitor.removedFromInit(workerID);
                        break;
                    }
                }
            }
        };
        cache.getListenable().addListener((Object)listener);
    }

    @Override
    public void allArrived(JobMasterAPI.BarrierType barrierType) {
        this.publishBarrierDone(barrierType, JobMasterAPI.BarrierResult.SUCCESS);
    }

    @Override
    public void barrierFailed(JobMasterAPI.BarrierType barrierType, JobMasterAPI.BarrierResult result) {
        this.publishBarrierDone(barrierType, result);
    }

    public void publishBarrierDone(JobMasterAPI.BarrierType barrierType, JobMasterAPI.BarrierResult barrierResult) {
        JobMasterAPI.BarrierDone barrierDone = JobMasterAPI.BarrierDone.newBuilder().setBarrierType(barrierType).setResult(barrierResult).build();
        JobMasterAPI.JobEvent jobEvent = JobMasterAPI.JobEvent.newBuilder().setBarrierDone(barrierDone).build();
        try {
            ZKEventsManager.publishEvent((CuratorFramework)this.client, (String)this.rootPath, (String)this.jobID, (JobMasterAPI.JobEvent)jobEvent);
        }
        catch (Twister2Exception e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
        }
    }

    public void close() {
        CloseableUtils.closeQuietly((Closeable)this.defaultBarrierCache);
        CloseableUtils.closeQuietly((Closeable)this.initBarrierCache);
    }
}

