package org.apache.iotdb.session.subscription;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/session/subscription/SubscriptionEndpointsSyncer.class */
/**
 * {@link Runnable} that reconciles the providers held by a {@link SubscriptionConsumer} with the
 * endpoints returned by {@code SubscriptionConsumer#fetchAllEndPointsWithRedirection()}.
 *
 * <p>One pass, executed while holding the consumer's write lock:
 *
 * <ol>
 *   <li>if the consumer has no providers, ask it to open them (abort the pass on failure);
 *   <li>fetch the current endpoints (abort the pass on failure);
 *   <li>for every endpoint without a provider, construct one and register it; for every endpoint
 *       with a provider, send a heartbeat and drop the provider if it is unavailable afterwards;
 *   <li>close and remove every provider whose data node id is no longer among the endpoints.
 * </ol>
 *
 * <p>Failures are logged rather than rethrown; the log messages indicate that a later run is
 * expected to retry.
 */
public class SubscriptionEndpointsSyncer implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionEndpointsSyncer.class);

    /** Consumer whose providers are kept in sync by this task. */
    private final SubscriptionConsumer consumer;

    /**
     * @param subscriptionConsumer the consumer whose providers this task maintains
     */
    public SubscriptionEndpointsSyncer(SubscriptionConsumer subscriptionConsumer) {
        this.consumer = subscriptionConsumer;
    }

    /** Runs one synchronization pass under the consumer's write lock; no-op for a closed consumer. */
    @Override
    public void run() {
        // Cheap pre-check so a closed consumer does not have to wait for the lock.
        if (this.consumer.isClosed()) {
            return;
        }
        this.consumer.acquireWriteLock();
        try {
            // Re-check now that the lock is held: the consumer may have been closed while this
            // thread was waiting, and a closed consumer must not get providers (re)opened.
            if (this.consumer.isClosed()) {
                return;
            }
            syncInternal();
        } finally {
            this.consumer.releaseWriteLock();
        }
    }

    /** Performs one synchronization pass; the caller must already hold the consumer's write lock. */
    private void syncInternal() {
        if (this.consumer.hasNoProviders()) {
            try {
                this.consumer.openProviders();
            } catch (IoTDBConnectionException e) {
                LOGGER.warn("something unexpected happened when syncing subscription endpoints...", e);
                return;
            }
        }

        final Map<Integer, TEndPoint> allEndPoints;
        try {
            allEndPoints = this.consumer.fetchAllEndPointsWithRedirection();
        } catch (Exception e) {
            LOGGER.warn("Failed to fetch all endpoints, exception: {}, will retry later...", e.getMessage());
            return;
        }

        try {
            syncProvidersWithEndPoints(allEndPoints);
            removeStaleProviders(allEndPoints);
        } catch (Exception e) {
            // Anything reaching this point is not a fetch failure, so it is reported under its own
            // message and with the full throwable. It is still swallowed so that an unexpected
            // error in one pass does not escape run().
            LOGGER.warn("something unexpected happened when syncing subscription endpoints...", e);
        }
    }

    /**
     * Creates a provider for every endpoint that has none, and heartbeats every endpoint that has
     * one, removing providers that turn out to be unavailable.
     *
     * @param allEndPoints endpoints keyed by data node id
     */
    private void syncProvidersWithEndPoints(Map<Integer, TEndPoint> allEndPoints) {
        for (Map.Entry<Integer, TEndPoint> entry : allEndPoints.entrySet()) {
            final int dataNodeId = entry.getKey();
            final SubscriptionProvider provider = this.consumer.getProvider(dataNodeId);

            if (Objects.isNull(provider)) {
                // Unknown endpoint: try to connect; a failure only affects this endpoint.
                final TEndPoint endPoint = entry.getValue();
                try {
                    this.consumer.addProvider(dataNodeId, this.consumer.constructProviderAndHandshake(endPoint));
                } catch (Exception e) {
                    LOGGER.warn("Failed to create connection with endpoint {}, exception: {}, will retry later...", endPoint, e.getMessage());
                }
                continue;
            }

            // Known endpoint: a successful heartbeat marks the provider available, a failed one
            // marks it unavailable.
            try {
                provider.getSessionConnection().heartbeat();
                provider.setAvailable();
            } catch (Exception e) {
                LOGGER.warn("something unexpected happened when sending heartbeat to subscription provider {}, exception: {}, set subscription provider unavailable", provider, e.getMessage());
                provider.setUnavailable();
            }

            if (!provider.isAvailable()) {
                closeAndRemoveProviderQuietly(dataNodeId);
            }
        }
    }

    /**
     * Closes and removes every provider whose data node id is absent from {@code allEndPoints}.
     *
     * @param allEndPoints endpoints keyed by data node id
     */
    private void removeStaleProviders(Map<Integer, TEndPoint> allEndPoints) {
        // Collect the ids first and remove afterwards. Whether getAllProviders() returns a copy or
        // a live view is not visible from this class; removing while iterating a live view could
        // throw ConcurrentModificationException.
        final List<Integer> staleDataNodeIds = new ArrayList<>();
        for (SubscriptionProvider provider : this.consumer.getAllProviders()) {
            final int dataNodeId = provider.getDataNodeId();
            if (!allEndPoints.containsKey(dataNodeId)) {
                staleDataNodeIds.add(dataNodeId);
            }
        }
        for (int dataNodeId : staleDataNodeIds) {
            closeAndRemoveProviderQuietly(dataNodeId);
        }
    }

    /**
     * Closes and removes the provider for {@code dataNodeId}, logging (not rethrowing) an
     * {@link IoTDBConnectionException}.
     *
     * @param dataNodeId data node id of the provider to drop
     */
    private void closeAndRemoveProviderQuietly(int dataNodeId) {
        try {
            this.consumer.closeAndRemoveProvider(dataNodeId);
        } catch (IoTDBConnectionException e) {
            LOGGER.warn("Exception occurred when closing and removing subscription provider with data node id {}: {}", dataNodeId, e.getMessage());
        }
    }
}
