package org.apache.flink.runtime.heartbeat;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.heartbeat.HeartbeatMonitor;
import org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

@ThreadSafe
/* loaded from: input_file:org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.class */
public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
    private final long heartbeatTimeoutIntervalMs;
    private final ResourceID ownResourceID;
    private final HeartbeatListener<I, O> heartbeatListener;
    private final ScheduledExecutor mainThreadExecutor;
    protected final Logger log;
    private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;
    private final HeartbeatMonitor.Factory<O> heartbeatMonitorFactory;
    protected volatile boolean stopped;

    public HeartbeatManagerImpl(long j, ResourceID resourceID, HeartbeatListener<I, O> heartbeatListener, ScheduledExecutor scheduledExecutor, Logger logger) {
        this(j, resourceID, heartbeatListener, scheduledExecutor, logger, new HeartbeatMonitorImpl.Factory());
    }

    public HeartbeatManagerImpl(long j, ResourceID resourceID, HeartbeatListener<I, O> heartbeatListener, ScheduledExecutor scheduledExecutor, Logger logger, HeartbeatMonitor.Factory<O> factory) {
        Preconditions.checkArgument(j > 0, "The heartbeat timeout has to be larger than 0.");
        this.heartbeatTimeoutIntervalMs = j;
        this.ownResourceID = (ResourceID) Preconditions.checkNotNull(resourceID);
        this.heartbeatListener = (HeartbeatListener) Preconditions.checkNotNull(heartbeatListener, "heartbeatListener");
        this.mainThreadExecutor = (ScheduledExecutor) Preconditions.checkNotNull(scheduledExecutor);
        this.log = (Logger) Preconditions.checkNotNull(logger);
        this.heartbeatMonitorFactory = factory;
        this.heartbeatTargets = new ConcurrentHashMap<>(16);
        this.stopped = false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ResourceID getOwnResourceID() {
        return this.ownResourceID;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public HeartbeatListener<I, O> getHeartbeatListener() {
        return this.heartbeatListener;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<ResourceID, HeartbeatMonitor<O>> getHeartbeatTargets() {
        return this.heartbeatTargets;
    }

    @Override // org.apache.flink.runtime.heartbeat.HeartbeatManager
    public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
        if (this.stopped) {
            return;
        }
        if (this.heartbeatTargets.containsKey(resourceID)) {
            this.log.debug("The target with resource ID {} is already been monitored.", resourceID);
            return;
        }
        HeartbeatMonitor<O> createHeartbeatMonitor = this.heartbeatMonitorFactory.createHeartbeatMonitor(resourceID, heartbeatTarget, this.mainThreadExecutor, this.heartbeatListener, this.heartbeatTimeoutIntervalMs);
        this.heartbeatTargets.put(resourceID, createHeartbeatMonitor);
        if (this.stopped) {
            createHeartbeatMonitor.cancel();
            this.heartbeatTargets.remove(resourceID);
        }
    }

    @Override // org.apache.flink.runtime.heartbeat.HeartbeatManager
    public void unmonitorTarget(ResourceID resourceID) {
        HeartbeatMonitor<O> remove;
        if (this.stopped || (remove = this.heartbeatTargets.remove(resourceID)) == null) {
            return;
        }
        remove.cancel();
    }

    @Override // org.apache.flink.runtime.heartbeat.HeartbeatManager
    public void stop() {
        this.stopped = true;
        Iterator<HeartbeatMonitor<O>> it2 = this.heartbeatTargets.values().iterator();
        while (it2.hasNext()) {
            it2.next().cancel();
        }
        this.heartbeatTargets.clear();
    }

    @Override // org.apache.flink.runtime.heartbeat.HeartbeatManager
    public long getLastHeartbeatFrom(ResourceID resourceID) {
        HeartbeatMonitor<O> heartbeatMonitor = this.heartbeatTargets.get(resourceID);
        if (heartbeatMonitor != null) {
            return heartbeatMonitor.getLastHeartbeat();
        }
        return -1L;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ScheduledExecutor getMainThreadExecutor() {
        return this.mainThreadExecutor;
    }

    @Override // org.apache.flink.runtime.heartbeat.HeartbeatTarget
    public void receiveHeartbeat(ResourceID resourceID, I i) {
        if (this.stopped) {
            return;
        }
        this.log.debug("Received heartbeat from {}.", resourceID);
        reportHeartbeat(resourceID);
        if (i != null) {
            this.heartbeatListener.reportPayload(resourceID, i);
        }
    }

    @Override // org.apache.flink.runtime.heartbeat.HeartbeatTarget
    public void requestHeartbeat(ResourceID resourceID, I i) {
        if (this.stopped) {
            return;
        }
        this.log.debug("Received heartbeat request from {}.", resourceID);
        HeartbeatTarget<O> reportHeartbeat = reportHeartbeat(resourceID);
        if (reportHeartbeat != null) {
            if (i != null) {
                this.heartbeatListener.reportPayload(resourceID, i);
            }
            reportHeartbeat.receiveHeartbeat(getOwnResourceID(), this.heartbeatListener.retrievePayload(resourceID));
        }
    }

    HeartbeatTarget<O> reportHeartbeat(ResourceID resourceID) {
        if (!this.heartbeatTargets.containsKey(resourceID)) {
            return null;
        }
        HeartbeatMonitor<O> heartbeatMonitor = this.heartbeatTargets.get(resourceID);
        heartbeatMonitor.reportHeartbeat();
        return heartbeatMonitor.getHeartbeatTarget();
    }
}
