package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionInfo;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.ResourceManagerAddress;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.TaskBackPressureResponse;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.registration.RegistrationConnectionListener;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException;
import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
import org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider;
import org.apache.flink.runtime.taskexecutor.rpc.RpcKvStateRegistryListener;
import org.apache.flink.runtime.taskexecutor.rpc.RpcPartitionStateChecker;
import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlot;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor.class */
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
    // Base name handed to AkkaRpcServiceUtils.createRandomName(...) when this RPC endpoint is named.
    public static final String TASK_MANAGER_NAME = "taskmanager";
    // Passed to the job leader service on start; also supplies the ResourceManager leader retriever.
    private final HighAvailabilityServices haServices;
    // Service bundle the constructor reads most collaborators from; shut down in stopTaskExecutorServices().
    private final TaskManagerServices taskExecutorServices;
    private final TaskManagerConfiguration taskManagerConfiguration;
    // Receives errors reported through TaskManagerActionsImpl.notifyFatalError(...).
    private final FatalErrorHandler fatalErrorHandler;
    private final BlobCacheService blobCacheService;

    // Taken from a @Nullable constructor argument, so it may be null.
    @Nullable
    private final String metricQueryServiceAddress;
    private final TaskManagerLocation taskManagerLocation;
    // Closed as the last step of stopTaskExecutorServices().
    private final TaskManagerMetricGroup taskManagerMetricGroup;
    private final TaskExecutorLocalStateStoresManager localStateStoresManager;
    private final ShuffleEnvironment<?, ?> shuffleEnvironment;
    private final KvStateService kvStateService;
    // Looked up by ResourceID in JobManagerHeartbeatListener.
    private final Map<ResourceID, JobManagerConnection> jobManagerConnections;
    private final TaskSlotTable<Task> taskSlotTable;
    // Looked up by JobID in submitTask(...).
    private final JobManagerTable jobManagerTable;
    private final JobLeaderService jobLeaderService;
    private final LeaderRetrievalService resourceManagerLeaderRetriever;
    private final HardwareDescription hardwareDescription;
    // Not final: assigned in startTaskExecutorServices(), so it is null until the services were started.
    private FileCache fileCache;
    // Both heartbeat managers are created at the end of the constructor with the listeners defined in this class.
    private final HeartbeatManager<AllocatedSlotReport, AccumulatorReport> jobManagerHeartbeatManager;
    private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;
    // Assigned in the constructor without a null check.
    private final TaskExecutorPartitionTracker partitionTracker;
    private final BackPressureSampleService backPressureSampleService;

    // Set to null by the constructor.
    @Nullable
    private ResourceManagerAddress resourceManagerAddress;

    // Null-checked by ResourceManagerHeartbeatListener before a heartbeat timeout is acted upon.
    @Nullable
    private EstablishedResourceManagerConnection establishedResourceManagerConnection;

    // Set to null by the constructor; compared by identity in ResourceManagerRegistrationListener.
    @Nullable
    private TaskExecutorToResourceManagerConnection resourceManagerConnection;

    // Set to null by the constructor.
    @Nullable
    private UUID currentRegistrationTimeoutId;
    // Initialised to an empty HashMap in the constructor.
    // NOTE(review): presumably the per-job futures used for result partition cleanup — confirm against the users of this field.
    private Map<JobID, Collection<CompletableFuture<ExecutionState>>> taskResultPartitionCleanupFuturesPerJob;
    // Marked synthetic by the decompiler; presumably the compiler-generated flag backing `assert` statements.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$JobLeaderListenerImpl.class */
    /**
     * Listener handed to the job leader service. Leadership changes are forwarded to the
     * enclosing {@link TaskExecutor} through {@code runAsync}; errors are reported via
     * {@code onFatalError}.
     */
    public final class JobLeaderListenerImpl implements JobLeaderListener {
        private JobLeaderListenerImpl() {
        }

        /** Schedules the establishment of the connection to the new job leader. */
        @Override
        public void jobManagerGainedLeadership(JobID jobID, JobMasterGateway jobMasterGateway, JMTMRegistrationSuccess jMTMRegistrationSuccess) {
            runAsync(() -> establishJobManagerConnection(jobID, jobMasterGateway, jMTMRegistrationSuccess));
        }

        /** Logs the leadership loss and schedules closing the connection for the affected job. */
        @Override
        public void jobManagerLostLeadership(JobID jobID, JobMasterId jobMasterId) {
            log.info("JobManager for job {} with leader id {} lost leadership.", jobID, jobMasterId);
            runAsync(() -> {
                // The cause is created when the scheduled action runs, not when it is submitted.
                final Exception cause = new Exception("Job leader for job id " + jobID + " lost leadership.");
                closeJobManagerConnection(jobID, cause);
            });
        }

        /** Forwards the error to the enclosing executor's fatal error handling. */
        @Override
        public void handleError(Throwable th) {
            onFatalError(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$JobManagerHeartbeatListener.class */
    /**
     * Heartbeat listener for JobManagers. Every callback first validates that it runs in the
     * main thread of the enclosing {@link TaskExecutor}.
     */
    public class JobManagerHeartbeatListener implements HeartbeatListener<AllocatedSlotReport, AccumulatorReport> {
        private JobManagerHeartbeatListener() {
        }

        /**
         * Closes the connection of the JobManager whose heartbeat timed out and asks the job
         * leader service to reconnect. Does nothing if no connection is registered for the id.
         */
        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            validateRunsInMainThread();
            log.info("The heartbeat of JobManager with id {} timed out.", resourceID);
            if (jobManagerConnections.containsKey(resourceID)) {
                final JobManagerConnection connection = jobManagerConnections.get(resourceID);
                if (connection != null) {
                    final TimeoutException cause = new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out.");
                    closeJobManagerConnection(connection.getJobID(), cause);
                    jobLeaderService.reconnect(connection.getJobID());
                }
            }
        }

        /** Synchronises the local slots with the slot report received from the JobMaster. */
        @Override
        public void reportPayload(ResourceID resourceID, AllocatedSlotReport allocatedSlotReport) {
            validateRunsInMainThread();
            syncSlotsWithSnapshotFromJobMaster(allocatedSlotReport);
        }

        /**
         * Collects one accumulator snapshot per task of the job that belongs to the given
         * JobManager. Returns an empty report if no connection is known for the id.
         */
        @Override
        public AccumulatorReport retrievePayload(ResourceID resourceID) {
            validateRunsInMainThread();
            final JobManagerConnection connection = jobManagerConnections.get(resourceID);
            if (connection == null) {
                return new AccumulatorReport(Collections.emptyList());
            }
            // Raw list kept as in the original: the snapshot element type is not imported in this file.
            final ArrayList snapshots = new ArrayList(16);
            for (Iterator<Task> remaining = taskSlotTable.getTasks(connection.getJobID()); remaining.hasNext();) {
                snapshots.add(remaining.next().getAccumulatorRegistry().getSnapshot());
            }
            return new AccumulatorReport(snapshots);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$ResourceManagerHeartbeatListener.class */
    /**
     * Heartbeat listener for the ResourceManager. Reacts to timeouts of the currently
     * established connection and supplies the heartbeat payload.
     */
    public class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, TaskExecutorHeartbeatPayload> {
        private ResourceManagerHeartbeatListener() {
        }

        /**
         * Reconnects to the ResourceManager if the timed-out id belongs to the currently
         * established connection; otherwise the timeout is only logged at debug level.
         */
        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            validateRunsInMainThread();
            final EstablishedResourceManagerConnection current = establishedResourceManagerConnection;
            final boolean concernsCurrentConnection = current != null && current.getResourceManagerResourceId().equals(resourceID);
            if (concernsCurrentConnection) {
                log.info("The heartbeat of ResourceManager with id {} timed out.", resourceID);
                final String message = String.format("The heartbeat of ResourceManager with id %s timed out.", resourceID);
                reconnectToResourceManager(new TaskManagerException(message));
            } else {
                log.debug("Received heartbeat timeout for outdated ResourceManager id {}. Ignoring the timeout.", resourceID);
            }
        }

        /** The ResourceManager sends no payload, so there is nothing to process. */
        @Override
        public void reportPayload(ResourceID resourceID, Void r3) {
        }

        /** Builds the payload from the current slot report and the cluster partition report. */
        @Override
        public TaskExecutorHeartbeatPayload retrievePayload(ResourceID resourceID) {
            validateRunsInMainThread();
            return new TaskExecutorHeartbeatPayload(
                    taskSlotTable.createSlotReport(getResourceID()),
                    partitionTracker.createClusterPartitionReport());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$ResourceManagerLeaderListener.class */
    /**
     * Leader retrieval listener for the ResourceManager. New leader addresses are handed to
     * the enclosing {@link TaskExecutor} through {@code runAsync}.
     */
    public final class ResourceManagerLeaderListener implements LeaderRetrievalListener {
        private ResourceManagerLeaderListener() {
        }

        /** Schedules the notification about the new leader; the UUID is converted when the action runs. */
        @Override
        public void notifyLeaderAddress(String str, UUID uuid) {
            runAsync(() -> notifyOfNewResourceManagerLeader(str, ResourceManagerId.fromUuidOrNull(uuid)));
        }

        /** Forwards the error to the enclosing executor's fatal error handling. */
        @Override
        public void handleError(Exception exc) {
            onFatalError(exc);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$ResourceManagerRegistrationListener.class */
    public final class ResourceManagerRegistrationListener implements RegistrationConnectionListener<TaskExecutorToResourceManagerConnection, TaskExecutorRegistrationSuccess> {
        private ResourceManagerRegistrationListener() {
        }

        @Override // org.apache.flink.runtime.registration.RegistrationConnectionListener
        public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection taskExecutorToResourceManagerConnection, TaskExecutorRegistrationSuccess taskExecutorRegistrationSuccess) {
            ResourceID resourceManagerId = taskExecutorRegistrationSuccess.getResourceManagerId();
            InstanceID registrationId = taskExecutorRegistrationSuccess.getRegistrationId();
            ClusterInformation clusterInformation = taskExecutorRegistrationSuccess.getClusterInformation();
            ResourceManagerGateway targetGateway = taskExecutorToResourceManagerConnection.getTargetGateway();
            TaskExecutor.this.runAsync(() -> {
                if (TaskExecutor.this.resourceManagerConnection == taskExecutorToResourceManagerConnection) {
                    try {
                        TaskExecutor.this.establishResourceManagerConnection(targetGateway, resourceManagerId, registrationId, clusterInformation);
                    } catch (Throwable th) {
                        TaskExecutor.this.log.error("Establishing Resource Manager connection in Task Executor failed", th);
                    }
                }
            });
        }

        @Override // org.apache.flink.runtime.registration.RegistrationConnectionListener
        public void onRegistrationFailure(Throwable th) {
            TaskExecutor.this.onFatalError(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$SlotActionsImpl.class */
    /**
     * Slot callbacks given to the task slot table. Both actions are re-dispatched to the
     * enclosing {@link TaskExecutor} through {@code runAsync}.
     */
    public class SlotActionsImpl implements SlotActions {
        private SlotActionsImpl() {
        }

        /** Schedules freeing the slot of the given allocation. */
        @Override
        public void freeSlot(AllocationID allocationID) {
            runAsync(() -> {
                final FlinkException cause = new FlinkException("TaskSlotTable requested freeing the TaskSlot " + allocationID + '.');
                freeSlotInternal(allocationID, cause);
            });
        }

        /** Schedules the slot timeout handling of the enclosing executor. */
        @Override
        public void timeoutSlot(AllocationID allocationID, UUID uuid) {
            // Must stay qualified: an unqualified call would resolve to this method.
            runAsync(() -> TaskExecutor.this.timeoutSlot(allocationID, uuid));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$TaskManagerActionsImpl.class */
    public final class TaskManagerActionsImpl implements TaskManagerActions {
        private final JobMasterGateway jobMasterGateway;

        private TaskManagerActionsImpl(JobMasterGateway jobMasterGateway) {
            this.jobMasterGateway = (JobMasterGateway) Preconditions.checkNotNull(jobMasterGateway);
        }

        @Override // org.apache.flink.runtime.taskmanager.TaskManagerActions
        public void notifyFatalError(String str, Throwable th) {
            try {
                TaskExecutor.this.log.error(str, th);
            } catch (Throwable th2) {
            }
            TaskExecutor.this.fatalErrorHandler.onFatalError(th);
        }

        @Override // org.apache.flink.runtime.taskmanager.TaskManagerActions
        public void failTask(ExecutionAttemptID executionAttemptID, Throwable th) {
            TaskExecutor.this.runAsync(() -> {
                TaskExecutor.this.failTask(executionAttemptID, th);
            });
        }

        @Override // org.apache.flink.runtime.taskmanager.TaskManagerActions
        public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
            if (taskExecutionState.getExecutionState().isTerminal()) {
                TaskExecutor.this.runAsync(() -> {
                    TaskExecutor.this.unregisterTaskAndNotifyFinalState(this.jobMasterGateway, taskExecutionState.getID());
                });
            } else {
                TaskExecutor.this.updateTaskExecutionState(this.jobMasterGateway, taskExecutionState);
            }
        }
    }

    /**
     * Creates the task executor and wires it to its collaborators.
     *
     * <p>The endpoint is registered under a random name derived from {@link #TASK_MANAGER_NAME}.
     * Most collaborators are read from {@code taskManagerServices}; the two heartbeat managers
     * are created last, using the main thread executor of this endpoint.
     *
     * @param rpcService RPC service this endpoint runs on
     * @param taskManagerConfiguration configuration; must not be null and must define at least one slot
     * @param highAvailabilityServices HA services; must not be null
     * @param taskManagerServices service bundle; must not be null
     * @param heartbeatServices factory for the heartbeat managers
     * @param taskManagerMetricGroup metric group; must not be null
     * @param str address of the metric query service, may be null
     * @param blobCacheService blob cache; must not be null
     * @param fatalErrorHandler handler for fatal errors; must not be null
     * @param taskExecutorPartitionTracker partition tracker (stored without a null check)
     * @param backPressureSampleService back pressure sampling service; must not be null
     * @throws NullPointerException if one of the arguments that must not be null is null
     * @throws IllegalArgumentException if the configured number of slots is not larger than 0
     */
    public TaskExecutor(RpcService rpcService, TaskManagerConfiguration taskManagerConfiguration, HighAvailabilityServices highAvailabilityServices, TaskManagerServices taskManagerServices, HeartbeatServices heartbeatServices, TaskManagerMetricGroup taskManagerMetricGroup, @Nullable String str, BlobCacheService blobCacheService, FatalErrorHandler fatalErrorHandler, TaskExecutorPartitionTracker taskExecutorPartitionTracker, BackPressureSampleService backPressureSampleService) {
        super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
        this.taskResultPartitionCleanupFuturesPerJob = new HashMap<>(8);
        // Null-check the configuration before it is dereferenced for the slot-count validation.
        this.taskManagerConfiguration = Preconditions.checkNotNull(taskManagerConfiguration);
        Preconditions.checkArgument(taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has to be larger than 0.");
        this.taskExecutorServices = Preconditions.checkNotNull(taskManagerServices);
        this.haServices = Preconditions.checkNotNull(highAvailabilityServices);
        this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
        this.partitionTracker = taskExecutorPartitionTracker;
        this.taskManagerMetricGroup = Preconditions.checkNotNull(taskManagerMetricGroup);
        this.blobCacheService = Preconditions.checkNotNull(blobCacheService);
        this.metricQueryServiceAddress = str;
        this.backPressureSampleService = Preconditions.checkNotNull(backPressureSampleService);
        this.taskSlotTable = taskManagerServices.getTaskSlotTable();
        this.jobManagerTable = taskManagerServices.getJobManagerTable();
        this.jobLeaderService = taskManagerServices.getJobLeaderService();
        this.taskManagerLocation = taskManagerServices.getTaskManagerLocation();
        this.localStateStoresManager = taskManagerServices.getTaskManagerStateStore();
        this.shuffleEnvironment = taskManagerServices.getShuffleEnvironment();
        this.kvStateService = taskManagerServices.getKvStateService();
        this.resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
        this.jobManagerConnections = new HashMap<>(4);
        this.hardwareDescription = HardwareDescription.extractFromSystem(taskManagerServices.getManagedMemorySize());
        this.resourceManagerAddress = null;
        this.resourceManagerConnection = null;
        this.currentRegistrationTimeoutId = null;
        // Both heartbeat managers are registered under this task manager's own resource id.
        ResourceID resourceID = taskManagerServices.getTaskManagerLocation().getResourceID();
        this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices, resourceID);
        this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices, resourceID);
    }

    /**
     * Creates the heartbeat manager for the ResourceManager, backed by a fresh
     * {@link ResourceManagerHeartbeatListener} and this endpoint's main thread executor.
     */
    private HeartbeatManager<Void, TaskExecutorHeartbeatPayload> createResourceManagerHeartbeatManager(HeartbeatServices heartbeatServices, ResourceID resourceID) {
        final ResourceManagerHeartbeatListener listener = new ResourceManagerHeartbeatListener();
        return heartbeatServices.createHeartbeatManager(
                resourceID,
                listener,
                getMainThreadExecutor(),
                this.log);
    }

    /**
     * Creates the heartbeat manager for JobManagers, backed by a fresh
     * {@link JobManagerHeartbeatListener} and this endpoint's main thread executor.
     */
    private HeartbeatManager<AllocatedSlotReport, AccumulatorReport> createJobManagerHeartbeatManager(HeartbeatServices heartbeatServices, ResourceID resourceID) {
        final JobManagerHeartbeatListener listener = new JobManagerHeartbeatListener();
        return heartbeatServices.createHeartbeatManager(
                resourceID,
                listener,
                getMainThreadExecutor(),
                this.log);
    }

    /**
     * Reports whether the shuffle environment currently lists no partitions that occupy local resources.
     *
     * @return an already completed future holding {@code true} if that collection is empty
     */
    @Override
    public CompletableFuture<Boolean> canBeReleased() {
        final boolean noLocalPartitions = this.shuffleEnvironment.getPartitionsOccupyingLocalResources().isEmpty();
        return CompletableFuture.completedFuture(noLocalPartitions);
    }

    /**
     * Starts the task executor services and the registration timeout.
     *
     * @throws Exception a {@link TaskManagerException} wrapping the start-up failure; the same
     *     exception is reported through {@code onFatalError} before it is thrown
     */
    @Override
    public void onStart() throws Exception {
        try {
            startTaskExecutorServices();
            startRegistrationTimeout();
        } catch (Exception cause) {
            final String message = String.format("Could not start the TaskExecutor %s", getAddress());
            final TaskManagerException startupFailure = new TaskManagerException(message, cause);
            onFatalError(startupFailure);
            throw startupFailure;
        }
    }

    /**
     * Starts the ResourceManager leader retrieval, the task slot table and the job leader
     * service, then creates the file cache. Any failure is passed to
     * {@link #handleStartTaskExecutorServicesException(Exception)}.
     *
     * @throws Exception the start-up failure, as rethrown by the failure handler
     */
    private void startTaskExecutorServices() throws Exception {
        try {
            final ResourceManagerLeaderListener leaderListener = new ResourceManagerLeaderListener();
            this.resourceManagerLeaderRetriever.start(leaderListener);

            final SlotActionsImpl slotActions = new SlotActionsImpl();
            this.taskSlotTable.start(slotActions, getMainThreadExecutor());

            final JobLeaderListenerImpl jobLeaderListener = new JobLeaderListenerImpl();
            this.jobLeaderService.start(getAddress(), getRpcService(), this.haServices, jobLeaderListener);

            this.fileCache = new FileCache(
                    this.taskManagerConfiguration.getTmpDirectories(),
                    this.blobCacheService.getPermanentBlobService());
        } catch (Exception startFailure) {
            handleStartTaskExecutorServicesException(startFailure);
        }
    }

    /**
     * Stops the services after a failed start and rethrows the original failure. An exception
     * raised while stopping is attached to the original failure as suppressed.
     *
     * @param exc the failure that interrupted the start-up
     * @throws Exception always; the given {@code exc}
     */
    private void handleStartTaskExecutorServicesException(Exception exc) throws Exception {
        try {
            stopTaskExecutorServices();
        } catch (Exception stopFailure) {
            // Keep the start-up failure as the primary exception.
            exc.addSuppressed(stopFailure);
        }

        throw exc;
    }

    /**
     * Shuts the task executor down: closes the ResourceManager connection, disassociates from
     * every JobManager, closes the task slot table and afterwards stops the remaining services.
     *
     * @return future that completes once shutdown has finished; it completes exceptionally if
     *     any step failed (see {@link #handleOnStopException(Throwable, Throwable)})
     */
    @Override
    public CompletableFuture<Void> onStop() {
        this.log.info("Stopping TaskExecutor {}.", getAddress());

        final FlinkException shutdownCause = new FlinkException("The TaskExecutor is shutting down.");
        closeResourceManagerConnection(shutdownCause);

        // Collect failures instead of aborting, so that every JobManager connection is visited.
        Throwable collected = null;
        for (JobManagerConnection connection : this.jobManagerConnections.values()) {
            try {
                disassociateFromJobManager(connection, shutdownCause);
            } catch (Throwable failure) {
                collected = ExceptionUtils.firstOrSuppressed(failure, collected);
            }
        }

        final Throwable failureBeforeServicesStopped = collected;
        return FutureUtils
                .runAfterwards(this.taskSlotTable.closeAsync(), this::stopTaskExecutorServices)
                .handle((ignored, failureWhileStopping) -> {
                    handleOnStopException(failureBeforeServicesStopped, failureWhileStopping);
                    return null;
                });
    }

    /**
     * Combines the failures collected during shutdown and either logs the successful stop or
     * fails with the combined error.
     *
     * @param th failure collected before the services were stopped, may be null
     * @param th2 failure raised while closing the slot table or stopping the services, may be null
     * @throws CompletionException wrapping a {@link FlinkException} if either argument is non-null
     */
    private void handleOnStopException(Throwable th, Throwable th2) {
        final Throwable combined;
        if (th == null) {
            combined = th2;
        } else {
            combined = ExceptionUtils.firstOrSuppressed(th, th2);
        }

        if (combined == null) {
            this.log.info("Stopped TaskExecutor {}.", getAddress());
            return;
        }
        throw new CompletionException(new FlinkException("Error while shutting the TaskExecutor down.", combined));
    }

    /**
     * Stops the job leader service, the ResourceManager leader retriever, the task executor
     * services and the file cache, then closes the metric group.
     *
     * <p>Every step is attempted even if an earlier one failed; failures are combined via
     * {@code ExceptionUtils.firstOrSuppressed} and rethrown at the end.
     *
     * <p>The file cache is only created in {@code startTaskExecutorServices()}, and this method
     * is also the cleanup path for a failed start, so the cache may still be null here. It is
     * skipped in that case instead of recording a spurious {@link NullPointerException} as a
     * shutdown failure.
     *
     * @throws Exception the combined failure of the individual stop steps, if any
     */
    private void stopTaskExecutorServices() throws Exception {
        Exception exc = null;
        try {
            this.jobLeaderService.stop();
        } catch (Exception e) {
            exc = ExceptionUtils.firstOrSuppressed(e, exc);
        }
        try {
            this.resourceManagerLeaderRetriever.stop();
        } catch (Exception e) {
            exc = ExceptionUtils.firstOrSuppressed(e, exc);
        }
        try {
            this.taskExecutorServices.shutDown();
        } catch (Exception e) {
            exc = ExceptionUtils.firstOrSuppressed(e, exc);
        }
        if (this.fileCache != null) {
            try {
                this.fileCache.shutdown();
            } catch (Exception e) {
                exc = ExceptionUtils.firstOrSuppressed(e, exc);
            }
        }
        this.taskManagerMetricGroup.close();
        ExceptionUtils.tryRethrowException(exc);
    }

    /**
     * Samples the back pressure of a task known to this task manager.
     *
     * @param executionAttemptID attempt id of the task to sample
     * @param i request id that is echoed back in the response
     * @param time RPC timeout; not read by this implementation
     * @return future with the sampling response, or a future failed with an
     *     {@link IllegalStateException} if the task is not in the slot table
     */
    @Override
    public CompletableFuture<TaskBackPressureResponse> requestTaskBackPressure(ExecutionAttemptID executionAttemptID, int i, @RpcTimeout Time time) {
        final Task sampledTask = this.taskSlotTable.getTask(executionAttemptID);
        if (sampledTask == null) {
            final String message = String.format("Cannot request back pressure of task %s. Task is not known to the task manager.", executionAttemptID);
            return FutureUtils.completedExceptionally(new IllegalStateException(message));
        }
        return this.backPressureSampleService
                .sampleTaskBackPressure(sampledTask)
                .thenApply(ratio -> new TaskBackPressureResponse(i, executionAttemptID, ratio.doubleValue()));
    }

    /**
     * Accepts a task deployment from a JobMaster and starts the task.
     *
     * <p>The submission is rejected if no JobManager connection is registered for the job, if
     * the submitting leader id does not match the registered one, if the slot for the
     * allocation cannot be marked active, if the offloaded or serialized job/task information
     * cannot be loaded, if the job ids are inconsistent, or if the task cannot be added to the
     * slot table. On success the task thread is started and the result partition bookkeeping is
     * set up.
     *
     * @param taskDeploymentDescriptor description of the task to deploy
     * @param jobMasterId leader id of the submitting JobMaster
     * @param time RPC timeout; not read by this implementation
     * @return future completed with an acknowledgement, or completed exceptionally with a
     *     {@link TaskSubmissionException} describing why the submission was rejected
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor taskDeploymentDescriptor, JobMasterId jobMasterId, Time time) {
        try {
            final JobID jobId = taskDeploymentDescriptor.getJobId();
            final JobManagerConnection jobManagerConnection = this.jobManagerTable.get(jobId);
            if (jobManagerConnection == null) {
                final String message = "Could not submit task because there is no JobManager associated for the job " + jobId + '.';
                this.log.debug(message);
                throw new TaskSubmissionException(message);
            }
            if (!Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId)) {
                final String message = "Rejecting the task submission because the job manager leader id " + jobMasterId + " does not match the expected job manager leader id " + jobManagerConnection.getJobMasterId() + '.';
                this.log.debug(message);
                throw new TaskSubmissionException(message);
            }
            if (!this.taskSlotTable.tryMarkSlotActive(jobId, taskDeploymentDescriptor.getAllocationId())) {
                final String message = "No task slot allocated for job ID " + jobId + " and allocation ID " + taskDeploymentDescriptor.getAllocationId() + '.';
                this.log.debug(message);
                throw new TaskSubmissionException(message);
            }

            // Re-integrate data that was offloaded from the descriptor.
            try {
                taskDeploymentDescriptor.loadBigData(this.blobCacheService.getPermanentBlobService());
            } catch (IOException | ClassNotFoundException e) {
                throw new TaskSubmissionException("Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
            }

            // Deserialize the pre-serialized job and task information.
            final JobInformation jobInformation;
            final TaskInformation taskInformation;
            try {
                jobInformation = taskDeploymentDescriptor.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
                taskInformation = taskDeploymentDescriptor.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
            } catch (IOException | ClassNotFoundException e) {
                throw new TaskSubmissionException("Could not deserialize the job or task information.", e);
            }
            if (!jobId.equals(jobInformation.getJobId())) {
                throw new TaskSubmissionException("Inconsistent job ID information inside TaskDeploymentDescriptor (" + taskDeploymentDescriptor.getJobId() + " vs. " + jobInformation.getJobId() + ")");
            }

            final TaskMetricGroup taskMetricGroup = this.taskManagerMetricGroup.addTaskForJob(
                    jobInformation.getJobId(),
                    jobInformation.getJobName(),
                    taskInformation.getJobVertexId(),
                    taskDeploymentDescriptor.getExecutionAttemptId(),
                    taskInformation.getTaskName(),
                    taskDeploymentDescriptor.getSubtaskIndex(),
                    taskDeploymentDescriptor.getAttemptNumber());
            final RpcInputSplitProvider inputSplitProvider = new RpcInputSplitProvider(
                    jobManagerConnection.getJobManagerGateway(),
                    taskInformation.getJobVertexId(),
                    taskDeploymentDescriptor.getExecutionAttemptId(),
                    this.taskManagerConfiguration.getTimeout());
            final TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
            final CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
            final GlobalAggregateManager globalAggregateManager = jobManagerConnection.getGlobalAggregateManager();
            final LibraryCacheManager libraryCacheManager = jobManagerConnection.getLibraryCacheManager();
            final ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
            final PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();

            final Task task;
            try {
                task = new Task(
                        jobInformation,
                        taskInformation,
                        taskDeploymentDescriptor.getExecutionAttemptId(),
                        taskDeploymentDescriptor.getAllocationId(),
                        taskDeploymentDescriptor.getSubtaskIndex(),
                        taskDeploymentDescriptor.getAttemptNumber(),
                        taskDeploymentDescriptor.getProducedPartitions(),
                        taskDeploymentDescriptor.getInputGates(),
                        taskDeploymentDescriptor.getTargetSlotNumber(),
                        this.taskSlotTable.getTaskMemoryManager(taskDeploymentDescriptor.getAllocationId()),
                        this.taskExecutorServices.getIOManager(),
                        this.taskExecutorServices.getShuffleEnvironment(),
                        this.taskExecutorServices.getKvStateService(),
                        this.taskExecutorServices.getBroadcastVariableManager(),
                        this.taskExecutorServices.getTaskEventDispatcher(),
                        new TaskStateManagerImpl(
                                jobId,
                                taskDeploymentDescriptor.getExecutionAttemptId(),
                                this.localStateStoresManager.localStateStoreForSubtask(
                                        jobId,
                                        taskDeploymentDescriptor.getAllocationId(),
                                        taskInformation.getJobVertexId(),
                                        taskDeploymentDescriptor.getSubtaskIndex()),
                                taskDeploymentDescriptor.getTaskRestore(),
                                checkpointResponder),
                        taskManagerActions,
                        inputSplitProvider,
                        checkpointResponder,
                        globalAggregateManager,
                        this.blobCacheService,
                        libraryCacheManager,
                        this.fileCache,
                        this.taskManagerConfiguration,
                        taskMetricGroup,
                        resultPartitionConsumableNotifier,
                        partitionStateChecker,
                        getRpcService().getExecutor());
            } catch (SlotNotFoundException e) {
                throw new TaskSubmissionException("Could not submit task.", e);
            }

            // Register the gauge with a plain method reference; the decompiled "(String)" cast was not valid Java.
            taskMetricGroup.gauge(MetricNames.IS_BACKPRESSURED, task::isBackPressured);
            this.log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());

            final boolean taskAdded;
            try {
                taskAdded = this.taskSlotTable.addTask(task);
            } catch (SlotNotActiveException | SlotNotFoundException e) {
                throw new TaskSubmissionException("Could not submit task.", e);
            }
            if (!taskAdded) {
                final String message = "TaskManager already contains a task for id " + task.getExecutionId() + '.';
                this.log.debug(message);
                throw new TaskSubmissionException(message);
            }

            task.startTaskThread();
            setupResultPartitionBookkeeping(taskDeploymentDescriptor.getJobId(), taskDeploymentDescriptor.getProducedPartitions(), task.getTerminationFuture());
            return CompletableFuture.completedFuture(Acknowledge.get());
        } catch (TaskSubmissionException e) {
            // Rejections are reported through the returned future instead of being thrown to the caller.
            return FutureUtils.completedExceptionally(e);
        }
    }

    /**
     * Starts tracking the produced partitions that require an explicit release and registers a
     * cleanup hook on the task's termination future.
     *
     * <p>Every partition selected by {@code filterPartitionsRequiringRelease} is handed to the
     * partition tracker. When the termination future completes with a state other than
     * {@link ExecutionState#FINISHED}, tracking of exactly those partitions is stopped again; that
     * callback is run through {@code getMainThreadExecutor()}. The derived future is stored per job
     * in {@code taskResultPartitionCleanupFuturesPerJob}.
     *
     * @param jobID job the partitions belong to
     * @param collection descriptors of the partitions produced by the task
     * @param completableFuture termination future of the task
     */
    private void setupResultPartitionBookkeeping(JobID jobID, Collection<ResultPartitionDeploymentDescriptor> collection, CompletableFuture<ExecutionState> completableFuture) {
        final Set<ResultPartitionID> trackedPartitionIds = filterPartitionsRequiringRelease(collection)
                // Registering with the tracker is a deliberate side effect of walking the pipeline.
                .peek(descriptor -> this.partitionTracker.startTrackingPartition(jobID, TaskExecutorPartitionInfo.from(descriptor)))
                .map(descriptor -> descriptor.getShuffleDescriptor().getResultPartitionID())
                .collect(Collectors.toSet());

        final CompletableFuture<ExecutionState> cleanupFuture = completableFuture.thenApplyAsync(executionState -> {
            if (executionState != ExecutionState.FINISHED) {
                this.partitionTracker.stopTrackingPartitions(trackedPartitionIds);
            }
            return executionState;
        }, (Executor) getMainThreadExecutor());

        // Create the per-job list lazily (initial capacity 4) and append the new cleanup future.
        this.taskResultPartitionCleanupFuturesPerJob
                .computeIfAbsent(jobID, ignored -> new ArrayList<>(4))
                .add(cleanupFuture);
    }

    /**
     * Narrows the given descriptors down to those whose partition type is blocking and whose
     * shuffle descriptor returns a present value from {@code storesLocalResourcesOn()}.
     *
     * @param collection descriptors to inspect
     * @return lazily evaluated stream of the matching descriptors
     */
    private Stream<ResultPartitionDeploymentDescriptor> filterPartitionsRequiringRelease(Collection<ResultPartitionDeploymentDescriptor> collection) {
        return collection.stream().filter(descriptor ->
                descriptor.getPartitionType().isBlocking()
                        && descriptor.getShuffleDescriptor().storesLocalResourcesOn().isPresent());
    }

    /**
     * Cancels the task registered for the given execution attempt.
     *
     * @param executionAttemptID execution attempt whose task should be cancelled
     * @param time not read by this method
     * @return an acknowledged future on success; a future failed with a {@code TaskException} if
     *     the task is unknown or {@code cancelExecution()} throws
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time time) {
        final Task task = taskSlotTable.getTask(executionAttemptID);
        if (task != null) {
            try {
                task.cancelExecution();
                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (Throwable cancellationFailure) {
                final TaskException wrapped = new TaskException("Cannot cancel task for execution " + executionAttemptID + '.', cancellationFailure);
                return FutureUtils.completedExceptionally(wrapped);
            }
        }

        final String message = "Cannot find task to stop for execution " + executionAttemptID + '.';
        log.debug(message);
        return FutureUtils.completedExceptionally(new TaskException(message));
    }

    /**
     * Forwards partition updates for a task to the shuffle environment.
     *
     * <p>Each update is submitted as its own asynchronous job on the RPC service's executor. An
     * {@link IOException} or {@link InterruptedException} raised by an update is logged and the
     * task is failed externally. The returned future is always already acknowledged; it does not
     * wait for the submitted updates.
     *
     * @param executionAttemptID execution attempt whose input partitions are updated
     * @param iterable partition infos to apply
     * @param time not read by this method
     * @return an acknowledged future
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> iterable, Time time) {
        final Task task = taskSlotTable.getTask(executionAttemptID);
        if (task != null) {
            for (PartitionInfo partitionInfo : iterable) {
                final Runnable update = () -> {
                    try {
                        final boolean applied = shuffleEnvironment.updatePartitionInfo(executionAttemptID, partitionInfo);
                        if (!applied) {
                            log.debug("Discard update for input gate partition {} of result {} in task {}. The partition is no longer available.", partitionInfo.getShuffleDescriptor().getResultPartitionID(), partitionInfo.getIntermediateDataSetID(), executionAttemptID);
                        }
                    } catch (IOException | InterruptedException e) {
                        log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e);
                        task.failExternally(e);
                    }
                };
                FutureUtils.assertNoException(CompletableFuture.runAsync(update, getRpcService().getExecutor()));
            }
        } else {
            log.debug("Discard update for input partitions of task {}. Task is no longer running.", executionAttemptID);
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Releases one set of partitions and promotes another via the partition tracker, then invokes
     * {@code closeJobManagerConnectionIfNoAllocatedResources} for the job.
     *
     * <p>Any {@link Throwable} raised along the way is forwarded to {@code onFatalError}.
     *
     * @param jobID job the partitions belong to
     * @param set partitions handed to {@code stopTrackingAndReleaseJobPartitions}
     * @param set2 partitions handed to {@code promoteJobPartitions}
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public void releaseOrPromotePartitions(JobID jobID, Set<ResultPartitionID> set, Set<ResultPartitionID> set2) {
        try {
            partitionTracker.stopTrackingAndReleaseJobPartitions(set);
            partitionTracker.promoteJobPartitions(set2);
            closeJobManagerConnectionIfNoAllocatedResources(jobID);
        } catch (Throwable fatal) {
            onFatalError(fatal);
        }
    }

    /**
     * Passes a heartbeat request from a JobManager on to the JobManager heartbeat manager.
     *
     * @param resourceID resource id supplied with the heartbeat
     * @param allocatedSlotReport payload delivered with the heartbeat
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public void heartbeatFromJobManager(ResourceID resourceID, AllocatedSlotReport allocatedSlotReport) {
        jobManagerHeartbeatManager.requestHeartbeat(resourceID, allocatedSlotReport);
    }

    /**
     * Passes a heartbeat request from the ResourceManager on to the ResourceManager heartbeat
     * manager; no payload accompanies it ({@code null} is passed).
     *
     * @param resourceID resource id supplied with the heartbeat
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public void heartbeatFromResourceManager(ResourceID resourceID) {
        resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
    }

    /**
     * Triggers a checkpoint barrier on the task of the given execution attempt.
     *
     * @param executionAttemptID execution attempt whose task should checkpoint
     * @param j first value logged as {@code {}@{}} and passed to {@code triggerCheckpointBarrier}
     * @param j2 second value logged as {@code {}@{}} and passed to {@code triggerCheckpointBarrier}
     * @param checkpointOptions options forwarded to the task
     * @param z whether the watermark should be advanced to MAX; only accepted when the checkpoint
     *     type is both synchronous and a savepoint
     * @return an acknowledged future, or a future failed with a {@link CheckpointException} if the
     *     task is unknown
     * @throws IllegalArgumentException if {@code z} is set for anything but a synchronous savepoint
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long j, long j2, CheckpointOptions checkpointOptions, boolean z) {
        log.debug("Trigger checkpoint {}@{} for {}.", j, j2, executionAttemptID);

        final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
        if (z && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
            throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }

        final Task task = taskSlotTable.getTask(executionAttemptID);
        if (task == null) {
            final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
            log.debug(message);
            return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
        }

        task.triggerCheckpointBarrier(j, j2, checkpointOptions, z);
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Notifies the task of the given execution attempt that a checkpoint completed.
     *
     * @param executionAttemptID execution attempt whose task is notified
     * @param j value passed to {@code notifyCheckpointComplete}
     * @param j2 only used for logging
     * @return an acknowledged future, or a future failed with a {@link CheckpointException} if the
     *     task is unknown
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long j, long j2) {
        log.debug("Confirm checkpoint {}@{} for {}.", j, j2, executionAttemptID);

        final Task task = taskSlotTable.getTask(executionAttemptID);
        if (task == null) {
            final String message = "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptID + '.';
            log.debug(message);
            return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
        }

        task.notifyCheckpointComplete(j);
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Handles a slot request coming from the resource manager.
     *
     * <p>Steps, in order: verify the connection to the given resource manager; allocate the slot
     * if it is free, or verify it is already allocated for the same job and allocation; finally
     * either offer slots to the already known job manager or register the job with the job leader
     * service. If that registration fails, the slot is freed again and the local state for the
     * allocation is released before the failure is reported.
     *
     * @param slotID slot to allocate
     * @param jobID job the slot is requested for
     * @param allocationID allocation the slot is requested for
     * @param resourceProfile profile passed to {@code allocateSlot}
     * @param str value passed to {@code jobLeaderService.addJob} together with the job id
     * @param resourceManagerId id of the requesting resource manager
     * @param time not read by this method
     * @return an acknowledged future, or a future failed with the {@code TaskManagerException}
     *     describing why the request was rejected
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Acknowledge> requestSlot(SlotID slotID, JobID jobID, AllocationID allocationID, ResourceProfile resourceProfile, String str, ResourceManagerId resourceManagerId, Time time) {
        log.info("Receive slot request {} for job {} from resource manager with leader id {}.", allocationID, jobID, resourceManagerId);
        try {
            if (!isConnectedToResourceManager(resourceManagerId)) {
                final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
                log.debug(message);
                throw new TaskManagerException(message);
            }

            if (!taskSlotTable.isSlotFree(slotID.getSlotNumber())) {
                // Occupied slot: only acceptable if it already belongs to this very request.
                if (!taskSlotTable.isAllocated(slotID.getSlotNumber(), jobID, allocationID)) {
                    final String message = "The slot " + slotID + " has already been allocated for a different job.";
                    log.info(message);
                    final AllocationID occupyingAllocation = taskSlotTable.getCurrentAllocation(slotID.getSlotNumber());
                    throw new SlotOccupiedException(message, occupyingAllocation, taskSlotTable.getOwningJob(occupyingAllocation));
                }
            } else if (taskSlotTable.allocateSlot(slotID.getSlotNumber(), jobID, allocationID, resourceProfile, taskManagerConfiguration.getTimeout())) {
                log.info("Allocated slot for {}.", allocationID);
            } else {
                log.info("Could not allocate slot for {}.", allocationID);
                throw new SlotAllocationException("Could not allocate slot.");
            }

            if (!jobManagerTable.contains(jobID)) {
                try {
                    jobLeaderService.addJob(jobID, str);
                } catch (Exception addJobFailure) {
                    // Roll back the allocation made above before reporting the failure.
                    try {
                        taskSlotTable.freeSlot(allocationID);
                    } catch (SlotNotFoundException slotNotFound) {
                        onFatalError(slotNotFound);
                    }
                    localStateStoresManager.releaseLocalStateForAllocationId(allocationID);
                    if (!taskSlotTable.isSlotFree(slotID.getSlotNumber())) {
                        onFatalError(new Exception("Could not free slot " + slotID));
                    }
                    throw new SlotAllocationException("Could not add job to job leader service.", addJobFailure);
                }
            } else {
                offerSlotsToJobManager(jobID);
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        } catch (TaskManagerException rejection) {
            return FutureUtils.completedExceptionally(rejection);
        }
    }

    /**
     * Frees the slot of the given allocation by delegating to {@code freeSlotInternal}.
     *
     * @param allocationID allocation whose slot is freed
     * @param th cause handed on to {@code freeSlotInternal}
     * @param time not read by this method
     * @return an acknowledged future
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationID, Throwable th, Time time) {
        freeSlotInternal(allocationID, th);
        final Acknowledge ack = Acknowledge.get();
        return CompletableFuture.completedFuture(ack);
    }

    /**
     * Uploads the requested TaskManager file to the transient blob service.
     *
     * <p>The path is taken from the task manager configuration: the log path for {@code LOG}, the
     * stdout path for {@code STDOUT}; any other file type is treated as unavailable.
     *
     * @param fileType which file to upload
     * @param time not read by this method
     * @return a future holding the blob key of the uploaded file, or a future failed with a
     *     {@code FlinkException} if the path is not configured, the file does not exist, or the
     *     upload fails with an {@link IOException}
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType, Time time) {
        this.log.debug("Request file {} upload.", fileType);

        final String filePath;
        switch (fileType) {
            case LOG:
                filePath = this.taskManagerConfiguration.getTaskManagerLogPath();
                break;
            case STDOUT:
                filePath = this.taskManagerConfiguration.getTaskManagerStdoutPath();
                break;
            default:
                filePath = null;
                break;
        }

        if (filePath == null || filePath.isEmpty()) {
            this.log.debug("The file {} is unavailable on the TaskExecutor {}.", fileType, getResourceID());
            return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " is not available on the TaskExecutor."));
        }

        final File file = new File(filePath);
        if (!file.exists()) {
            this.log.debug("The file {} does not exist on the TaskExecutor {}.", fileType, getResourceID());
            return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " does not exist on the TaskExecutor."));
        }

        final TransientBlobCache transientBlobService = this.blobCacheService.getTransientBlobService();
        final TransientBlobKey transientBlobKey;
        // try-with-resources guarantees the stream is closed even when putTransient throws.
        try (FileInputStream fileInputStream = new FileInputStream(file)) {
            transientBlobKey = transientBlobService.putTransient(fileInputStream);
        } catch (IOException e) {
            this.log.debug("Could not upload file {}.", fileType, e);
            return FutureUtils.completedExceptionally(new FlinkException("Could not upload file " + fileType + '.', e));
        }
        return CompletableFuture.completedFuture(transientBlobKey);
    }

    /**
     * Returns the value of {@code metricQueryServiceAddress}, wrapped so that a {@code null}
     * address is represented as an empty optional.
     *
     * @param time not read by this method
     * @return an already completed future carrying the wrapped address
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress(Time time) {
        final SerializableOptional<String> address = SerializableOptional.ofNullable(metricQueryServiceAddress);
        return CompletableFuture.completedFuture(address);
    }

    /**
     * Closes the connection to the job manager of the given job and then asks the job leader
     * service to reconnect for that job.
     *
     * @param jobID job whose job manager connection is dropped
     * @param exc cause handed on to {@code closeJobManagerConnection}
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public void disconnectJobManager(JobID jobID, Exception exc) {
        closeJobManagerConnection(jobID, exc);
        jobLeaderService.reconnect(jobID);
    }

    /**
     * Reconnects to the resource manager, but only while {@code isRunning()} reports true;
     * otherwise the call does nothing.
     *
     * @param exc cause handed on to {@code reconnectToResourceManager}
     */
    @Override // org.apache.flink.runtime.taskexecutor.TaskExecutorGateway
    public void disconnectResourceManager(Exception exc) {
        if (!isRunning()) {
            return;
        }
        reconnectToResourceManager(exc);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records the address of the new resource manager leader and reconnects to it.
     *
     * @param str leader address; a {@code null} value results in a {@code null} stored address
     * @param resourceManagerId id of the new leader
     */
    public void notifyOfNewResourceManagerLeader(String str, ResourceManagerId resourceManagerId) {
        resourceManagerAddress = createResourceManagerAddress(str, resourceManagerId);
        final String reason = String.format("ResourceManager leader changed to new address %s", resourceManagerAddress);
        reconnectToResourceManager(new FlinkException(reason));
    }

    /**
     * Builds a {@code ResourceManagerAddress} from the given leader address and id.
     *
     * @param str leader address, may be {@code null}
     * @param resourceManagerId leader id; asserted to be non-null whenever {@code str} is non-null
     * @return the combined address, or {@code null} if {@code str} is {@code null}
     */
    @Nullable
    private ResourceManagerAddress createResourceManagerAddress(@Nullable String str, @Nullable ResourceManagerId resourceManagerId) {
        if (str == null) {
            return null;
        }
        // Only checked when assertions are enabled (-ea), exactly like the compiled original.
        assert resourceManagerId != null;
        return new ResourceManagerAddress(str, resourceManagerId);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Drops the current resource manager connection, restarts the registration timeout and tries
     * to connect again, in that order.
     *
     * @param exc cause handed on to {@code closeResourceManagerConnection}
     */
    public void reconnectToResourceManager(Exception exc) {
        // 1. tear down whatever connection state exists
        closeResourceManagerConnection(exc);
        // 2. arm a fresh registration timeout
        startRegistrationTimeout();
        // 3. connect if an address is known
        tryConnectToResourceManager();
    }

    /** Connects to the resource manager if an address is currently stored; otherwise does nothing. */
    private void tryConnectToResourceManager() {
        if (resourceManagerAddress == null) {
            return;
        }
        connectToResourceManager();
    }

    /**
     * Creates and starts a new {@code TaskExecutorToResourceManagerConnection} towards the stored
     * resource manager address.
     *
     * <p>When assertions are enabled, the method requires that an address is stored and that
     * neither an established nor a pending resource manager connection exists.
     */
    private void connectToResourceManager() {
        assert this.resourceManagerAddress != null;
        assert this.establishedResourceManagerConnection == null;
        assert this.resourceManagerConnection == null;

        this.log.info("Connecting to ResourceManager {}.", this.resourceManagerAddress);
        this.resourceManagerConnection = new TaskExecutorToResourceManagerConnection(
                this.log,
                getRpcService(),
                this.taskManagerConfiguration.getRetryingRegistrationConfiguration(),
                this.resourceManagerAddress.getAddress(),
                this.resourceManagerAddress.getResourceManagerId(),
                getMainThreadExecutor(),
                new ResourceManagerRegistrationListener(),
                new TaskExecutorRegistration(
                        getAddress(),
                        getResourceID(),
                        this.taskManagerLocation.dataPort(),
                        this.hardwareDescription,
                        this.taskManagerConfiguration.getDefaultSlotResourceProfile(),
                        this.taskManagerConfiguration.getTotalResourceProfile()));
        this.resourceManagerConnection.start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Finishes the setup of a resource manager connection.
     *
     * <p>Sends the initial slot report (reconnecting if that fails), starts heartbeat monitoring
     * of the resource manager, points the blob cache at the blob server named in the cluster
     * information, stores the established connection and stops the registration timeout.
     *
     * @param resourceManagerGateway gateway to the resource manager
     * @param resourceID resource id under which the resource manager is monitored
     * @param instanceID instance id sent with the slot report and stored with the connection
     * @param clusterInformation source of the blob server hostname and port
     */
    public void establishResourceManagerConnection(final ResourceManagerGateway resourceManagerGateway, ResourceID resourceID, InstanceID instanceID, ClusterInformation clusterInformation) {
        final CompletableFuture<Acknowledge> slotReportFuture = resourceManagerGateway.sendSlotReport(getResourceID(), instanceID, taskSlotTable.createSlotReport(getResourceID()), taskManagerConfiguration.getTimeout());
        slotReportFuture.whenCompleteAsync((ack, failure) -> {
            if (failure != null) {
                reconnectToResourceManager(new TaskManagerException("Failed to send initial slot report to ResourceManager.", failure));
            }
        }, (Executor) getMainThreadExecutor());

        resourceManagerHeartbeatManager.monitorTarget(resourceID, new HeartbeatTarget<TaskExecutorHeartbeatPayload>() {
            @Override // org.apache.flink.runtime.heartbeat.HeartbeatTarget
            public void receiveHeartbeat(ResourceID resourceID2, TaskExecutorHeartbeatPayload taskExecutorHeartbeatPayload) {
                resourceManagerGateway.heartbeatFromTaskManager(resourceID2, taskExecutorHeartbeatPayload);
            }

            @Override // org.apache.flink.runtime.heartbeat.HeartbeatTarget
            public void requestHeartbeat(ResourceID resourceID2, TaskExecutorHeartbeatPayload taskExecutorHeartbeatPayload) {
                // intentionally empty: this target does nothing on a heartbeat request
            }
        });

        final InetSocketAddress blobServerAddress = new InetSocketAddress(clusterInformation.getBlobServerHostname(), clusterInformation.getBlobServerPort());
        blobCacheService.setBlobServerAddress(blobServerAddress);

        establishedResourceManagerConnection = new EstablishedResourceManagerConnection(resourceManagerGateway, resourceID, instanceID);
        stopRegistrationTimeout();
    }

    /**
     * Tears down all resource manager connection state.
     *
     * <p>For an established connection: stops heartbeat monitoring, tells the resource manager
     * that this task manager disconnects, clears the stored connection and releases all cluster
     * partitions. For a pending connection object: closes and clears it, logging first when it
     * had not connected yet.
     *
     * @param exc cause that is logged at debug level and sent to the resource manager
     */
    private void closeResourceManagerConnection(Exception exc) {
        if (establishedResourceManagerConnection != null) {
            final ResourceID rmResourceId = establishedResourceManagerConnection.getResourceManagerResourceId();

            // The cause is only attached to the log entry at debug level.
            if (!log.isDebugEnabled()) {
                log.info("Close ResourceManager connection {}.", rmResourceId);
            } else {
                log.debug("Close ResourceManager connection {}.", rmResourceId, exc);
            }

            resourceManagerHeartbeatManager.unmonitorTarget(rmResourceId);
            establishedResourceManagerConnection.getResourceManagerGateway().disconnectTaskManager(getResourceID(), exc);
            establishedResourceManagerConnection = null;
            partitionTracker.stopTrackingAndReleaseAllClusterPartitions();
        }

        if (resourceManagerConnection != null) {
            if (!resourceManagerConnection.isConnected()) {
                if (!log.isDebugEnabled()) {
                    log.info("Terminating registration attempts towards ResourceManager {}.", resourceManagerConnection.getTargetAddress());
                } else {
                    log.debug("Terminating registration attempts towards ResourceManager {}.", resourceManagerConnection.getTargetAddress(), exc);
                }
            }
            resourceManagerConnection.close();
            resourceManagerConnection = null;
        }
    }

    /**
     * Arms the registration timeout if a maximum registration duration is configured.
     *
     * <p>A fresh random id is stored as the current timeout id and a delayed call to
     * {@code registrationTimeout} with that id is scheduled.
     */
    private void startRegistrationTimeout() {
        final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
        if (maxRegistrationDuration == null) {
            return;
        }

        final UUID timeoutId = UUID.randomUUID();
        currentRegistrationTimeoutId = timeoutId;
        scheduleRunAsync(() -> registrationTimeout(timeoutId), maxRegistrationDuration);
    }

    /**
     * Clears the current registration timeout id, so a later {@code registrationTimeout} call
     * carrying a previously issued id no longer matches.
     */
    private void stopRegistrationTimeout() {
        currentRegistrationTimeoutId = null;
    }

    /**
     * Reports a fatal {@code RegistrationTimeoutException} if the given id still equals the
     * current registration timeout id; otherwise does nothing.
     *
     * @param uuid id of the timeout that fired
     */
    private void registrationTimeout(@Nonnull UUID uuid) {
        if (!uuid.equals(currentRegistrationTimeoutId)) {
            return;
        }
        final String message = String.format("Could not register at the ResourceManager within the specified maximum registration duration %s. This indicates a problem with this instance. Terminating now.", taskManagerConfiguration.getMaxRegistrationDuration());
        onFatalError(new RegistrationTimeoutException(message));
    }

    /**
     * Offers all slots currently allocated for the given job to that job's job manager.
     *
     * <p>Nothing is offered when no job manager connection exists for the job or when the slot
     * table has no allocated slots for it. The response is processed by the handler returned from
     * {@code handleAcceptedSlotOffers}, run through {@code getMainThreadExecutor()}.
     *
     * @param jobID job whose slots are offered
     */
    private void offerSlotsToJobManager(JobID jobID) {
        final JobManagerConnection jobManagerConnection = this.jobManagerTable.get(jobID);
        if (jobManagerConnection == null) {
            this.log.debug("There is no job manager connection to the leader of job {}.", jobID);
            return;
        }
        if (!this.taskSlotTable.hasAllocatedSlots(jobID)) {
            this.log.debug("There are no unassigned slots for the job {}.", jobID);
            return;
        }

        this.log.info("Offer reserved slots to the leader of job {}.", jobID);
        final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
        final Iterator<TaskSlot<Task>> allocatedSlots = this.taskSlotTable.getAllocatedSlots(jobID);
        final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();

        // Typed collection: the same instance is sent to the job manager and later mutated by the
        // response handler, which removes every accepted offer from it.
        final Collection<SlotOffer> reservedSlots = new HashSet<>(2);
        while (allocatedSlots.hasNext()) {
            reservedSlots.add(allocatedSlots.next().generateSlotOffer());
        }

        jobMasterGateway
                .offerSlots(getResourceID(), reservedSlots, this.taskManagerConfiguration.getTimeout())
                .whenCompleteAsync(handleAcceptedSlotOffers(jobID, jobMasterGateway, jobMasterId, reservedSlots), (Executor) getMainThreadExecutor());
    }

    /**
     * Creates the callback that processes the job manager's answer to a slot offer.
     *
     * <p>On a {@link TimeoutException} the offer is retried. On any other failure all offered
     * slots are freed. On success, and only if the job manager connection is still valid for the
     * given job master id, every accepted slot is marked active (the job manager is told to fail
     * the slot if that is not possible) and removed from {@code collection}; whatever remains in
     * {@code collection} afterwards is freed as rejected.
     *
     * @param jobID job the slots were offered for
     * @param jobMasterGateway gateway used to fail slots that cannot be activated
     * @param jobMasterId job master id the offer was made to
     * @param collection the offered slots; mutated by the callback
     * @return callback taking the accepted offers and a possible failure
     */
    @Nonnull
    private BiConsumer<Iterable<SlotOffer>, Throwable> handleAcceptedSlotOffers(JobID jobID, JobMasterGateway jobMasterGateway, JobMasterId jobMasterId, Collection<SlotOffer> collection) {
        return (iterable, th) -> {
            if (th != null) {
                if (th instanceof TimeoutException) {
                    this.log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering.");
                    offerSlotsToJobManager(jobID);
                } else {
                    this.log.warn("Slot offering to JobManager failed. Freeing the slots and returning them to the ResourceManager.", th);
                    for (SlotOffer reservedSlot : collection) {
                        freeSlotInternal(reservedSlot.getAllocationId(), th);
                    }
                }
                return;
            }

            if (!isJobManagerConnectionValid(jobID, jobMasterId)) {
                this.log.debug("Discard offer slot response since there is a new leader for the job {}.", jobID);
                return;
            }

            for (SlotOffer acceptedSlot : iterable) {
                final AllocationID allocationId = acceptedSlot.getAllocationId();
                // Identify the slot by its allocation id; the job id does not name a slot.
                final String failureMessage = "Could not mark slot " + allocationId + " active.";
                try {
                    if (!this.taskSlotTable.markSlotActive(allocationId)) {
                        this.log.debug(failureMessage);
                        jobMasterGateway.failSlot(getResourceID(), allocationId, new FlinkException(failureMessage));
                    }
                } catch (SlotNotFoundException e) {
                    // Keep the cause so the job manager can see why activation failed.
                    jobMasterGateway.failSlot(getResourceID(), allocationId, new FlinkException(failureMessage, e));
                }
                collection.remove(acceptedSlot);
            }

            // Everything still left in the offered set was not accepted by the job manager.
            final Exception rejection = new Exception("The slot was rejected by the JobManager.");
            for (SlotOffer rejectedSlot : collection) {
                freeSlotInternal(rejectedSlot.getAllocationId(), rejection);
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Registers a connection to the job manager leading the given job and offers it the job's
     * slots.
     *
     * <p>If a connection for the job already exists with the same job master id as the gateway's
     * fencing token, the call is ignored. If it exists with a different id, the old connection is
     * closed first.
     *
     * @param jobID job the job manager is responsible for
     * @param jobMasterGateway gateway to the job manager
     * @param jMTMRegistrationSuccess registration response supplying the job manager's resource id
     */
    public void establishJobManagerConnection(JobID jobID, final JobMasterGateway jobMasterGateway, JMTMRegistrationSuccess jMTMRegistrationSuccess) {
        if (jobManagerTable.contains(jobID)) {
            final JobManagerConnection existingConnection = jobManagerTable.get(jobID);
            if (Objects.equals(existingConnection.getJobMasterId(), jobMasterGateway.getFencingToken())) {
                log.debug("Ignore JobManager gained leadership message for {} because we are already connected to it.", jobMasterGateway.getFencingToken());
                return;
            }
            closeJobManagerConnection(jobID, new Exception("Found new job leader for job id " + jobID + '.'));
        }

        log.info("Establish JobManager connection for job {}.", jobID);
        final ResourceID jobManagerResourceId = jMTMRegistrationSuccess.getResourceID();
        final JobManagerConnection newConnection = associateWithJobManager(jobID, jobManagerResourceId, jobMasterGateway);
        jobManagerConnections.put(jobManagerResourceId, newConnection);
        jobManagerTable.put(jobID, newConnection);

        jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, new HeartbeatTarget<AccumulatorReport>() {
            @Override // org.apache.flink.runtime.heartbeat.HeartbeatTarget
            public void receiveHeartbeat(ResourceID resourceID2, AccumulatorReport accumulatorReport) {
                jobMasterGateway.heartbeatFromTaskManager(resourceID2, accumulatorReport);
            }

            @Override // org.apache.flink.runtime.heartbeat.HeartbeatTarget
            public void requestHeartbeat(ResourceID resourceID2, AccumulatorReport accumulatorReport) {
                // intentionally empty: this target does nothing on a heartbeat request
            }
        });

        offerSlotsToJobManager(jobID);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Closes the connection to the job manager of the given job.
     *
     * <p>All tasks of the job are failed externally, every active slot of the job is marked
     * inactive (or freed if that is refused), and the connection is removed from the job manager
     * table. If a connection was registered, its heartbeat monitoring is stopped and it is
     * disassociated; an {@link IOException} during disassociation is only logged.
     *
     * @param jobID job whose job manager connection is closed
     * @param exc cause of the disconnect; attached to the task failures and passed on to
     *     {@code disassociateFromJobManager}
     */
    public void closeJobManagerConnection(JobID jobID, Exception exc) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Close JobManager connection for job {}.", jobID, exc);
        } else {
            this.log.info("Close JobManager connection for job {}.", jobID);
        }

        // 1. fail all tasks that belong to the job
        final Iterator<Task> tasks = this.taskSlotTable.getTasks(jobID);
        final FlinkException leadershipLost = new FlinkException("JobManager responsible for " + jobID + " lost the leadership.", exc);
        while (tasks.hasNext()) {
            tasks.next().failExternally(leadershipLost);
        }

        // 2. deactivate the job's active slots, freeing those that refuse
        final Iterator<AllocationID> activeSlots = this.taskSlotTable.getActiveSlots(jobID);
        final FlinkException freeingCause = new FlinkException("Slot could not be marked inactive.");
        while (activeSlots.hasNext()) {
            final AllocationID activeSlot = activeSlots.next();
            try {
                if (!this.taskSlotTable.markSlotInactive(activeSlot, this.taskManagerConfiguration.getTimeout())) {
                    freeSlotInternal(activeSlot, freeingCause);
                }
            } catch (SlotNotFoundException e) {
                // Report the slot's allocation id; the job id does not identify the slot.
                this.log.debug("Could not mark the slot {} inactive.", activeSlot, e);
            }
        }

        // 3. forget the connection and disassociate from the job manager
        final JobManagerConnection removedConnection = this.jobManagerTable.remove(jobID);
        if (removedConnection != null) {
            try {
                this.jobManagerHeartbeatManager.unmonitorTarget(removedConnection.getResourceID());
                this.jobManagerConnections.remove(removedConnection.getResourceID());
                disassociateFromJobManager(removedConnection, exc);
            } catch (IOException e2) {
                this.log.warn("Could not properly disassociate from JobManager {}.", removedConnection.getJobManagerGateway().getAddress(), e2);
            }
        }
    }

    /**
     * Builds the {@code JobManagerConnection} for a job, creating the per-job-manager helper
     * objects that are all backed by the given gateway, and registers queryable state for the job.
     *
     * @param jobID job the connection is for; must not be {@code null}
     * @param resourceID resource id of the job manager; must not be {@code null}
     * @param jobMasterGateway gateway to the job manager; must not be {@code null}
     * @return the assembled connection
     */
    private JobManagerConnection associateWithJobManager(JobID jobID, ResourceID resourceID, JobMasterGateway jobMasterGateway) {
        Preconditions.checkNotNull(jobID);
        Preconditions.checkNotNull(resourceID);
        Preconditions.checkNotNull(jobMasterGateway);

        final TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterGateway);
        final CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway);
        final GlobalAggregateManager aggregateManager = new RpcGlobalAggregateManager(jobMasterGateway);
        final LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
                blobCacheService.getPermanentBlobService(),
                taskManagerConfiguration.getClassLoaderResolveOrder(),
                taskManagerConfiguration.getAlwaysParentFirstLoaderPatterns());
        final ResultPartitionConsumableNotifier consumableNotifier = new RpcResultPartitionConsumableNotifier(
                jobMasterGateway,
                getRpcService().getExecutor(),
                taskManagerConfiguration.getTimeout());
        final PartitionProducerStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterGateway);

        registerQueryableState(jobID, jobMasterGateway);

        return new JobManagerConnection(
                jobID,
                resourceID,
                jobMasterGateway,
                taskManagerActions,
                checkpointResponder,
                aggregateManager,
                libraryCacheManager,
                consumableNotifier,
                partitionStateChecker);
    }

    /**
     * Undoes what was set up for a job manager connection: schedules the result partition
     * cleanup for the job, unregisters the job from the KvState registry and client proxy when
     * those exist, tells the job manager that this task manager disconnects, and shuts down the
     * connection's library cache manager.
     *
     * @param jobManagerConnection connection to dissolve; must not be {@code null}
     * @param exc cause sent to the job manager with the disconnect
     * @throws IOException declared by the signature; the throwing call is not identifiable from
     *     this method alone
     */
    private void disassociateFromJobManager(JobManagerConnection jobManagerConnection, Exception exc) throws IOException {
        Preconditions.checkNotNull(jobManagerConnection);

        final JobID jobId = jobManagerConnection.getJobID();
        scheduleResultPartitionCleanup(jobId);

        final KvStateRegistry registry = kvStateService.getKvStateRegistry();
        if (registry != null) {
            registry.unregisterListener(jobId);
        }

        final KvStateClientProxy clientProxy = kvStateService.getKvStateClientProxy();
        if (clientProxy != null) {
            clientProxy.updateKvStateLocationOracle(jobManagerConnection.getJobID(), null);
        }

        final JobMasterGateway gateway = jobManagerConnection.getJobManagerGateway();
        gateway.disconnectTaskManager(getResourceID(), exc);
        jobManagerConnection.getLibraryCacheManager().shutdown();
    }

    /**
     * Removes the cleanup futures stored for the job and, once all of them are done, releases the
     * job's tracked partitions; that release is run through {@code getMainThreadExecutor()}. Does
     * nothing when no futures are stored for the job.
     *
     * @param jobID job whose partitions should be cleaned up
     */
    private void scheduleResultPartitionCleanup(JobID jobID) {
        final Collection<CompletableFuture<ExecutionState>> cleanupFutures = taskResultPartitionCleanupFuturesPerJob.remove(jobID);
        if (cleanupFutures == null) {
            return;
        }
        FutureUtils.waitForAll(cleanupFutures).thenRunAsync(
                () -> partitionTracker.stopTrackingAndReleaseJobPartitionsFor(jobID),
                (Executor) getMainThreadExecutor());
    }

    /**
     * Wires the job into the queryable state services that are available.
     *
     * <p>A registry listener is registered only when both the KvState server and the registry
     * exist; the client proxy, when present, is given the gateway as location oracle for the job.
     *
     * @param jobID job to register
     * @param jobMasterGateway gateway handed to the listener and the client proxy
     */
    private void registerQueryableState(JobID jobID, JobMasterGateway jobMasterGateway) {
        final KvStateServer server = kvStateService.getKvStateServer();
        final KvStateRegistry registry = kvStateService.getKvStateRegistry();
        final boolean canRegisterListener = server != null && registry != null;
        if (canRegisterListener) {
            registry.registerListener(jobID, new RpcKvStateRegistryListener(jobMasterGateway, server.getServerAddress()));
        }

        final KvStateClientProxy clientProxy = kvStateService.getKvStateClientProxy();
        if (clientProxy != null) {
            clientProxy.updateKvStateLocationOracle(jobID, jobMasterGateway);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Fails the task registered under the given execution attempt, if there is one.
     *
     * <p>A missing task is only logged at debug level. Anything thrown while failing the task is
     * caught and logged at error level instead of being propagated.
     *
     * @param executionAttemptID execution attempt whose task should be failed
     * @param th cause passed to the task
     */
    public void failTask(ExecutionAttemptID executionAttemptID, Throwable th) {
        final Task target = this.taskSlotTable.getTask(executionAttemptID);
        if (target != null) {
            try {
                target.failExternally(th);
            } catch (Throwable failure) {
                this.log.error("Could not fail task {}.", executionAttemptID, failure);
            }
        } else {
            this.log.debug("Cannot find task to fail for execution {}.", executionAttemptID);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends a task execution state update to the job master.
     *
     * <p>If the future returned by the gateway completes with an error, the task identified by
     * the state's ID is failed with that error. The completion callback is submitted to the main
     * thread executor.
     *
     * @param jobMasterGateway gateway that receives the update
     * @param taskExecutionState state to report
     */
    public void updateTaskExecutionState(JobMasterGateway jobMasterGateway, TaskExecutionState taskExecutionState) {
        final ExecutionAttemptID attemptId = taskExecutionState.getID();
        jobMasterGateway.updateTaskExecutionState(taskExecutionState).whenCompleteAsync(
                (ignoredAck, failure) -> {
                    if (failure == null) {
                        return;
                    }
                    failTask(attemptId, failure);
                },
                (Executor) getMainThreadExecutor());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes a task from the slot table and reports its final execution state to the job master.
     *
     * <p>If no task is registered for the execution attempt, an error is logged and nothing is
     * reported. A removed task that is not yet in a terminal state is failed first; exceptions
     * from that attempt are logged and do not prevent the state report.
     *
     * @param jobMasterGateway gateway that receives the final state
     * @param executionAttemptID execution attempt of the task to unregister
     */
    public void unregisterTaskAndNotifyFinalState(JobMasterGateway jobMasterGateway, ExecutionAttemptID executionAttemptID) {
        final Task removed = this.taskSlotTable.removeTask(executionAttemptID);
        if (removed == null) {
            this.log.error("Cannot find task with ID {} to unregister.", executionAttemptID);
            return;
        }

        final boolean stillRunning = !removed.getExecutionState().isTerminal();
        if (stillRunning) {
            try {
                removed.failExternally(new IllegalStateException("Task is being remove from TaskManager."));
            } catch (Exception e) {
                this.log.error("Could not properly fail task.", e);
            }
        }

        this.log.info(
                "Un-registering task and sending final execution state {} to JobManager for task {} {}.",
                removed.getExecutionState(),
                removed.getTaskInfo().getTaskNameWithSubtasks(),
                removed.getExecutionId());

        // The state is read again here, after the optional failExternally call above.
        final TaskExecutionState finalState = new TaskExecutionState(
                removed.getJobID(),
                removed.getExecutionId(),
                removed.getExecutionState(),
                removed.getFailureCause(),
                removed.getAccumulatorRegistry().getSnapshot(),
                removed.getMetricGroup().getIOMetricGroup().createSnapshot());
        updateTaskExecutionState(jobMasterGateway, finalState);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Frees the slot that belongs to the given allocation and performs the follow-up bookkeeping.
     *
     * <p>When the slot table returns a slot index other than {@code -1}, the resource manager is
     * notified that the slot is available (only while connected), and the job manager connection
     * of the owning job is closed if that job holds no more resources here. A
     * {@code SlotNotFoundException} is logged at debug level. In every case the local state for
     * the allocation is released at the end.
     *
     * @param allocationID allocation whose slot is freed; must not be {@code null}
     * @param th reason for freeing the slot; its message is logged, so it must not be {@code null}
     */
    public void freeSlotInternal(AllocationID allocationID, Throwable th) {
        Preconditions.checkNotNull(allocationID);
        this.log.debug("Free slot with allocation id {} because: {}", allocationID, th.getMessage());

        try {
            // Look up the owner before freeing the slot.
            final JobID jobOfSlot = this.taskSlotTable.getOwningJob(allocationID);
            final int freedIndex = this.taskSlotTable.freeSlot(allocationID, th);
            final boolean slotWasFreed = freedIndex != -1;

            if (slotWasFreed) {
                if (isConnectedToResourceManager()) {
                    this.establishedResourceManagerConnection.getResourceManagerGateway().notifySlotAvailable(
                            this.establishedResourceManagerConnection.getTaskExecutorRegistrationId(),
                            new SlotID(getResourceID(), freedIndex),
                            allocationID);
                }
                if (jobOfSlot != null) {
                    closeJobManagerConnectionIfNoAllocatedResources(jobOfSlot);
                }
            }
        } catch (SlotNotFoundException e) {
            this.log.debug("Could not free slot for allocation id {}.", allocationID, e);
        }

        this.localStateStoresManager.releaseLocalStateForAllocationId(allocationID);
    }

    /**
     * Closes the job manager connection for a job that no longer holds any resources on this
     * TaskExecutor.
     *
     * <p>The connection is kept while the slot table still lists allocation IDs for the job or
     * while the partition tracker is still tracking partitions for it. Otherwise the job is
     * removed from the job leader service (a failure there is only logged) and the connection is
     * closed.
     *
     * @param jobID job to check
     */
    private void closeJobManagerConnectionIfNoAllocatedResources(JobID jobID) {
        final boolean hasAllocatedSlots = !this.taskSlotTable.getAllocationIdsPerJob(jobID).isEmpty();
        if (hasAllocatedSlots) {
            return;
        }
        if (this.partitionTracker.isTrackingPartitionsFor(jobID)) {
            return;
        }

        try {
            this.jobLeaderService.removeJob(jobID);
        } catch (Exception e) {
            this.log.info("Could not remove job {} from JobLeaderService.", jobID, e);
        }

        final FlinkException cause = new FlinkException(
                "TaskExecutor " + getAddress() + " has no more allocated slots for job " + jobID + '.');
        closeJobManagerConnection(jobID, cause);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles a slot timeout: frees the slot when the slot table accepts the timeout ticket as
     * valid for the allocation, and otherwise only logs the rejected timeout at debug level.
     *
     * @param allocationID allocation that timed out; must not be {@code null}
     * @param uuid timeout ticket; must not be {@code null}
     */
    public void timeoutSlot(AllocationID allocationID, UUID uuid) {
        Preconditions.checkNotNull(allocationID);
        Preconditions.checkNotNull(uuid);

        if (!this.taskSlotTable.isValidTimeout(allocationID, uuid)) {
            this.log.debug("Received an invalid timeout for allocation id {} with ticket {}.", allocationID, uuid);
            return;
        }
        freeSlotInternal(allocationID, new Exception("The slot " + allocationID + " has timed out."));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reconciles the local slot table with the slot report received from a job master.
     *
     * <p>The report is ignored (with a debug log) when there is no job manager connection for the
     * report's job. Otherwise slots the report lists but that are not allocated locally are
     * failed towards the job master, and then locally active slots missing from the report are
     * freed.
     *
     * @param allocatedSlotReport slot report sent by the job master
     */
    public void syncSlotsWithSnapshotFromJobMaster(AllocatedSlotReport allocatedSlotReport) {
        final JobManagerConnection connection = this.jobManagerTable.get(allocatedSlotReport.getJobId());
        if (connection == null) {
            this.log.debug("Ignoring allocated slot report from job {} because there is no active leader.", allocatedSlotReport.getJobId());
            return;
        }

        failNoLongerAllocatedSlots(allocatedSlotReport, connection.getJobManagerGateway());
        freeNoLongerUsedSlots(allocatedSlotReport);
    }

    /**
     * Tells the job master to fail every reported slot that the local slot table does not
     * consider allocated for the report's job under the reported allocation ID.
     *
     * @param allocatedSlotReport slot report sent by the job master
     * @param jobMasterGateway gateway on which {@code failSlot} is invoked
     */
    private void failNoLongerAllocatedSlots(AllocatedSlotReport allocatedSlotReport, JobMasterGateway jobMasterGateway) {
        for (AllocatedSlotInfo reported : allocatedSlotReport.getAllocatedSlotInfos()) {
            final AllocationID allocationId = reported.getAllocationId();
            if (this.taskSlotTable.isAllocated(reported.getSlotIndex(), allocatedSlotReport.getJobId(), allocationId)) {
                continue;
            }

            final String reason = String.format(
                    "Slot %s on TaskExecutor %s is not allocated by job %s.",
                    reported.getSlotIndex(),
                    getResourceID(),
                    allocatedSlotReport.getJobId());
            jobMasterGateway.failSlot(getResourceID(), allocationId, new FlinkException(reason));
        }
    }

    /**
     * Frees every slot that is active locally for the report's job but whose allocation ID does
     * not appear in the report.
     *
     * <p>The locally active allocation IDs are copied into a set before any slot is freed, so the
     * iteration does not read the slot table while {@code freeSlotInternal} modifies it.
     *
     * @param allocatedSlotReport slot report sent by the job master
     */
    private void freeNoLongerUsedSlots(AllocatedSlotReport allocatedSlotReport) {
        final Set<AllocationID> locallyActive =
                Sets.newHashSet(this.taskSlotTable.getActiveSlots(allocatedSlotReport.getJobId()));
        final Set<AllocationID> reportedByJobMaster = allocatedSlotReport.getAllocatedSlotInfos().stream()
                .map(AllocatedSlotInfo::getAllocationId)
                .collect(Collectors.toSet());

        for (AllocationID staleAllocation : Sets.difference(locallyActive, reportedByJobMaster)) {
            final String reason = String.format(
                    "%s is no longer allocated by job %s.", staleAllocation, allocatedSlotReport.getJobId());
            freeSlotInternal(staleAllocation, new FlinkException(reason));
        }
    }

    /**
     * Reports whether a resource manager connection has been established.
     *
     * @return {@code true} if {@code establishedResourceManagerConnection} is set
     */
    private boolean isConnectedToResourceManager() {
        return null != establishedResourceManagerConnection;
    }

    /**
     * Reports whether this TaskExecutor is connected to the resource manager with the given ID.
     *
     * @param resourceManagerId ID to compare against the known resource manager address
     * @return {@code true} only if a connection is established, a resource manager address is
     *     known, and that address carries an ID equal to {@code resourceManagerId}
     */
    private boolean isConnectedToResourceManager(ResourceManagerId resourceManagerId) {
        if (this.establishedResourceManagerConnection == null) {
            return false;
        }
        if (this.resourceManagerAddress == null) {
            return false;
        }
        return this.resourceManagerAddress.getResourceManagerId().equals(resourceManagerId);
    }

    /**
     * Checks that a job manager connection exists for the job and belongs to the given job master.
     *
     * @param jobID job whose connection is looked up
     * @param jobMasterId expected job master ID; compared null-safely
     * @return {@code true} if a connection is registered for the job and its job master ID equals
     *     {@code jobMasterId}
     */
    private boolean isJobManagerConnectionValid(JobID jobID, JobMasterId jobMasterId) {
        final JobManagerConnection connection = this.jobManagerTable.get(jobID);
        if (connection == null) {
            return false;
        }
        return Objects.equals(connection.getJobMasterId(), jobMasterId);
    }

    /**
     * Returns the resource ID of this TaskExecutor, as recorded in its task manager location.
     *
     * @return the resource ID taken from {@code taskManagerLocation}
     */
    public ResourceID getResourceID() {
        return taskManagerLocation.getResourceID();
    }

    /**
     * Logs a fatal error and hands it to the fatal error handler.
     *
     * <p>Logging is best-effort: anything thrown while logging is discarded so that the fatal
     * error handler is still invoked with the original error.
     *
     * @param th the fatal error
     */
    void onFatalError(Throwable th) {
        try {
            this.log.error("Fatal error occurred in TaskExecutor {}.", getAddress(), th);
        } catch (Throwable ignored) {
            // Deliberately swallowed: a logging failure must not keep the handler from running.
        }

        this.fatalErrorHandler.onFatalError(th);
    }

    /**
     * Exposes the current resource manager connection object for tests.
     *
     * @return the value of {@code resourceManagerConnection}, which may be {@code null}
     *     (NOTE(review): nullability assumed; the field declaration is not visible here)
     */
    @VisibleForTesting
    TaskExecutorToResourceManagerConnection getResourceManagerConnection() {
        return resourceManagerConnection;
    }

    /**
     * Exposes the heartbeat manager used towards the resource manager for tests.
     *
     * @return the value of {@code resourceManagerHeartbeatManager}
     */
    @VisibleForTesting
    HeartbeatManager<Void, TaskExecutorHeartbeatPayload> getResourceManagerHeartbeatManager() {
        return resourceManagerHeartbeatManager;
    }

    // Class initializer that sets $assertionsDisabled to the negation of this class's desired
    // assertion status.
    // NOTE(review): $assertionsDisabled looks like the compiler-generated flag behind `assert`
    // statements, surfaced here by the decompiler; its declaration is not visible in this part
    // of the file, so confirm before removing or rewriting this block.
    static {
        $assertionsDisabled = !TaskExecutor.class.desiredAssertionStatus();
    }
}
