/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.agent;

import com.mantisrx.common.utils.ListenerCallQueue;
import com.mantisrx.common.utils.Services;
import com.spotify.futures.CompletableFutures;
import io.mantisrx.common.Ack;
import io.mantisrx.common.JsonSerializer;
import io.mantisrx.common.WorkerPorts;
import io.mantisrx.common.metrics.netty.MantisNettyEventsListenerFactory;
import io.mantisrx.runtime.MachineDefinition;
import io.mantisrx.runtime.loader.ClassLoaderHandle;
import io.mantisrx.runtime.loader.RuntimeTask;
import io.mantisrx.runtime.loader.SinkSubscriptionStateHandler;
import io.mantisrx.runtime.loader.TaskFactory;
import io.mantisrx.runtime.loader.config.WorkerConfiguration;
import io.mantisrx.runtime.loader.config.WorkerConfigurationUtils;
import io.mantisrx.server.agent.JvmUtils;
import io.mantisrx.server.agent.MachineDefinitionUtils;
import io.mantisrx.server.agent.SingleTaskOnlyFactory;
import io.mantisrx.server.core.ExecuteStageRequest;
import io.mantisrx.server.core.Status;
import io.mantisrx.server.core.WrappedExecuteStageRequest;
import io.mantisrx.server.core.domain.WorkerId;
import io.mantisrx.server.master.client.HighAvailabilityServices;
import io.mantisrx.server.master.client.MantisMasterGateway;
import io.mantisrx.server.master.client.ResourceLeaderConnection;
import io.mantisrx.server.master.client.TaskStatusUpdateHandler;
import io.mantisrx.server.master.resourcecluster.ClusterID;
import io.mantisrx.server.master.resourcecluster.ResourceClusterGateway;
import io.mantisrx.server.master.resourcecluster.TaskExecutorDisconnection;
import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat;
import io.mantisrx.server.master.resourcecluster.TaskExecutorID;
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
import io.mantisrx.server.master.resourcecluster.TaskExecutorReport;
import io.mantisrx.server.master.resourcecluster.TaskExecutorStatusChange;
import io.mantisrx.server.worker.TaskExecutorGateway;
import io.mantisrx.shaded.com.google.common.base.Preconditions;
import io.mantisrx.shaded.com.google.common.collect.ImmutableMap;
import io.mantisrx.shaded.com.google.common.util.concurrent.AbstractScheduledService;
import io.mantisrx.shaded.com.google.common.util.concurrent.Service;
import io.mantisrx.shaded.org.apache.curator.shaded.com.google.common.annotations.VisibleForTesting;
import java.beans.ConstructorProperties;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import mantis.io.reactivex.netty.RxNetty;
import mantis.io.reactivex.netty.metrics.MetricEventsListenerFactory;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Subscription;
import rx.subjects.PublishSubject;

public class TaskExecutor
extends RpcEndpoint
implements TaskExecutorGateway {
    private static final Logger log = LoggerFactory.getLogger(TaskExecutor.class);
    // Identity of this executor: taken from the worker configuration when set there, otherwise generated.
    private final TaskExecutorID taskExecutorID;
    // Resource cluster this executor belongs to, derived from the worker configuration.
    private final ClusterID clusterID;
    private final WorkerConfiguration workerConfiguration;
    // Source of the master client API and of the resource manager leader connection.
    private final HighAvailabilityServices highAvailabilityServices;
    // Used to obtain user-code class loaders for tasks; closed when the executor stops.
    private final ClassLoaderHandle classLoaderHandle;
    private final SinkSubscriptionStateHandler.Factory subscriptionStateHandlerFactory;
    // Registration payload built once in the constructor and reused for every resource manager connection.
    private final TaskExecutorRegistration taskExecutorRegistration;
    // Completed by onStart(): normally on success, exceptionally with the startup error otherwise.
    private final CompletableFuture<Void> startFuture = new CompletableFuture();
    // Cached thread pool used for connection start/stop, task preparation and listener dispatch.
    private final ExecutorService ioExecutor;
    // Cached thread pool on which the current task is started and stopped.
    private final ExecutorService runtimeTaskExecutor;
    // Queue of task lifecycle events delivered to registered listeners.
    private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue();
    // The three fields below are assigned in startTaskExecutorServices() and are null before start.
    private MantisMasterGateway masterMonitor;
    private ResourceLeaderConnection<ResourceClusterGateway> resourceClusterGatewaySupplier;
    private TaskStatusUpdateHandler taskStatusUpdateHandler;
    // Current connection to the resource manager; null while no connection is set.
    private ResourceManagerGatewayCxn currentResourceManagerCxn;
    // Last report passed to setStatus().
    private TaskExecutorReport currentReport;
    // Task currently owned by this executor; null when the executor is available.
    private RuntimeTask currentTask;
    // NOTE(review): not referenced by any code visible in this file.
    private Subscription currentTaskStatusSubscription;
    // Index handed to the next ResourceManagerGatewayCxn; incremented on each new connection.
    private int resourceManagerCxnIdx;
    // Most recent task start/stop failure recorded via setPreviousFailure().
    private Throwable previousFailure;
    // Factory for user-code class loaders and runtime task instances.
    private final TaskFactory taskFactory;

    /**
     * Creates a task executor without an explicit task factory.
     *
     * <p>Delegates to the six-argument constructor, passing {@code null} as the task factory.
     *
     * @param rpcService RPC service handed to the {@link RpcEndpoint} superclass
     * @param workerConfiguration configuration of this worker/agent
     * @param highAvailabilityServices provider of master and resource manager connections
     * @param classLoaderHandle handle used to obtain user-code class loaders
     * @param subscriptionStateHandlerFactory factory stored for sink subscription state handling
     */
    public TaskExecutor(
            RpcService rpcService,
            WorkerConfiguration workerConfiguration,
            HighAvailabilityServices highAvailabilityServices,
            ClassLoaderHandle classLoaderHandle,
            SinkSubscriptionStateHandler.Factory subscriptionStateHandlerFactory) {
        this(
                rpcService,
                workerConfiguration,
                highAvailabilityServices,
                classLoaderHandle,
                subscriptionStateHandlerFactory,
                null);
    }

    /**
     * Creates a task executor and builds the registration it will present to the resource manager.
     *
     * <p>The RPC endpoint is given a random name with the prefix {@code "worker"}. The executor id
     * comes from the worker configuration when one is set there and is generated otherwise. Task
     * executor attributes are copied into the registration with both keys and values lower-cased
     * (using the JVM's default locale).
     *
     * @param rpcService RPC service handed to the {@link RpcEndpoint} superclass
     * @param workerConfiguration configuration of this worker/agent
     * @param highAvailabilityServices provider of master and resource manager connections
     * @param classLoaderHandle handle used to obtain user-code class loaders
     * @param subscriptionStateHandlerFactory factory stored for sink subscription state handling
     * @param taskFactory factory for runtime tasks; when {@code null} a
     *     {@link SingleTaskOnlyFactory} is used
     */
    public TaskExecutor(
            RpcService rpcService,
            WorkerConfiguration workerConfiguration,
            HighAvailabilityServices highAvailabilityServices,
            ClassLoaderHandle classLoaderHandle,
            SinkSubscriptionStateHandler.Factory subscriptionStateHandlerFactory,
            @Nullable TaskFactory taskFactory) {
        super(rpcService, RpcServiceUtils.createRandomName("worker"));

        this.taskExecutorID = Optional.ofNullable(workerConfiguration.getTaskExecutorId())
                .map(TaskExecutorID::of)
                .orElseGet(TaskExecutorID::generate);
        this.clusterID = ClusterID.of(workerConfiguration.getClusterId());
        this.workerConfiguration = workerConfiguration;
        this.highAvailabilityServices = highAvailabilityServices;
        this.classLoaderHandle = classLoaderHandle;
        this.subscriptionStateHandlerFactory = subscriptionStateHandlerFactory;

        // Describe this machine (ports, bandwidth, address) for the registration.
        final WorkerPorts ports = new WorkerPorts(
                workerConfiguration.getMetricsPort(),
                workerConfiguration.getDebugPort(),
                workerConfiguration.getConsolePort(),
                workerConfiguration.getCustomPort(),
                workerConfiguration.getSinkPort());
        final MachineDefinition machine =
                MachineDefinitionUtils.sys(ports, workerConfiguration.getNetworkBandwidthInMB());
        final String externalAddress = workerConfiguration.getExternalAddress();

        this.taskExecutorRegistration = TaskExecutorRegistration.builder()
                .machineDefinition(machine)
                .taskExecutorID(this.taskExecutorID)
                .clusterID(this.clusterID)
                .hostname(externalAddress)
                .taskExecutorAddress(this.getAddress())
                .workerPorts(ports)
                .taskExecutorAttributes((Map) ImmutableMap.copyOf(
                        workerConfiguration.getTaskExecutorAttributes().entrySet().stream()
                                .collect(Collectors.toMap(
                                        kv -> ((String) kv.getKey()).toLowerCase(),
                                        kv -> ((String) kv.getValue()).toLowerCase()))))
                .build();
        log.info("Starting executor registration: {}", this.taskExecutorRegistration);

        this.ioExecutor =
                Executors.newCachedThreadPool(new ExecutorThreadFactory("taskexecutor-io"));
        this.runtimeTaskExecutor =
                Executors.newCachedThreadPool(new ExecutorThreadFactory("taskexecutor-runtime"));
        this.resourceManagerCxnIdx = 0;

        if (taskFactory != null) {
            this.taskFactory = taskFactory;
        } else {
            this.taskFactory = new SingleTaskOnlyFactory();
        }
    }

    /**
     * Start hook of the RPC endpoint: brings up the executor's services and records the outcome
     * in the start future.
     *
     * <p>On success the start future completes normally. On any failure the error is logged, the
     * start future is completed exceptionally with it, and the same error is rethrown.
     */
    protected void onStart() {
        try {
            this.startTaskExecutorServices();
            this.startFuture.complete(null);
        } catch (Throwable startupFailure) {
            log.error(
                    "Fatal error occurred in starting TaskExecutor {}",
                    this.getAddress(),
                    startupFailure);
            this.startFuture.completeExceptionally(startupFailure);
            throw startupFailure;
        }
    }

    /**
     * Wires up the master gateway, status reporting, Netty metrics and the resource manager
     * connection. Must be called on the endpoint's main thread.
     *
     * <p>The last step establishes the first resource manager connection synchronously.
     */
    private void startTaskExecutorServices() {
        this.validateRunsInMainThread();

        // Master gateway and the handler that reports task status to it.
        final MantisMasterGateway masterGateway = this.highAvailabilityServices.getMasterClientApi();
        this.masterMonitor = masterGateway;
        this.taskStatusUpdateHandler = TaskStatusUpdateHandler.forReportingToGateway(masterGateway);

        RxNetty.useMetricListenersFactory(new MantisNettyEventsListenerFactory());

        // Follow resource manager leader changes for this cluster.
        final ResourceLeaderConnection<ResourceClusterGateway> leaderConnection =
                this.highAvailabilityServices.connectWithResourceManager(this.clusterID);
        this.resourceClusterGatewaySupplier = leaderConnection;
        leaderConnection.register(new ResourceManagerChangeListener());

        this.establishNewResourceManagerCxnSync();
    }

    /**
     * Returns the future that tracks startup of this executor.
     *
     * @return the start future; {@code onStart()} completes it normally once the services are
     *     started, or exceptionally with the startup error
     */
    public CompletableFuture<Void> awaitRunning() {
        return this.startFuture;
    }

    /**
     * Creates a resource manager connection, makes it the current one, and waits on the calling
     * (main) thread until the connection service is running.
     *
     * @throws IllegalArgumentException if a connection is already set
     */
    private void establishNewResourceManagerCxnSync() {
        this.validateRunsInMainThread();

        final boolean noExistingCxn = this.currentResourceManagerCxn == null;
        final String alreadyExistsMessage = String.format(
                "resource manager connection already exists %s", this.currentResourceManagerCxn);
        Preconditions.checkArgument(noExistingCxn, alreadyExistsMessage);

        final ResourceManagerGatewayCxn newCxn = this.newResourceManagerCxn();
        this.setResourceManagerCxn(newCxn);
        // Start the service and wait here until it reports running.
        newCxn.startAsync().awaitRunning();
    }

    /**
     * Creates a resource manager connection, makes it the current one, and starts it on the IO
     * executor without blocking the main thread.
     *
     * <p>If starting fails and the failed connection is still the current one, the connection is
     * cleared and another attempt is scheduled after
     * {@code workerConfiguration.heartbeatInternalInMs()} milliseconds. A start failure is logged
     * and not propagated: the returned future then completes with {@code null}.
     *
     * @return a future completed on the main thread executor once the start attempt has finished;
     *     it is completed exceptionally right away when a connection is already set
     */
    private CompletableFuture<Void> establishNewResourceManagerCxnAsync() {
        this.validateRunsInMainThread();
        if (this.currentResourceManagerCxn != null) {
            final String alreadyExistsMessage = String.format(
                    "resource manager connection already exists %s", this.currentResourceManagerCxn);
            return CompletableFutures.exceptionallyCompletedFuture(new Exception(alreadyExistsMessage));
        }

        final ResourceManagerGatewayCxn cxn = this.newResourceManagerCxn();
        this.setResourceManagerCxn(cxn);

        final CompletableFuture<Void> cxnStarted = Services.startAsync(cxn, this.getIOExecutor());
        return cxnStarted.handleAsync((ignored, startFailure) -> {
            if (startFailure == null) {
                return ignored;
            }
            log.error("Failed to create a connection; Retrying", startFailure);
            // Only retry when nobody replaced the connection in the meantime.
            if (this.currentResourceManagerCxn == cxn) {
                this.currentResourceManagerCxn = null;
                this.scheduleRunAsync(
                        this::establishNewResourceManagerCxnAsync,
                        this.workerConfiguration.heartbeatInternalInMs(),
                        TimeUnit.MILLISECONDS);
            }
            return null;
        }, this.getMainThreadExecutor());
    }

    /**
     * Drops the current resource manager connection (if any) and establishes a new one.
     *
     * <p>The previous connection is stopped on the IO executor and the current connection is
     * cleared immediately. Errors while stopping are logged and ignored. Once stopping has
     * finished, a new connection is established on the main thread unless one has already been
     * set in the meantime.
     *
     * @return a future that completes when the new connection attempt has finished
     */
    private CompletableFuture<Void> reestablishResourceManagerCxnAsync() {
        this.validateRunsInMainThread();

        final CompletableFuture<Void> previousCxnStopped;
        if (this.currentResourceManagerCxn == null) {
            previousCxnStopped = CompletableFuture.completedFuture(null);
        } else {
            previousCxnStopped =
                    Services.stopAsync(this.currentResourceManagerCxn, this.getIOExecutor());
        }
        this.currentResourceManagerCxn = null;

        final CompletableFuture<Void> stopFailureIgnored = previousCxnStopped.exceptionally(stopFailure -> {
            log.error("Closing the previous connection failed; Ignoring the error", stopFailure);
            return null;
        });
        return stopFailureIgnored.thenComposeAsync(ignored -> {
            if (this.currentResourceManagerCxn != null) {
                // Someone else already established a connection; nothing left to do.
                return CompletableFuture.<Void>completedFuture(null);
            }
            return this.establishNewResourceManagerCxnAsync();
        }, this.getMainThreadExecutor());
    }

    /**
     * Makes {@code cxn} the current resource manager connection and attaches a failure listener
     * to it.
     *
     * <p>The listener runs on the main thread executor. When the connection fails while it was
     * running, the current connection is cleared and a new connection attempt is scheduled after
     * one heartbeat interval. Failures from any other state are ignored here.
     *
     * @param cxn the connection to install
     * @throws IllegalArgumentException if a connection is already set
     */
    private void setResourceManagerCxn(ResourceManagerGatewayCxn cxn) {
        this.validateRunsInMainThread();
        Preconditions.checkArgument(
                this.currentResourceManagerCxn == null, "existing connection already set");

        final Service.Listener retryWhenRunningCxnFails = new Service.Listener() {

            public void failed(Service.State from, Throwable failure) {
                if (from != Service.State.RUNNING) {
                    return;
                }
                log.error("Connection with the resource manager failed; Retrying", failure);
                // NOTE(review): this clears whichever connection is current, not necessarily the
                // one that failed - confirm a replaced connection can never fail from RUNNING.
                TaskExecutor.this.clearResourceManagerCxn();
                TaskExecutor.this.scheduleRunAsync(
                        () -> TaskExecutor.this.establishNewResourceManagerCxnAsync(),
                        TaskExecutor.this.workerConfiguration.getHeartbeatInterval());
            }
        };
        cxn.addListener(retryWhenRunningCxnFails, this.getMainThreadExecutor());
        this.currentResourceManagerCxn = cxn;
    }

    /**
     * Forgets the current resource manager connection by setting it to {@code null}.
     * The connection itself is not stopped here. Must run on the main thread.
     */
    private void clearResourceManagerCxn() {
        this.validateRunsInMainThread();
        this.currentResourceManagerCxn = null;
    }

    /**
     * Builds (but does not start or install) a connection to the gateway currently reported by
     * the resource leader connection.
     *
     * <p>Each call consumes one connection index, so successive connections get distinct indices.
     *
     * @return a new, not yet started connection
     */
    private ResourceManagerGatewayCxn newResourceManagerCxn() {
        this.validateRunsInMainThread();
        final ResourceClusterGateway currentGateway = this.resourceClusterGatewaySupplier.getCurrent();
        final int cxnIdx = this.resourceManagerCxnIdx++;
        return new ResourceManagerGatewayCxn(
                cxnIdx,
                this.taskExecutorRegistration,
                currentGateway,
                this.workerConfiguration.getHeartbeatInterval(),
                this.workerConfiguration.getHeartbeatTimeout(),
                this::getCurrentReport,
                this.workerConfiguration.getTolerableConsecutiveHeartbeatFailures());
    }

    /** Returns the cached thread pool created in the constructor with the name "taskexecutor-io". */
    private ExecutorService getIOExecutor() {
        return this.ioExecutor;
    }

    /** Returns the cached thread pool created in the constructor with the name "taskexecutor-runtime". */
    private ExecutorService getRuntimeExecutor() {
        return this.runtimeTaskExecutor;
    }

    /**
     * Computes the executor's report from the current task via {@code callAsync}.
     *
     * @param timeout timeout passed to {@code callAsync}
     * @return a future with an "occupied" report carrying the current task's worker id, or an
     *     "available" report when no task is set
     */
    private CompletableFuture<TaskExecutorReport> getCurrentReport(Time timeout) {
        return this.callAsync(() -> {
            final TaskExecutorReport report;
            if (this.currentTask != null) {
                report = TaskExecutorReport.occupied(
                        WorkerId.fromIdUnsafe(this.currentTask.getWorkerId()));
            } else {
                report = TaskExecutorReport.available();
            }
            return report;
        }, timeout);
    }

    /**
     * Runs a future-returning supplier through {@code callAsync} and flattens the result.
     *
     * @param tSupplier callable producing the future to wait for
     * @param timeout timeout passed to {@code callAsync}
     * @param <T> result type of the inner future
     * @return a future that completes with the result of the future produced by {@code tSupplier}
     */
    @VisibleForTesting
    <T> CompletableFuture<T> callInMainThread(Callable<CompletableFuture<T>> tSupplier, Time timeout) {
        final CompletableFuture<CompletableFuture<T>> nested = this.callAsync(tSupplier, timeout);
        return nested.thenCompose(inner -> inner);
    }

    /**
     * Accepts a request to run a task on this executor.
     *
     * <p>When no task is set, preparation of the task is handed to the IO executor and the call
     * is acknowledged immediately; preparation failures are reported to listeners, not to the
     * caller. Re-submitting the worker that is already the current task is acknowledged without
     * doing anything.
     *
     * @param request the stage execution request
     * @return a completed future with an {@link Ack}, or a future failed with
     *     {@code TaskAlreadyRunningException} when a different worker is already the current task
     */
    public CompletableFuture<Ack> submitTask(ExecuteStageRequest request) {
        log.info("Received request {} for execution", request);

        if (this.currentTask == null) {
            final WrappedExecuteStageRequest wrapped =
                    new WrappedExecuteStageRequest(PublishSubject.create(), request);
            this.getIOExecutor().execute(() -> this.prepareTask(wrapped));
            return CompletableFuture.completedFuture(Ack.getInstance());
        }

        final boolean sameWorker =
                this.currentTask.getWorkerId().equals(request.getWorkerId().getId());
        if (sameWorker) {
            return CompletableFuture.completedFuture(Ack.getInstance());
        }
        return CompletableFutures.exceptionallyCompletedFuture(
                new TaskExecutorGateway.TaskAlreadyRunningException(
                        WorkerId.fromIdUnsafe(this.currentTask.getWorkerId())));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Builds and initializes the runtime task for a request, then schedules it to be installed
     * and started through {@code scheduleRunAsync}.
     *
     * <p>Any exception during preparation is logged and reported to listeners as a task-failed
     * event with a {@code null} task. In every case a listener dispatch is queued on the IO
     * executor at the end.
     *
     * @param wrappedRequest the wrapped stage execution request
     */
    private void prepareTask(WrappedExecuteStageRequest wrappedRequest) {
        try {
            final UserCodeClassLoader taskClassLoader = this.taskFactory.getUserCodeClassLoader(
                    wrappedRequest.getRequest(), this.classLoaderHandle);
            final ClassLoader plainClassLoader = taskClassLoader.asClassLoader();

            // The task receives both the request and the worker configuration as JSON strings.
            final JsonSerializer serializer = new JsonSerializer();
            final String requestJson = serializer.toJson(wrappedRequest.getRequest());
            final String workerConfigJson =
                    serializer.toJson(WorkerConfigurationUtils.toWritable(this.workerConfiguration));

            final RuntimeTask preparedTask = this.taskFactory.getRuntimeTaskInstance(
                    wrappedRequest.getRequest(), plainClassLoader);
            preparedTask.initialize(requestJson, workerConfigJson, taskClassLoader);

            this.scheduleRunAsync(() -> {
                this.setCurrentTask(preparedTask);
                this.startCurrentTask();
            }, 0L, TimeUnit.MILLISECONDS);
        } catch (Exception ex) {
            log.error("Failed to submit task, request: {}", wrappedRequest.getRequest(), ex);
            this.listeners.enqueue(TaskExecutor.getTaskFailedEvent(null, ex));
        } finally {
            this.getIOExecutor().execute(() -> this.listeners.dispatch());
        }
    }

    /**
     * Starts the current task on the runtime executor if it has not been started yet.
     *
     * <p>Does nothing unless the task is in state {@code NEW}. Otherwise a task-starting event is
     * queued and dispatched, and the task service is started asynchronously. If starting fails,
     * the current task is cleared, the failure is remembered as the previous failure, and a
     * task-failed event is queued and dispatched. Must run on the main thread; the failure
     * handling also runs on the main thread executor.
     */
    private void startCurrentTask() {
        this.validateRunsInMainThread();
        if (!this.currentTask.state().equals(Service.State.NEW)) {
            return;
        }

        this.listeners.enqueue(TaskExecutor.getTaskStartingEvent(this.currentTask));
        this.getIOExecutor().execute(() -> this.listeners.dispatch());

        final CompletableFuture<Void> taskStarted =
                Services.startAsync(this.currentTask, this.getRuntimeExecutor());
        taskStarted.whenCompleteAsync((ignored, startFailure) -> {
            if (startFailure == null) {
                return;
            }
            // No "{}" placeholder: SLF4J treats a trailing Throwable as the exception to log,
            // so a placeholder here would be printed literally.
            log.error("TaskExecutor failed to start", startFailure);
            final RuntimeTask failedTask = this.currentTask;
            this.setCurrentTask(null);
            this.setPreviousFailure(startFailure);
            this.listeners.enqueue(TaskExecutor.getTaskFailedEvent(failedTask, startFailure));
            this.getIOExecutor().execute(() -> this.listeners.dispatch());
        }, this.getMainThreadExecutor());
    }

    /**
     * Replaces the current task and publishes the matching status.
     *
     * @param task the new current task, or {@code null} to mark the executor as available
     */
    private void setCurrentTask(@Nullable RuntimeTask task) {
        this.validateRunsInMainThread();
        this.currentTask = task;

        final TaskExecutorReport report;
        if (task != null) {
            report = TaskExecutorReport.occupied(WorkerId.fromIdUnsafe(task.getWorkerId()));
        } else {
            report = TaskExecutorReport.available();
        }
        this.setStatus(report);
    }

    /**
     * Records the given error as the most recent failure. Must run on the main thread.
     *
     * @param throwable the failure to remember
     */
    private void setPreviousFailure(Throwable throwable) {
        this.validateRunsInMainThread();
        this.previousFailure = throwable;
    }

    /**
     * Stores the new report and notifies the resource manager of the status change.
     *
     * <p>Notification is best effort: a missing connection, a synchronous error, or an
     * asynchronous failure of the notification is only logged as a warning.
     *
     * @param newReport the report to store and publish
     */
    private void setStatus(TaskExecutorReport newReport) {
        this.validateRunsInMainThread();
        this.currentReport = newReport;
        try {
            Preconditions.checkState(
                    this.currentResourceManagerCxn != null,
                    "currentResourceManagerCxn was not expected to be null");
            final ResourceClusterGateway gateway = this.currentResourceManagerCxn.gateway;
            final TaskExecutorStatusChange statusChange =
                    new TaskExecutorStatusChange(this.taskExecutorID, this.clusterID, newReport);
            gateway.notifyTaskExecutorStatusChange(statusChange).whenCompleteAsync((ack, notifyFailure) -> {
                if (notifyFailure != null) {
                    log.warn("Failed to update the status {}", newReport, notifyFailure);
                }
            }, this.getIOExecutor());
        } catch (Exception e) {
            log.warn("Failed to update the status {}", newReport, e);
        }
    }

    /**
     * Requests cancellation of the task identified by {@code workerId}.
     *
     * <p>When the id matches the current task, stopping is scheduled through
     * {@code scheduleRunAsync} and the call is acknowledged immediately, without waiting for the
     * task to stop.
     *
     * @param workerId id of the worker to cancel
     * @return a completed future with an {@link Ack}, or a future failed with
     *     {@code TaskNotFoundException} when there is no current task or its id differs
     */
    public CompletableFuture<Ack> cancelTask(WorkerId workerId) {
        log.info("TaskExecutor cancelTask requested for {}", workerId);

        if (this.currentTask != null) {
            if (this.currentTask.getWorkerId().equals(workerId.getId())) {
                this.scheduleRunAsync(this::stopCurrentTask, 0L, TimeUnit.MILLISECONDS);
                return CompletableFuture.completedFuture(Ack.getInstance());
            }
            log.error(
                    "my current worker id is {} while expected worker id is {}",
                    this.currentTask.getWorkerId(),
                    workerId);
        }
        return CompletableFutures.exceptionallyCompletedFuture(
                new TaskExecutorGateway.TaskNotFoundException(workerId));
    }

    /**
     * Stops the current task, if there is one that has not already begun stopping.
     *
     * <p>For a task whose state is at or before {@code RUNNING}, a task-cancelling event is
     * queued and the task is stopped on the runtime executor. When stopping finishes, on the main
     * thread executor, the current task is cleared, any stop failure is remembered, and a
     * task-cancelled event is queued and dispatched. A task in a later state is left untouched.
     * Whenever a current task exists, a listener dispatch is queued on the IO executor before
     * returning.
     *
     * @return a future for the stop operation; already completed when there is nothing to stop,
     *     and exceptionally completed when initiating the stop throws
     */
    private CompletableFuture<Void> stopCurrentTask() {
        log.info("TaskExecutor stopCurrentTask.");
        this.validateRunsInMainThread();
        if (this.currentTask == null) {
            return CompletableFuture.completedFuture(null);
        }

        try {
            final boolean pastRunning =
                    this.currentTask.state().compareTo(Service.State.RUNNING) > 0;
            if (pastRunning) {
                return CompletableFuture.completedFuture(null);
            }

            this.listeners.enqueue(TaskExecutor.getTaskCancellingEvent(this.currentTask));
            final CompletableFuture<Void> taskStopped =
                    Services.stopAsync(this.currentTask, this.getRuntimeExecutor());
            return taskStopped.whenCompleteAsync((ignored, stopFailure) -> {
                final RuntimeTask stoppedTask = this.currentTask;
                this.setCurrentTask(null);
                if (stopFailure != null) {
                    this.setPreviousFailure(stopFailure);
                }
                this.listeners.enqueue(TaskExecutor.getTaskCancelledEvent(stoppedTask, stopFailure));
                this.getIOExecutor().execute(() -> this.listeners.dispatch());
            }, this.getMainThreadExecutor());
        } catch (Exception e) {
            log.error("stopping current task failed", e);
            return CompletableFutures.exceptionallyCompletedFuture(e);
        } finally {
            // Deliver whatever has been queued so far (e.g. the cancelling event).
            this.getIOExecutor().execute(() -> this.listeners.dispatch());
        }
    }

    /**
     * Stops the current resource manager connection on the IO executor, if one is set.
     * The connection field itself is left as it is.
     *
     * @return a future for the stop operation, already completed when no connection is set
     */
    private CompletableFuture<Void> stopResourceManager() {
        this.validateRunsInMainThread();
        if (this.currentResourceManagerCxn == null) {
            return CompletableFuture.completedFuture(null);
        }
        return Services.stopAsync(this.currentResourceManagerCxn, this.getIOExecutor());
    }

    /**
     * Produces a thread dump of this JVM as a string.
     *
     * <p>The dump is created synchronously on the calling thread by
     * {@code JvmUtils.createThreadDumpAsString()}; the returned future is already completed.
     *
     * @return a completed future holding the thread dump text
     */
    public CompletableFuture<String> requestThreadDump() {
        return CompletableFuture.completedFuture(JvmUtils.createThreadDumpAsString());
    }

    /**
     * Reports, via {@code callAsync}, whether a resource manager connection is currently set.
     *
     * <p>The connection is installed before it is started, so {@code true} means a connection
     * object exists, not necessarily that registration has already succeeded.
     *
     * @param timeout timeout passed to {@code callAsync}
     * @return a future with {@code true} when a connection is set
     */
    CompletableFuture<Boolean> isRegistered(Time timeout) {
        return this.callAsync(() -> this.currentResourceManagerCxn != null, timeout);
    }

    /**
     * Forwards a status update to the task status update handler.
     *
     * <p>The handler is assigned while the executor's services are started, so it is
     * {@code null} before that point.
     *
     * @param status the status to forward
     */
    protected void updateExecutionStatus(Status status) {
        this.taskStatusUpdateHandler.onStatusUpdate(status);
    }

    /**
     * Stop hook of the RPC endpoint: shuts the executor down in three stages.
     *
     * <ol>
     *   <li>Stop the current task; a failure is logged and does not abort the shutdown.</li>
     *   <li>On the main thread executor, stop the resource manager connection.</li>
     *   <li>On the IO executor, close the class loader handle regardless of the outcome of the
     *       earlier stages; a failure to close is logged.</li>
     * </ol>
     *
     * @return a future that completes after the class loader handle has been closed
     */
    protected CompletableFuture<Void> onStop() {
        this.validateRunsInMainThread();
        log.info("TaskExecutor onStop.");

        final CompletableFuture<Void> taskStopped = this.stopCurrentTask();

        final CompletableFuture<CompletableFuture<Void>> resourceManagerStopStarted =
                taskStopped.handleAsync((ignored, taskStopFailure) -> {
                    if (taskStopFailure != null) {
                        log.error("Failed to stop the task successfully", taskStopFailure);
                    }
                    return this.stopResourceManager();
                }, this.getMainThreadExecutor());

        final CompletableFuture<Void> resourceManagerStopped =
                resourceManagerStopStarted.thenCompose(stopFuture -> stopFuture);

        return resourceManagerStopped.whenCompleteAsync((ignored, stopFailure) -> {
            try {
                this.classLoaderHandle.close();
            } catch (Exception e) {
                log.error("Failed to close classloader handle correctly", e);
            }
        }, this.getIOExecutor());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a listener for task lifecycle events.
     *
     * <p>Registration is performed while holding the monitor of the listener queue.
     *
     * @param listener the listener to register
     * @param executor the executor passed to the listener queue together with the listener
     */
    public final void addListener(Listener listener, Executor executor) {
        synchronized (this.listeners) {
            this.listeners.addListener(listener, executor);
        }
    }

    /**
     * Creates the event that calls {@link Listener#onTaskStarting(RuntimeTask)} on a listener.
     *
     * @param task the task that is about to be started
     * @return an event whose {@code toString()} is {@code "starting()"}
     */
    private static ListenerCallQueue.Event<Listener> getTaskStartingEvent(final RuntimeTask task) {
        return new ListenerCallQueue.Event<Listener>(){

            public void call(Listener listener) {
                listener.onTaskStarting(task);
            }

            public String toString() {
                return "starting()";
            }
        };
    }

    /**
     * Creates the event that calls {@link Listener#onTaskCancelling(RuntimeTask)} on a listener.
     *
     * @param task the task that is about to be stopped
     * @return an event whose {@code toString()} is {@code "cancelling()"}
     */
    private static ListenerCallQueue.Event<Listener> getTaskCancellingEvent(final RuntimeTask task) {
        return new ListenerCallQueue.Event<Listener>(){

            public void call(Listener listener) {
                listener.onTaskCancelling(task);
            }

            public String toString() {
                return "cancelling()";
            }
        };
    }

    /**
     * Creates the event that calls {@link Listener#onTaskFailed(RuntimeTask, Throwable)} on a
     * listener.
     *
     * @param task the task that failed; {@code null} when the failure happened while the task
     *     was still being prepared
     * @param throwable the failure
     * @return an event whose {@code toString()} is {@code "failed()"}
     */
    private static ListenerCallQueue.Event<Listener> getTaskFailedEvent(final RuntimeTask task, final Throwable throwable) {
        return new ListenerCallQueue.Event<Listener>(){

            public void call(Listener listener) {
                listener.onTaskFailed(task, throwable);
            }

            public String toString() {
                return "failed()";
            }
        };
    }

    /**
     * Creates the event that calls {@link Listener#onTaskCancelled(RuntimeTask, Throwable)} on a
     * listener.
     *
     * @param task the task that was stopped
     * @param throwable the error raised while stopping, or {@code null} when stopping succeeded
     * @return an event whose {@code toString()} is {@code "cancelled()"}
     */
    private static ListenerCallQueue.Event<Listener> getTaskCancelledEvent(final RuntimeTask task, final @Nullable Throwable throwable) {
        return new ListenerCallQueue.Event<Listener>(){

            public void call(Listener listener) {
                listener.onTaskCancelled(task, throwable);
            }

            public String toString() {
                return "cancelled()";
            }
        };
    }

    /** Returns the id of this task executor, fixed at construction time. */
    public TaskExecutorID getTaskExecutorID() {
        return this.taskExecutorID;
    }

    /** Returns the id of the resource cluster this executor belongs to, fixed at construction time. */
    public ClusterID getClusterID() {
        return this.clusterID;
    }

    /**
     * Callbacks for the lifecycle of the task run by a {@link TaskExecutor}.
     *
     * <p>Listeners are registered through {@code TaskExecutor.addListener} together with an
     * executor.
     */
    public static interface Listener {
        /**
         * Called when a task is about to be started.
         *
         * @param var1 the task being started
         */
        public void onTaskStarting(RuntimeTask var1);

        /**
         * Called when a task could not be prepared or failed to start.
         *
         * @param var1 the failed task; {@code null} when the failure occurred while the task was
         *     still being prepared
         * @param var2 the failure
         */
        public void onTaskFailed(RuntimeTask var1, Throwable var2);

        /**
         * Called when stopping of a task has been initiated.
         *
         * @param var1 the task being stopped
         */
        public void onTaskCancelling(RuntimeTask var1);

        /**
         * Called after an attempt to stop a task has finished.
         *
         * @param var1 the task that was stopped
         * @param var2 the error raised while stopping, or {@code null} when stopping succeeded
         */
        public void onTaskCancelled(RuntimeTask var1, @Nullable Throwable var2);

        /**
         * Returns a listener whose callbacks all do nothing.
         *
         * @return a new no-op listener
         */
        public static Listener noop() {
            return new Listener(){

                @Override
                public void onTaskStarting(RuntimeTask task) {
                }

                @Override
                public void onTaskFailed(RuntimeTask task, Throwable throwable) {
                }

                @Override
                public void onTaskCancelling(RuntimeTask task) {
                }

                @Override
                public void onTaskCancelled(RuntimeTask task, @Nullable Throwable throwable) {
                }
            };
        }
    }

    /**
     * Reacts to resource manager leader changes by re-establishing the connection through the
     * endpoint's {@code runAsync}. The previous and new leader arguments are not inspected.
     */
    private class ResourceManagerChangeListener
            implements ResourceLeaderConnection.ResourceLeaderChangeListener<ResourceClusterGateway> {

        private ResourceManagerChangeListener() {
        }

        public void onResourceLeaderChanged(
                ResourceClusterGateway previousResourceLeader,
                ResourceClusterGateway newResourceLeader) {
            TaskExecutor.this.runAsync(TaskExecutor.this::reestablishResourceManagerCxnAsync);
        }
    }

    /**
     * Scheduled service representing one connection between a task executor and a resource
     * manager gateway.
     *
     * <p>Starting the service registers the executor with the gateway, each scheduled iteration
     * sends one heartbeat carrying the executor's current report, and shutting the service down
     * disconnects the executor from the gateway.
     */
    static class ResourceManagerGatewayCxn extends AbstractScheduledService {
        private static final Logger log = LoggerFactory.getLogger(ResourceManagerGatewayCxn.class);

        private final int idx;
        private final TaskExecutorRegistration taskExecutorRegistration;
        private final ResourceClusterGateway gateway;
        private final Time heartBeatInterval;
        private final Time heartBeatTimeout;
        // Timeout handed to the report supplier on every heartbeat.
        private final Time timeout = Time.of(1000L, TimeUnit.MILLISECONDS);
        private final Function<Time, CompletableFuture<TaskExecutorReport>> currentReportSupplier;
        private final int tolerableConsecutiveHeartbeatFailures;
        // Consecutive heartbeat failures; reset to zero by every successful heartbeat.
        private int numFailedHeartbeats = 0;

        /**
         * @param idx index used in the service name
         * @param taskExecutorRegistration registration sent to the gateway on start-up
         * @param gateway resource manager gateway to talk to
         * @param heartBeatInterval delay between heartbeats
         * @param heartBeatTimeout how long to wait for registration, heartbeat and disconnect calls
         * @param currentReportSupplier supplies the executor's current report for each heartbeat
         * @param tolerableConsecutiveHeartbeatFailures number of consecutive heartbeat failures
         *     that are tolerated before the service fails
         */
        @ConstructorProperties(value={"idx", "taskExecutorRegistration", "gateway", "heartBeatInterval", "heartBeatTimeout", "currentReportSupplier", "tolerableConsecutiveHeartbeatFailures"})
        public ResourceManagerGatewayCxn(
                int idx,
                TaskExecutorRegistration taskExecutorRegistration,
                ResourceClusterGateway gateway,
                Time heartBeatInterval,
                Time heartBeatTimeout,
                Function<Time, CompletableFuture<TaskExecutorReport>> currentReportSupplier,
                int tolerableConsecutiveHeartbeatFailures) {
            this.idx = idx;
            this.taskExecutorRegistration = taskExecutorRegistration;
            this.gateway = gateway;
            this.heartBeatInterval = heartBeatInterval;
            this.heartBeatTimeout = heartBeatTimeout;
            this.currentReportSupplier = currentReportSupplier;
            this.tolerableConsecutiveHeartbeatFailures = tolerableConsecutiveHeartbeatFailures;
        }

        protected String serviceName() {
            return "ResourceManagerGatewayCxn-" + this.idx;
        }

        /** Heartbeats start immediately and repeat with a fixed delay of one heartbeat interval. */
        protected AbstractScheduledService.Scheduler scheduler() {
            final long delay = this.heartBeatInterval.getSize();
            final TimeUnit unit = this.heartBeatInterval.getUnit();
            return AbstractScheduledService.Scheduler.newFixedDelaySchedule(0L, delay, unit);
        }

        /**
         * Registers the task executor with the gateway, waiting up to the heartbeat timeout.
         *
         * <p>If registration fails, a disconnect is attempted (waiting up to twice the heartbeat
         * timeout, with its own failure only logged) and the registration error is rethrown.
         */
        public void startUp() throws Exception {
            log.info("Trying to register with resource manager {}", this.gateway);
            try {
                this.gateway.registerTaskExecutor(this.taskExecutorRegistration)
                        .get(this.heartBeatTimeout.getSize(), this.heartBeatTimeout.getUnit());
            } catch (Exception registrationFailure) {
                log.error(
                        "Registration to gateway {} has failed; Disconnecting now to be safe",
                        this.gateway,
                        registrationFailure);
                try {
                    this.gateway.disconnectTaskExecutor(this.newDisconnection())
                            .get(2L * this.heartBeatTimeout.getSize(), this.heartBeatTimeout.getUnit());
                } catch (Exception disconnectFailure) {
                    log.error("Disconnection has also failed", disconnectFailure);
                }
                throw registrationFailure;
            }
        }

        /**
         * Sends one heartbeat with the executor's current report and waits for it up to the
         * heartbeat timeout.
         *
         * <p>A failure increments the consecutive-failure counter; it is rethrown only once the
         * counter exceeds the tolerated number, and is otherwise logged and ignored.
         */
        public void runOneIteration() throws Exception {
            try {
                this.currentReportSupplier.apply(this.timeout)
                        .thenComposeAsync(report -> {
                            log.info("Sending heartbeat to resource manager {} with report {}", this.gateway, report);
                            final TaskExecutorHeartbeat heartbeat = new TaskExecutorHeartbeat(
                                    this.taskExecutorRegistration.getTaskExecutorID(),
                                    this.taskExecutorRegistration.getClusterID(),
                                    report);
                            return this.gateway.heartBeatFromTaskExecutor(heartbeat);
                        })
                        .get(this.heartBeatTimeout.getSize(), this.heartBeatTimeout.getUnit());
                this.numFailedHeartbeats = 0;
            } catch (Exception heartbeatFailure) {
                log.error("Failed to send heartbeat to gateway {}", this.gateway, heartbeatFailure);
                ++this.numFailedHeartbeats;
                if (this.numFailedHeartbeats <= this.tolerableConsecutiveHeartbeatFailures) {
                    log.info(
                            "Ignoring heartbeat failure to gateway {} due to failed heartbeats {} <= {}",
                            this.gateway,
                            this.numFailedHeartbeats,
                            this.tolerableConsecutiveHeartbeatFailures);
                    return;
                }
                throw heartbeatFailure;
            }
        }

        /** Disconnects the task executor from the gateway, waiting up to the heartbeat timeout. */
        public void shutDown() throws Exception {
            this.gateway.disconnectTaskExecutor(this.newDisconnection())
                    .get(this.heartBeatTimeout.getSize(), this.heartBeatTimeout.getUnit());
        }

        public String toString() {
            return "TaskExecutor.ResourceManagerGatewayCxn(gateway=" + this.gateway + ")";
        }

        /** Builds the disconnection message identifying this executor and its cluster. */
        private TaskExecutorDisconnection newDisconnection() {
            return new TaskExecutorDisconnection(
                    this.taskExecutorRegistration.getTaskExecutorID(),
                    this.taskExecutorRegistration.getClusterID());
        }
    }
}

