package io.druid.server.bridge;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.DruidServer;
import io.druid.client.ServerInventoryView;
import io.druid.client.ServerView;
import io.druid.concurrent.Execs;
import io.druid.curator.announcement.Announcer;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode;
import io.druid.server.coordination.AbstractDataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.utils.ZKPaths;

@ManageLifecycle
/* loaded from: input_file:io/druid/server/bridge/DruidClusterBridge.class */
public class DruidClusterBridge {
    public static final String BRIDGE_OWNER_NODE = "_BRIDGE";
    public static final String NODE_TYPE = "bridge";
    private static final EmittingLogger log = new EmittingLogger(DruidClusterBridge.class);
    private final ObjectMapper jsonMapper;
    private final DruidClusterBridgeConfig config;
    private final ScheduledExecutorService exec;
    private final DruidNode self;
    private final CuratorFramework curator;
    private final AtomicReference<LeaderLatch> leaderLatch;
    private final BridgeZkCoordinator bridgeZkCoordinator;
    private final Announcer announcer;
    private final ServerInventoryView<Object> serverInventoryView;
    private final ZkPathsConfig zkPathsConfig;
    private final DruidServerMetadata druidServerMetadata;
    private final Map<DataSegment, Integer> segments = Maps.newHashMap();
    private final Object lock = new Object();
    private volatile boolean started = false;
    private volatile boolean leader = false;

    @Inject
    public DruidClusterBridge(ObjectMapper objectMapper, DruidClusterBridgeConfig druidClusterBridgeConfig, ZkPathsConfig zkPathsConfig, DruidServerMetadata druidServerMetadata, ScheduledExecutorFactory scheduledExecutorFactory, @Self DruidNode druidNode, CuratorFramework curatorFramework, AtomicReference<LeaderLatch> atomicReference, BridgeZkCoordinator bridgeZkCoordinator, @Bridge Announcer announcer, @Bridge final AbstractDataSegmentAnnouncer abstractDataSegmentAnnouncer, ServerInventoryView serverInventoryView) {
        this.jsonMapper = objectMapper;
        this.config = druidClusterBridgeConfig;
        this.bridgeZkCoordinator = bridgeZkCoordinator;
        this.zkPathsConfig = zkPathsConfig;
        this.announcer = announcer;
        this.serverInventoryView = serverInventoryView;
        this.curator = curatorFramework;
        this.leaderLatch = atomicReference;
        this.druidServerMetadata = druidServerMetadata;
        this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
        this.self = druidNode;
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DruidClusterBridge-ServerInventoryView-%d").build());
        serverInventoryView.registerSegmentCallback(newFixedThreadPool, new ServerView.BaseSegmentCallback() { // from class: io.druid.server.bridge.DruidClusterBridge.1
            @Override // io.druid.client.ServerView.BaseSegmentCallback, io.druid.client.ServerView.SegmentCallback
            public ServerView.CallbackAction segmentAdded(DruidServerMetadata druidServerMetadata2, DataSegment dataSegment) {
                try {
                    synchronized (DruidClusterBridge.this.lock) {
                        Integer num = (Integer) DruidClusterBridge.this.segments.get(dataSegment);
                        if (num == null) {
                            DruidClusterBridge.this.segments.put(dataSegment, 1);
                            abstractDataSegmentAnnouncer.announceSegment(dataSegment);
                        } else {
                            DruidClusterBridge.this.segments.put(dataSegment, Integer.valueOf(num.intValue() + 1));
                        }
                    }
                    return ServerView.CallbackAction.CONTINUE;
                } catch (Exception e) {
                    throw Throwables.propagate(e);
                }
            }

            @Override // io.druid.client.ServerView.BaseSegmentCallback, io.druid.client.ServerView.SegmentCallback
            public ServerView.CallbackAction segmentRemoved(DruidServerMetadata druidServerMetadata2, DataSegment dataSegment) {
                try {
                    synchronized (DruidClusterBridge.this.lock) {
                        DruidClusterBridge.this.serverRemovedSegment(abstractDataSegmentAnnouncer, dataSegment, druidServerMetadata2);
                    }
                    return ServerView.CallbackAction.CONTINUE;
                } catch (Exception e) {
                    throw Throwables.propagate(e);
                }
            }
        });
        serverInventoryView.registerServerCallback(newFixedThreadPool, new ServerView.ServerCallback() { // from class: io.druid.server.bridge.DruidClusterBridge.2
            @Override // io.druid.client.ServerView.ServerCallback
            public ServerView.CallbackAction serverRemoved(DruidServer druidServer) {
                try {
                    Iterator<DataSegment> it = druidServer.getSegments().values().iterator();
                    while (it.hasNext()) {
                        DruidClusterBridge.this.serverRemovedSegment(abstractDataSegmentAnnouncer, it.next(), druidServer.getMetadata());
                    }
                    return ServerView.CallbackAction.CONTINUE;
                } catch (Exception e) {
                    throw Throwables.propagate(e);
                }
            }
        });
    }

    public boolean isLeader() {
        return this.leader;
    }

    @LifecycleStart
    public void start() {
        synchronized (this.lock) {
            if (this.started) {
                return;
            }
            this.started = true;
            createNewLeaderLatch();
            try {
                this.leaderLatch.get().start();
            } catch (Exception e) {
                throw Throwables.propagate(e);
            }
        }
    }

    private LeaderLatch createNewLeaderLatch() {
        LeaderLatch leaderLatch = new LeaderLatch(this.curator, ZKPaths.makePath(this.zkPathsConfig.getConnectorPath(), BRIDGE_OWNER_NODE), this.self.getHostAndPort());
        leaderLatch.addListener(new LeaderLatchListener() { // from class: io.druid.server.bridge.DruidClusterBridge.3
            public void isLeader() {
                DruidClusterBridge.this.becomeLeader();
            }

            public void notLeader() {
                DruidClusterBridge.this.stopBeingLeader();
            }
        }, Execs.singleThreaded("CoordinatorLeader-%s"));
        return this.leaderLatch.getAndSet(leaderLatch);
    }

    @LifecycleStop
    public void stop() {
        synchronized (this.lock) {
            if (this.started) {
                stopBeingLeader();
                try {
                    this.leaderLatch.get().close();
                } catch (IOException e) {
                    log.warn(e, "Unable to close leaderLatch, ignoring", new Object[0]);
                }
                this.exec.shutdown();
                this.started = false;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void becomeLeader() {
        synchronized (this.lock) {
            if (this.started) {
                log.info("Go-Go Gadgetmobile! Starting bridge in %s", new Object[]{this.config.getStartDelay()});
                try {
                    this.bridgeZkCoordinator.start();
                    this.serverInventoryView.start();
                    ScheduledExecutors.scheduleWithFixedDelay(this.exec, this.config.getStartDelay(), this.config.getPeriod(), new Callable<ScheduledExecutors.Signal>() { // from class: io.druid.server.bridge.DruidClusterBridge.4
                        /* JADX WARN: Can't rename method to resolve collision */
                        @Override // java.util.concurrent.Callable
                        public ScheduledExecutors.Signal call() {
                            if (DruidClusterBridge.this.leader) {
                                long j = 0;
                                Iterator it = FunctionalIterable.create(DruidClusterBridge.this.serverInventoryView.getInventory()).filter(new Predicate<DruidServer>() { // from class: io.druid.server.bridge.DruidClusterBridge.4.1
                                    public boolean apply(DruidServer druidServer) {
                                        return druidServer.isAssignable();
                                    }
                                }).iterator();
                                while (it.hasNext()) {
                                    j += ((DruidServer) it.next()).getMaxSize();
                                }
                                if (j == 0) {
                                    DruidClusterBridge.log.warn("No servers founds!", new Object[0]);
                                } else {
                                    DruidServerMetadata druidServerMetadata = new DruidServerMetadata(DruidClusterBridge.this.self.getHostAndPort(), DruidClusterBridge.this.self.getHostAndPort(), j, DruidClusterBridge.NODE_TYPE, DruidClusterBridge.this.druidServerMetadata.getTier(), DruidClusterBridge.this.druidServerMetadata.getPriority());
                                    try {
                                        String makePath = ZKPaths.makePath(DruidClusterBridge.this.zkPathsConfig.getAnnouncementsPath(), DruidClusterBridge.this.self.getHostAndPort());
                                        DruidClusterBridge.log.info("Updating [%s] to have a maxSize of[%,d] bytes", new Object[]{DruidClusterBridge.this.self.getHost(), Long.valueOf(j)});
                                        DruidClusterBridge.this.announcer.update(makePath, DruidClusterBridge.this.jsonMapper.writeValueAsBytes(druidServerMetadata));
                                    } catch (Exception e) {
                                        throw Throwables.propagate(e);
                                    }
                                }
                            }
                            return DruidClusterBridge.this.leader ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
                        }
                    });
                    this.leader = true;
                } catch (Exception e) {
                    log.makeAlert(e, "Exception becoming leader", new Object[0]).emit();
                    CloseQuietly.close(createNewLeaderLatch());
                    try {
                        this.leaderLatch.get().start();
                    } catch (Exception e2) {
                        log.makeAlert(e2, "I am a zombie", new Object[0]).emit();
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void stopBeingLeader() {
        synchronized (this.lock) {
            try {
                log.info("I'll get you next time, Gadget. Next time!", new Object[0]);
                this.bridgeZkCoordinator.stop();
                this.serverInventoryView.stop();
                this.leader = false;
            } catch (Exception e) {
                log.makeAlert(e, "Unable to stopBeingLeader", new Object[0]).emit();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void serverRemovedSegment(DataSegmentAnnouncer dataSegmentAnnouncer, DataSegment dataSegment, DruidServerMetadata druidServerMetadata) throws IOException {
        Integer num = this.segments.get(dataSegment);
        if (num == null) {
            log.makeAlert("Trying to remove a segment that was never added?", new Object[0]).addData("server", druidServerMetadata.getHost()).addData("segmentId", dataSegment.getIdentifier()).emit();
        } else if (num.intValue() != 1) {
            this.segments.put(dataSegment, Integer.valueOf(num.intValue() - 1));
        } else {
            dataSegmentAnnouncer.unannounceSegment(dataSegment);
            this.segments.remove(dataSegment);
        }
    }
}
