package org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.collect.Lists;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.util.concurrent.FutureCallback;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.util.concurrent.Futures;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.util.concurrent.ListenableFuture;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.util.concurrent.Service;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.api.Command;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.api.RunId;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.common.Threads;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.state.Message;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.state.Messages;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.state.SystemMessages;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.zookeeper.NodeData;
import org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.zookeeper.ZKClient;
import org.apache.phoenix.shaded.org.apache.zookeeper.KeeperException;
import org.apache.phoenix.shaded.org.apache.zookeeper.WatchedEvent;
import org.apache.phoenix.shaded.org.apache.zookeeper.Watcher;
import org.apache.phoenix.shaded.org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/phoenix/shaded/org/apache/tephra/shaded/org/apache/twill/internal/AbstractZKServiceController.class */
public abstract class AbstractZKServiceController extends AbstractExecutionServiceController {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AbstractZKServiceController.class);
    protected final ZKClient zkClient;
    private final InstanceNodeDataCallback instanceNodeDataCallback;
    private final List<ListenableFuture<?>> messageFutures;
    private ListenableFuture<Service.State> stopMessageFuture;

    /* renamed from: org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.AbstractZKServiceController$6, reason: invalid class name */
    /* loaded from: input_file:org/apache/phoenix/shaded/org/apache/tephra/shaded/org/apache/twill/internal/AbstractZKServiceController$6.class */
    static /* synthetic */ class AnonymousClass6 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$zookeeper$Watcher$Event$EventType = new int[Watcher.Event.EventType.values().length];

        static {
            try {
                $SwitchMap$org$apache$zookeeper$Watcher$Event$EventType[Watcher.Event.EventType.NodeDataChanged.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$zookeeper$Watcher$Event$EventType[Watcher.Event.EventType.NodeDeleted.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/phoenix/shaded/org/apache/tephra/shaded/org/apache/twill/internal/AbstractZKServiceController$InstanceNodeDataCallback.class */
    public final class InstanceNodeDataCallback implements FutureCallback<NodeData> {
        private InstanceNodeDataCallback() {
        }

        @Override // org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.util.concurrent.FutureCallback
        public void onSuccess(NodeData nodeData) {
            if (AbstractZKServiceController.this.shouldProcessZKEvent()) {
                AbstractZKServiceController.this.instanceNodeUpdated(nodeData);
            }
        }

        @Override // org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.util.concurrent.FutureCallback
        public void onFailure(Throwable th) {
            AbstractZKServiceController.LOG.error("Failed in fetching instance node data.", th);
            if (AbstractZKServiceController.this.shouldProcessZKEvent()) {
                AbstractZKServiceController.this.instanceNodeFailed(th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractZKServiceController(RunId runId, ZKClient zKClient) {
        super(runId);
        this.zkClient = zKClient;
        this.instanceNodeDataCallback = new InstanceNodeDataCallback();
        this.messageFutures = Lists.newLinkedList();
    }

    @Override // org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.api.ServiceController
    public final ListenableFuture<Command> sendCommand(Command command) {
        return sendMessage(Messages.createForAll(command), command);
    }

    @Override // org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.api.ServiceController
    public final ListenableFuture<Command> sendCommand(String str, Command command) {
        return sendMessage(Messages.createForRunnable(str, command), command);
    }

    @Override // org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.AbstractExecutionServiceController
    protected final void startUp() {
        doStartUp();
        actOnExists(getInstancePath(), new Runnable() { // from class: org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.AbstractZKServiceController.1
            @Override // java.lang.Runnable
            public void run() {
                AbstractZKServiceController.this.watchInstanceNode();
            }
        });
    }

    @Override // org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.AbstractExecutionServiceController
    protected final synchronized void shutDown() {
        if (this.stopMessageFuture == null) {
            this.stopMessageFuture = ZKMessages.sendMessage(this.zkClient, getMessagePrefix(), SystemMessages.stopApplication(), Service.State.TERMINATED);
        }
        Iterator<ListenableFuture<?>> it = this.messageFutures.iterator();
        while (it.hasNext()) {
            it.next().cancel(true);
        }
        doShutDown();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final synchronized <V> ListenableFuture<V> sendMessage(Message message, V v) {
        if (!isRunning()) {
            return Futures.immediateFailedFuture(new IllegalStateException("Cannot send message to non-running application"));
        }
        final ListenableFuture<V> sendMessage = ZKMessages.sendMessage(this.zkClient, getMessagePrefix(), message, v);
        this.messageFutures.add(sendMessage);
        sendMessage.addListener(new Runnable() { // from class: org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.AbstractZKServiceController.2
            @Override // java.lang.Runnable
            public void run() {
                if (AbstractZKServiceController.this.state() == Service.State.STOPPING) {
                    return;
                }
                synchronized (AbstractZKServiceController.this) {
                    AbstractZKServiceController.this.messageFutures.remove(sendMessage);
                }
            }
        }, Threads.SAME_THREAD_EXECUTOR);
        return sendMessage;
    }

    protected final ListenableFuture<Service.State> getStopMessageFuture() {
        return this.stopMessageFuture;
    }

    protected abstract void doStartUp();

    protected abstract void doShutDown();

    protected abstract void instanceNodeUpdated(NodeData nodeData);

    protected abstract void instanceNodeFailed(Throwable th);

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized void forceShutDown() {
        if (this.stopMessageFuture == null) {
            this.stopMessageFuture = Futures.immediateFuture(Service.State.TERMINATED);
        }
        stop();
    }

    private void actOnExists(final String str, final Runnable runnable) {
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Futures.addCallback(this.zkClient.exists(str, new Watcher() { // from class: org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.AbstractZKServiceController.3
            @Override // org.apache.phoenix.shaded.org.apache.zookeeper.Watcher
            public void process(WatchedEvent watchedEvent) {
                if (AbstractZKServiceController.this.shouldProcessZKEvent() && watchedEvent.getType() == Watcher.Event.EventType.NodeCreated && atomicBoolean.compareAndSet(false, true)) {
                    runnable.run();
                }
            }
        }), new FutureCallback<Stat>() { // from class: org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.AbstractZKServiceController.4
            @Override // org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.util.concurrent.FutureCallback
            public void onSuccess(Stat stat) {
                if (stat == null || !atomicBoolean.compareAndSet(false, true)) {
                    return;
                }
                runnable.run();
            }

            @Override // org.apache.phoenix.shaded.org.apache.tephra.shaded.com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                AbstractZKServiceController.LOG.error("Failed in exists call to {}. Shutting down service.", str, th);
                AbstractZKServiceController.this.forceShutDown();
            }
        }, Threads.SAME_THREAD_EXECUTOR);
    }

    protected final void watchInstanceNode() {
        if (shouldProcessZKEvent()) {
            Futures.addCallback(this.zkClient.getData(getInstancePath(), new Watcher() { // from class: org.apache.phoenix.shaded.org.apache.tephra.shaded.org.apache.twill.internal.AbstractZKServiceController.5
                @Override // org.apache.phoenix.shaded.org.apache.zookeeper.Watcher
                public void process(WatchedEvent watchedEvent) {
                    if (AbstractZKServiceController.this.shouldProcessZKEvent()) {
                        switch (AnonymousClass6.$SwitchMap$org$apache$zookeeper$Watcher$Event$EventType[watchedEvent.getType().ordinal()]) {
                            case 1:
                                AbstractZKServiceController.this.watchInstanceNode();
                                return;
                            case 2:
                                AbstractZKServiceController.this.instanceNodeFailed(KeeperException.create(KeeperException.Code.NONODE, AbstractZKServiceController.this.getInstancePath()));
                                return;
                            default:
                                AbstractZKServiceController.LOG.info("Ignore ZK event for instance node: {}", watchedEvent);
                                return;
                        }
                    }
                }
            }), this.instanceNodeDataCallback, Threads.SAME_THREAD_EXECUTOR);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean shouldProcessZKEvent() {
        Service.State state = state();
        return state == Service.State.NEW || state == Service.State.STARTING || state == Service.State.RUNNING;
    }

    private String getMessagePrefix() {
        return getZKPath("messages/msg");
    }

    protected final String getInstancePath() {
        return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, getRunId().getId());
    }

    private String getZKPath(String str) {
        return String.format("/%s/%s", getRunId().getId(), str);
    }
}
