package io.seata.server.cluster.manager;

import io.seata.common.thread.NamedThreadFactory;
import io.seata.server.cluster.listener.ClusterChangeEvent;
import io.seata.server.cluster.listener.ClusterChangeListener;
import io.seata.server.cluster.watch.Watcher;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * Parks long-polling HTTP watchers per cluster group and completes them either when a
 * {@link ClusterChangeEvent} arrives for their group or when their timeout elapses.
 *
 * <p>Watchers wait in {@link #WATCHERS}; the latest term seen for each group is kept in
 * {@link #GROUP_UPDATE_TIME}. A single-threaded scheduler sweeps the parked watchers once per
 * second: expired ones are answered with {@code 304 Not Modified}, the rest are re-registered.
 */
@Component
public class ClusterWatcherManager implements ClusterChangeListener {
    /** Parked watchers, keyed by group. */
    private static final Map<String, Queue<Watcher<?>>> WATCHERS = new ConcurrentHashMap<>();
    /** Term carried by the most recent change event of each group. */
    private static final Map<String, Long> GROUP_UPDATE_TIME = new ConcurrentHashMap<>();
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, (ThreadFactory) new NamedThreadFactory("long-polling", 1));

    /**
     * Starts the periodic sweep (initial delay and period of one second) that answers
     * timed-out watchers and re-registers the others.
     */
    @PostConstruct
    public void init() {
        this.scheduledThreadPoolExecutor.scheduleAtFixedRate(this::sweepWatchers, 1L, 1L, TimeUnit.SECONDS);
    }

    /**
     * Detaches every group's queue from {@link #WATCHERS} and processes its watchers.
     *
     * <p>Nothing may escape this method: an exception thrown by a task scheduled with
     * {@code scheduleAtFixedRate} suppresses all subsequent executions.
     */
    private void sweepWatchers() {
        try {
            for (String group : WATCHERS.keySet()) {
                Queue<Watcher<?>> queue = WATCHERS.remove(group);
                if (queue != null) {
                    queue.parallelStream().forEach(this::expireOrRequeue);
                }
            }
        } catch (RuntimeException e) {
            this.logger.error("long-polling watcher sweep failed", e);
        }
    }

    /**
     * Completes the watcher with {@code 304} when its timeout has passed; otherwise hands it
     * back to {@link #registryWatcher(Watcher)} unless it is already done.
     *
     * <p>Failures are logged and the watcher is dropped, so that one broken async context
     * (for example one that was already completed) does not abandon the rest of the queue.
     *
     * @param watcher a watcher taken from a detached queue
     */
    private void expireOrRequeue(Watcher<?> watcher) {
        try {
            if (System.currentTimeMillis() >= watcher.getTimeout()) {
                AsyncContext asyncContext = (AsyncContext) watcher.getAsyncContext();
                // getResponse() is declared to return ServletResponse; setStatus needs the HTTP subtype.
                HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
                watcher.setDone(true);
                response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
                asyncContext.complete();
            }
            if (!watcher.isDone()) {
                registryWatcher(watcher);
            }
        } catch (RuntimeException e) {
            this.logger.warn("failed to expire or re-register watcher of group {}, dropping it", watcher.getGroup(), e);
        }
    }

    /**
     * Records the event's term for its group and notifies every watcher currently parked for
     * that group. Events whose term is not positive are ignored.
     *
     * @param clusterChangeEvent the cluster change to propagate
     */
    @Override
    @Async
    @EventListener
    public void onChangeEvent(ClusterChangeEvent clusterChangeEvent) {
        if (clusterChangeEvent.getTerm() <= 0) {
            return;
        }
        String group = clusterChangeEvent.getGroup();
        // The term is stored before the queue is detached, so a watcher that is registered
        // afterwards with an older term takes the immediate-notify path in registryWatcher.
        GROUP_UPDATE_TIME.put(group, Long.valueOf(clusterChangeEvent.getTerm()));
        Queue<Watcher<?>> queue = WATCHERS.remove(group);
        if (queue != null) {
            queue.parallelStream().forEach(this::notifyQuietly);
        }
    }

    /**
     * Notifies one watcher and logs, instead of propagating, any failure so the remaining
     * watchers of the same queue are still notified.
     *
     * @param watcher the watcher to notify
     */
    private void notifyQuietly(Watcher<?> watcher) {
        try {
            notify(watcher);
        } catch (RuntimeException e) {
            this.logger.warn("failed to notify watcher of group {}", watcher.getGroup(), e);
        }
    }

    /**
     * Marks the watcher done and completes its async request with {@code 200 OK}.
     *
     * @param watcher the watcher whose pending request is answered
     */
    private void notify(Watcher<?> watcher) {
        AsyncContext asyncContext = (AsyncContext) watcher.getAsyncContext();
        HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
        watcher.setDone(true);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("notify cluster change event to: {}", asyncContext.getRequest().getRemoteAddr());
        }
        response.setStatus(HttpServletResponse.SC_OK);
        asyncContext.complete();
    }

    /**
     * Registers a watcher for its group. If a term newer than the watcher's term has already
     * been recorded for the group, the watcher is notified immediately instead of being parked.
     *
     * @param watcher the watcher to park or notify
     */
    public void registryWatcher(Watcher<?> watcher) {
        String group = watcher.getGroup();
        Long term = GROUP_UPDATE_TIME.get(group);
        if (term == null || watcher.getTerm() >= term) {
            // Add inside compute(): with computeIfAbsent(...).add(...) the queue could be removed
            // from the map between the two calls, leaving the watcher in a detached queue.
            WATCHERS.compute(group, (key, queue) -> {
                Queue<Watcher<?>> target = queue;
                if (target == null) {
                    target = new ConcurrentLinkedQueue<>();
                }
                target.add(watcher);
                return target;
            });
        } else {
            notify(watcher);
        }
    }
}
