/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.core.master;

import io.mantisrx.server.core.json.DefaultObjectMapper;
import io.mantisrx.server.core.master.MasterDescription;
import io.mantisrx.server.core.master.MasterMonitor;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.ErrorListenerPathable;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.subjects.BehaviorSubject;

public class ZookeeperMasterMonitor
implements MasterMonitor {
    private static final Logger logger = LoggerFactory.getLogger(ZookeeperMasterMonitor.class);
    private final CuratorFramework curator;
    private final String masterPath;
    private final BehaviorSubject<MasterDescription> masterSubject;
    private final AtomicReference<MasterDescription> latestMaster = new AtomicReference();
    private final NodeCache nodeMonitor;

    public ZookeeperMasterMonitor(CuratorFramework curator, String masterPath, MasterDescription initValue) {
        this.curator = curator;
        this.masterPath = masterPath;
        this.masterSubject = BehaviorSubject.create((Object)initValue);
        this.nodeMonitor = new NodeCache(curator, masterPath);
        this.latestMaster.set(initValue);
    }

    public void start() {
        this.nodeMonitor.getListenable().addListener((Object)new NodeCacheListener(){

            public void nodeChanged() throws Exception {
                ZookeeperMasterMonitor.this.retrieveMaster();
            }
        });
        try {
            this.nodeMonitor.start();
        }
        catch (Exception e) {
            throw new IllegalStateException("Failed to start master node monitor: " + e.getMessage(), e);
        }
        logger.info("The ZK master monitor is started");
    }

    private void retrieveMaster() {
        try {
            ((ErrorListenerPathable)this.curator.sync().inBackground(((ErrorListenerPathable)this.curator.getData().inBackground(new BackgroundCallback(){

                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                    MasterDescription description = (MasterDescription)DefaultObjectMapper.getInstance().readValue(event.getData(), MasterDescription.class);
                    logger.info("New master retrieved: " + description);
                    ZookeeperMasterMonitor.this.latestMaster.set(description);
                    ZookeeperMasterMonitor.this.masterSubject.onNext((Object)description);
                }
            })).forPath(this.masterPath))).forPath(this.masterPath);
        }
        catch (Exception e) {
            logger.error("Failed to retrieve updated master information: " + e.getMessage(), (Throwable)e);
        }
    }

    @Override
    public Observable<MasterDescription> getMasterObservable() {
        return this.masterSubject;
    }

    @Override
    public MasterDescription getLatestMaster() {
        return this.latestMaster.get();
    }

    public void shutdown() {
        try {
            this.nodeMonitor.close();
            logger.info("ZK master monitor is shut down");
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to close the ZK node monitor: " + e.getMessage(), e);
        }
    }
}

