package io.druid.segment.realtime.plumber;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import io.druid.client.ImmutableSegmentLoadInfo;
import io.druid.client.coordinator.CoordinatorClient;
import io.druid.concurrent.Execs;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.SegmentDescriptor;
import io.druid.server.coordination.DruidServerMetadata;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.class */
/**
 * {@link SegmentHandoffNotifier} that periodically asks the coordinator for the server view of
 * each registered segment and fires the registered callback once the segment is considered
 * handed off (see {@link #isHandOffComplete(List, SegmentDescriptor)}).
 *
 * <p>Callbacks are kept in a concurrent map keyed by {@link SegmentDescriptor}; polling happens on
 * a single-threaded scheduled executor created in {@link #start()}.
 */
public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNotifier {
    private static final Logger log = new Logger(CoordinatorBasedSegmentHandoffNotifier.class);

    /** Pending callbacks: segment descriptor -> (executor to run on, callback to run). */
    private final ConcurrentMap<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks = Maps.newConcurrentMap();
    private final CoordinatorClient coordinatorClient;
    /** Created by {@link #start()}; {@code null} until then. */
    private volatile ScheduledExecutorService scheduledExecutor;
    /** Interval between two polls, in milliseconds. */
    private final long pollDurationMillis;
    private final String dataSource;

    /**
     * @param str                                         name of the data source whose segments are watched
     * @param coordinatorClient                           client used to fetch the server view from the coordinator
     * @param coordinatorBasedSegmentHandoffNotifierConfig supplies the poll duration
     */
    public CoordinatorBasedSegmentHandoffNotifier(String str, CoordinatorClient coordinatorClient, CoordinatorBasedSegmentHandoffNotifierConfig coordinatorBasedSegmentHandoffNotifierConfig) {
        this.dataSource = str;
        this.coordinatorClient = coordinatorClient;
        this.pollDurationMillis = coordinatorBasedSegmentHandoffNotifierConfig.getPollDuration().getMillis();
    }

    /**
     * Registers a callback to be run on {@code executor} once {@code segmentDescriptor} is handed off.
     *
     * @return {@code true} if the callback was registered; {@code false} if a callback was already
     *         registered for this descriptor (the existing one is kept)
     */
    @Override // io.druid.segment.realtime.plumber.SegmentHandoffNotifier
    public boolean registerSegmentHandoffCallback(SegmentDescriptor segmentDescriptor, Executor executor, Runnable runnable) {
        log.info("Adding SegmentHandoffCallback for dataSource[%s] Segment[%s]", this.dataSource, segmentDescriptor);
        return this.handOffCallbacks.putIfAbsent(segmentDescriptor, new Pair<>(executor, runnable)) == null;
    }

    /**
     * Creates the polling executor and schedules {@link #checkForSegmentHandoffs()} at a fixed rate
     * of {@code pollDurationMillis}, starting immediately.
     *
     * <p>Note: each call creates a new executor and overwrites the stored reference; a previously
     * created executor is not shut down by this method.
     */
    @Override // io.druid.segment.realtime.plumber.SegmentHandoffNotifier
    public void start() {
        this.scheduledExecutor = Execs.scheduledSingleThreaded("coordinator_handoff_scheduled_%d");
        this.scheduledExecutor.scheduleAtFixedRate(new Runnable() { // from class: io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier.1
            @Override // java.lang.Runnable
            public void run() {
                CoordinatorBasedSegmentHandoffNotifier.this.checkForSegmentHandoffs();
            }
        }, 0L, this.pollDurationMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Runs one polling pass: for every pending callback, fetches the server view for the segment's
     * interval and, if hand-off is complete, submits the callback to its executor and removes it.
     *
     * <p>A failure for one segment is logged and does not stop the pass; the entry stays registered
     * and is retried on the next poll. Nothing is ever thrown from this method, because an exception
     * escaping a task scheduled with {@code scheduleAtFixedRate} suppresses all later executions.
     */
    void checkForSegmentHandoffs() {
        try {
            Iterator<Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>>> it = this.handOffCallbacks.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry = it.next();
                SegmentDescriptor descriptor = entry.getKey();
                try {
                    List<ImmutableSegmentLoadInfo> serverView =
                            this.coordinatorClient.fetchServerView(this.dataSource, descriptor.getInterval(), true);
                    if (isHandOffComplete(serverView, descriptor)) {
                        log.info("Segment Handoff complete for dataSource[%s] Segment[%s]", this.dataSource, descriptor);
                        Pair<Executor, Runnable> callback = entry.getValue();
                        callback.lhs.execute(callback.rhs);
                        // Remove only after the callback was accepted by its executor, so a rejected
                        // submission is retried on the next poll.
                        it.remove();
                    }
                } catch (Exception e) {
                    log.error(e, "Exception while checking handoff for dataSource[%s] Segment[%s], Will try again after [%d]ms", this.dataSource, descriptor, this.pollDurationMillis);
                }
            }
            if (!this.handOffCallbacks.isEmpty()) {
                log.info("Still waiting for Handoff for Segments : [%s]", this.handOffCallbacks.keySet());
            }
        } catch (Throwable th) {
            // No single segment is in scope here, so the message names only the data source.
            log.error(th, "Exception while checking handoff for dataSource[%s], Will try again after [%d]ms", this.dataSource, this.pollDurationMillis);
        }
    }

    /**
     * Decides whether the segment described by {@code segmentDescriptor} has been handed off.
     *
     * <p>Hand-off is complete when at least one entry of {@code list} satisfies all of:
     * <ul>
     *   <li>its segment interval contains the descriptor's interval,</li>
     *   <li>its shard spec partition number equals the descriptor's partition number,</li>
     *   <li>its version compares greater than or equal to the descriptor's version, and</li>
     *   <li>at least one of the servers holding it reports {@code isAssignable()}.</li>
     * </ul>
     *
     * @param list              load info returned by the coordinator for the descriptor's interval
     * @param segmentDescriptor the segment being waited on
     * @return {@code true} if a matching loaded segment exists, {@code false} otherwise
     */
    static boolean isHandOffComplete(List<ImmutableSegmentLoadInfo> list, SegmentDescriptor segmentDescriptor) {
        for (ImmutableSegmentLoadInfo loadInfo : list) {
            boolean coversInterval = loadInfo.getSegment().getInterval().contains(segmentDescriptor.getInterval());
            if (coversInterval
                && loadInfo.getSegment().getShardSpec().getPartitionNum() == segmentDescriptor.getPartitionNumber()
                && loadInfo.getSegment().getVersion().compareTo(segmentDescriptor.getVersion()) >= 0
                && Iterables.any(loadInfo.getServers(), new Predicate<DruidServerMetadata>() { // from class: io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier.2
                    @Override
                    public boolean apply(DruidServerMetadata druidServerMetadata) {
                        return druidServerMetadata.isAssignable();
                    }
                })) {
                return true;
            }
        }
        return false;
    }

    /**
     * Shuts down the polling executor. Does nothing if {@link #start()} was never called.
     */
    @Override // io.druid.segment.realtime.plumber.SegmentHandoffNotifier, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        // Read the volatile field once so the null check and the call see the same reference.
        ScheduledExecutorService executor = this.scheduledExecutor;
        if (executor != null) {
            executor.shutdown();
        }
    }

    /** Returns the live map of pending callbacks (not a copy). */
    Map<SegmentDescriptor, Pair<Executor, Runnable>> getHandOffCallbacks() {
        return this.handOffCallbacks;
    }
}
