package org.apache.flink.runtime.webmonitor.threadinfo;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.stats.JobVertexStatsTracker;
import org.apache.flink.runtime.webmonitor.stats.Statistics;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTracker.class */
/**
 * Tracks thread-info based statistics of job vertices.
 *
 * <p>Statistics are kept in a cache keyed by (job id, job vertex id). A lookup through {@link
 * #getVertexStats(JobID, AccessExecutionJobVertex)} returns whatever is currently cached and, when
 * nothing is cached or the cached entry is older than the refresh interval, triggers a new
 * asynchronous thread info sample whose result replaces the cached entry once it completes.
 *
 * <p>The pending-request set, the shut-down flag and the sample triggering are guarded by an
 * internal lock.
 *
 * @param <T> type of the statistics produced from a {@link JobVertexThreadInfoStats} sample
 */
public class JobVertexThreadInfoTracker<T extends Statistics> implements JobVertexStatsTracker<T> {

    private static final Logger LOG = LoggerFactory.getLogger(JobVertexThreadInfoTracker.class);

    /** Lock guarding the trigger/completion bookkeeping of this tracker. */
    private final Object lock = new Object();

    /** Coordinator used to trigger the thread info requests. */
    @GuardedBy("lock")
    private final ThreadInfoRequestCoordinator coordinator;

    /** Converts a completed thread info sample into the statistics type that gets cached. */
    private final Function<JobVertexThreadInfoStats, T> createStatsFn;

    /** Executor on which sample completion callbacks run. */
    private final ExecutorService executor;

    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;

    /** Most recent statistics per (job, vertex); entries expire after the clean up interval. */
    @GuardedBy("lock")
    private final Cache<Key, T> vertexStatsCache;

    /** Keys for which a sample has been triggered but has not completed yet. */
    @GuardedBy("lock")
    private final Set<Key> pendingStats = new HashSet<>();

    private final int numSamples;
    private final Duration statsRefreshInterval;
    private final Duration delayBetweenSamples;
    private final int maxThreadInfoDepth;
    private final Time rpcTimeout;

    /** Set once {@link #shutDown()} has been called; no further samples are triggered afterwards. */
    @GuardedBy("lock")
    private boolean shutDown;

    /** Completed as soon as the first successful sample result arrives (exposed for tests). */
    private final CompletableFuture<Void> resultAvailableFuture = new CompletableFuture<>();

    /** Cache key identifying one job vertex of one job. */
    public static class Key {
        private final JobID jobId;
        private final JobVertexID jobVertexId;

        private Key(JobID jobID, JobVertexID jobVertexID) {
            this.jobId = jobID;
            this.jobVertexId = jobVertexID;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Key other = (Key) obj;
            return Objects.equals(jobId, other.jobId)
                    && Objects.equals(jobVertexId, other.jobVertexId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(jobId, jobVertexId);
        }
    }

    /**
     * Callback invoked when a triggered thread info sample finishes, successfully or not.
     *
     * <p>On success the converted statistics are stored in the cache; on failure the cause is
     * logged at debug level. In every case the key is removed from the pending set so that a later
     * lookup can trigger a new sample.
     */
    public class ThreadInfoSampleCompletionCallback
            implements BiConsumer<JobVertexThreadInfoStats, Throwable> {
        private final Key key;
        private final AccessExecutionJobVertex vertex;

        ThreadInfoSampleCompletionCallback(Key key, AccessExecutionJobVertex accessExecutionJobVertex) {
            this.key = key;
            this.vertex = accessExecutionJobVertex;
        }

        /**
         * Handles the outcome of a sample.
         *
         * @param jobVertexThreadInfoStats the sample result, or {@code null} if sampling failed
         * @param th the failure cause, only logged when no result is available
         */
        @Override // java.util.function.BiConsumer
        public void accept(JobVertexThreadInfoStats jobVertexThreadInfoStats, Throwable th) {
            synchronized (lock) {
                try {
                    if (shutDown) {
                        return;
                    }
                    if (jobVertexThreadInfoStats != null) {
                        resultAvailableFuture.complete(null);
                        vertexStatsCache.put(key, createStatsFn.apply(jobVertexThreadInfoStats));
                    } else {
                        LOG.debug(
                                "Failed to gather a thread info sample for {}",
                                vertex.getName(),
                                th);
                    }
                } catch (Throwable t) {
                    // A failure while converting or caching the result must not escape silently
                    // into the discarded completion future.
                    LOG.error("Error during stats completion.", t);
                } finally {
                    // Always clear the pending marker, otherwise no new sample could ever be
                    // triggered for this key.
                    pendingStats.remove(key);
                }
            }
        }
    }

    /**
     * Creates the tracker and schedules the periodic cache clean up on the given executor.
     *
     * @param threadInfoRequestCoordinator coordinator used to trigger thread info requests; must
     *     not be null
     * @param gatewayRetriever retriever for the resource manager gateway; must not be null
     * @param function converts a thread info sample into the cached statistics; must not be null
     * @param scheduledExecutorService executor running completion callbacks and the periodic cache
     *     clean up; must not be null
     * @param duration clean up interval, used both as cache expire-after-access time and as the
     *     period of the clean up task; must be greater than 0 ms
     * @param i number of samples per request; must be at least 1
     * @param duration2 statistics refresh interval; must be greater than 0 ms
     * @param duration3 delay between two samples of one request; must not be null
     * @param i2 maximum stack trace depth of a sample; must be greater than 0
     * @param time timeout used when requesting task executor gateways
     */
    public JobVertexThreadInfoTracker(ThreadInfoRequestCoordinator threadInfoRequestCoordinator, GatewayRetriever<ResourceManagerGateway> gatewayRetriever, Function<JobVertexThreadInfoStats, T> function, ScheduledExecutorService scheduledExecutorService, Duration duration, int i, Duration duration2, Duration duration3, int i2, Time time) {
        this.coordinator =
                Preconditions.checkNotNull(
                        threadInfoRequestCoordinator, "Thread info samples coordinator");
        this.resourceManagerGatewayRetriever =
                Preconditions.checkNotNull(gatewayRetriever, "Gateway retriever");
        this.createStatsFn = Preconditions.checkNotNull(function, "Create stats function");
        this.executor = Preconditions.checkNotNull(scheduledExecutorService, "Scheduled executor");
        this.statsRefreshInterval =
                Preconditions.checkNotNull(duration2, "Statistics refresh interval");
        this.rpcTimeout = time;

        final long cleanUpIntervalMillis = duration.toMillis();
        Preconditions.checkArgument(
                cleanUpIntervalMillis > 0, "Clean up interval must be greater than 0");

        Preconditions.checkArgument(i >= 1, "Number of samples");
        this.numSamples = i;

        Preconditions.checkArgument(
                duration2.toMillis() > 0, "Stats refresh interval must be greater than 0");

        this.delayBetweenSamples = Preconditions.checkNotNull(duration3, "Delay between samples");

        Preconditions.checkArgument(i2 > 0, "Max stack trace depth must be greater than 0");
        this.maxThreadInfoDepth = i2;

        this.vertexStatsCache =
                CacheBuilder.newBuilder()
                        .concurrencyLevel(1)
                        .expireAfterAccess(cleanUpIntervalMillis, TimeUnit.MILLISECONDS)
                        .build();

        scheduledExecutorService.scheduleWithFixedDelay(
                this::cleanUpVertexStatsCache,
                cleanUpIntervalMillis,
                cleanUpIntervalMillis,
                TimeUnit.MILLISECONDS);
    }

    /**
     * Returns the currently cached statistics for the given vertex, if any.
     *
     * <p>If nothing is cached, or the cached statistics ended at least one refresh interval ago, a
     * new sample is triggered in the background; the (possibly stale or absent) cached value is
     * still what gets returned by this call.
     *
     * @param jobID id of the job the vertex belongs to
     * @param accessExecutionJobVertex the vertex to look up
     * @return the cached statistics, or an empty {@link Optional} if none are available yet
     */
    @Override // org.apache.flink.runtime.webmonitor.stats.JobVertexStatsTracker
    public Optional<T> getVertexStats(JobID jobID, AccessExecutionJobVertex accessExecutionJobVertex) {
        synchronized (lock) {
            final Key key = getKey(jobID, accessExecutionJobVertex);
            final T cachedStats = vertexStatsCache.getIfPresent(key);
            final boolean needsRefresh =
                    cachedStats == null
                            || System.currentTimeMillis()
                                    >= cachedStats.getEndTime() + statsRefreshInterval.toMillis();
            if (needsRefresh) {
                triggerThreadInfoSampleInternal(key, accessExecutionJobVertex);
            }
            return Optional.ofNullable(cachedStats);
        }
    }

    /**
     * Triggers an asynchronous thread info sample for the given vertex unless the tracker is shut
     * down or a sample for the same key is already pending.
     *
     * <p>Must be called while holding {@code lock}.
     */
    private void triggerThreadInfoSampleInternal(Key key, AccessExecutionJobVertex accessExecutionJobVertex) {
        assert Thread.holdsLock(lock);

        if (shutDown || pendingStats.contains(key)) {
            return;
        }
        pendingStats.add(key);

        final AccessExecutionVertex[] taskVertices = accessExecutionJobVertex.getTaskVertices();
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Triggering thread info sample for tasks: {}", Arrays.toString(taskVertices));
        }

        resourceManagerGatewayRetriever
                .getFuture()
                .thenCompose(
                        resourceManagerGateway ->
                                coordinator.triggerThreadInfoRequest(
                                        matchExecutionsWithGateways(
                                                taskVertices, resourceManagerGateway),
                                        numSamples,
                                        delayBetweenSamples,
                                        maxThreadInfoDepth))
                .whenCompleteAsync(
                        new ThreadInfoSampleCompletionCallback(key, accessExecutionJobVertex),
                        executor);
    }

    /**
     * Builds the map from execution attempt to the (future) gateway of the task executor it runs
     * on. Only vertices that have an assigned location and are in state {@code RUNNING} are
     * included; all others are skipped with a trace log message.
     */
    private Map<ExecutionAttemptID, CompletableFuture<TaskExecutorThreadInfoGateway>> matchExecutionsWithGateways(AccessExecutionVertex[] accessExecutionVertexArr, ResourceManagerGateway resourceManagerGateway) {
        final Map<ExecutionAttemptID, CompletableFuture<TaskExecutorThreadInfoGateway>>
                executionsWithGateways = new HashMap<>();

        for (AccessExecutionVertex executionVertex : accessExecutionVertexArr) {
            final TaskManagerLocation location =
                    executionVertex.getCurrentAssignedResourceLocation();
            if (location == null) {
                LOG.trace("ExecutionVertex {} is currently not assigned", executionVertex);
                continue;
            }
            if (executionVertex.getExecutionState() != ExecutionState.RUNNING) {
                LOG.trace(
                        "{} not running, but {}; not sampling",
                        executionVertex.getTaskNameWithSubtaskIndex(),
                        executionVertex.getExecutionState());
                continue;
            }
            // Request the gateway only for tasks that are actually sampled, so that no request is
            // issued whose result would be thrown away.
            executionsWithGateways.put(
                    executionVertex.getCurrentExecutionAttempt().getAttemptId(),
                    resourceManagerGateway.requestTaskExecutorThreadInfoGateway(
                            location.getResourceID(), rpcTimeout));
        }
        return executionsWithGateways;
    }

    /**
     * Runs the cache's pending maintenance (e.g. eviction of expired entries).
     *
     * <p>NOTE(review): invoked from the scheduled executor without holding {@code lock}; this
     * relies on the cache implementation being safe for concurrent use — confirm.
     */
    @VisibleForTesting
    void cleanUpVertexStatsCache() {
        vertexStatsCache.cleanUp();
    }

    /**
     * Shuts the tracker down: drops all cached statistics and pending markers and prevents any
     * further samples from being triggered. Calling it more than once has no additional effect.
     */
    @Override // org.apache.flink.runtime.webmonitor.stats.JobVertexStatsTracker
    public void shutDown() {
        synchronized (lock) {
            if (!shutDown) {
                vertexStatsCache.invalidateAll();
                pendingStats.clear();
                shutDown = true;
            }
        }
    }

    /** Returns the future that is completed once the first successful sample has arrived. */
    @VisibleForTesting
    CompletableFuture<Void> getResultAvailableFuture() {
        return resultAvailableFuture;
    }

    /** Creates the cache key for the given job and vertex. */
    private static Key getKey(JobID jobID, AccessExecutionJobVertex accessExecutionJobVertex) {
        return new Key(jobID, accessExecutionJobVertex.getJobVertexId());
    }
}
