package io.druid.server.bridge;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import io.druid.client.BatchServerInventoryView;
import io.druid.client.CachingClusteredClientTest;
import io.druid.client.DruidServer;
import io.druid.client.ServerView;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.announcement.Announcer;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.DruidNode;
import io.druid.server.coordination.BatchDataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.junit.Test;

/* loaded from: input_file:io/druid/server/bridge/DruidClusterBridgeTest.class */
/**
 * Integration-style test for {@link DruidClusterBridge} that runs against two in-process,
 * single-node ZooKeeper {@link TestingCluster}s.
 *
 * <p>The first cluster's client is handed to the {@link LeaderLatch} and to the bridge itself; the
 * second cluster's client is handed to the {@link BridgeZkCoordinator} and the {@link Announcer},
 * and is the one polled for the bridge's announcement node.
 */
public class DruidClusterBridgeTest {
    /** Number of polls a wait loop performs before it gives up and fails the test. */
    public static final int WAIT_MAX_RETRY = 100;

    /** Pause between two consecutive polls of a wait loop, in milliseconds. */
    public static final int WAIT_SLEEP_MILLIS = 200;

    /**
     * Max size the announced node must report for the test to pass. It equals 117 + 1, the max
     * sizes of the two mocked historical servers.
     * NOTE(review): presumably the bridge sums the inventory sizes - confirm against DruidClusterBridge.
     */
    private static final long EXPECTED_ANNOUNCED_MAX_SIZE = 118L;

    /**
     * Starts a bridge, waits for it to become leader, waits for the announcement node of
     * {@code localhost:8080} to appear under {@code /druid/announcements}, and finally waits for
     * that node to deserialize into a {@link DruidServerMetadata} reporting the expected max size.
     *
     * <p>Everything that was started is torn down in a {@code finally} block, so a timeout in one
     * of the wait loops no longer leaks ZooKeeper test servers, Curator clients or the announcer
     * thread. Mock verification runs only after a successful run and after teardown, because
     * {@code stop()} on the inventory view is one of the recorded expectations.
     *
     * @throws Exception if setup, polling or teardown fails, or if a wait loop times out
     */
    @Test
    public void testRun() throws Exception {
        final DefaultObjectMapper jsonMapper = new DefaultObjectMapper();
        final DruidNode me = new DruidNode("me", "localhost", 8080);
        final String announcedPath = "/druid/announcements/localhost:8080";

        DruidClusterBridgeConfig bridgeConfig = new DruidClusterBridgeConfig() {
            public Duration getStartDelay() {
                return new Duration(0L);
            }

            public Duration getPeriod() {
                return new Duration(Long.MAX_VALUE);
            }
        };
        ZkPathsConfig zkPathsConfig = new ZkPathsConfig() {
            public String getBase() {
                return "/druid";
            }
        };
        DruidServerMetadata bridgeMetadata = new DruidServerMetadata(
            CachingClusteredClientTest.DATA_SOURCE, "localhost", 1000L, "bridge", "_default_tier", 0);

        // Mocks are created up front so they are still in scope for verification after teardown.
        // The first three record no expectations, i.e. any call on them fails the test.
        SegmentPublisher segmentPublisher = EasyMock.createMock(SegmentPublisher.class);
        EasyMock.replay(segmentPublisher);
        MetadataSegmentManager metadataSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
        EasyMock.replay(metadataSegmentManager);
        ServerView serverView = EasyMock.createMock(ServerView.class);
        EasyMock.replay(serverView);

        // Deliberately left in record state, exactly as before: never replayed, never verified.
        BatchDataSegmentAnnouncer segmentAnnouncer = EasyMock.createMock(BatchDataSegmentAnnouncer.class);

        BatchServerInventoryView inventoryView = EasyMock.createMock(BatchServerInventoryView.class);
        EasyMock.expect(inventoryView.getInventory()).andReturn(Arrays.asList(
            new DruidServer("1", "localhost", 117L, "historical", "_default_tier", 0),
            new DruidServer("2", "localhost", 1L, "historical", "_default_tier", 0)));
        // Void calls on a mock in record state are expectations by themselves (one call each).
        inventoryView.registerSegmentCallback((Executor) EasyMock.anyObject(), (ServerView.SegmentCallback) EasyMock.anyObject());
        inventoryView.registerServerCallback((Executor) EasyMock.anyObject(), (ServerView.ServerCallback) EasyMock.anyObject());
        inventoryView.start();
        inventoryView.stop();
        EasyMock.replay(inventoryView);

        TestingCluster localCluster = null;
        CuratorFramework localCf = null;
        TestingCluster remoteCluster = null;
        CuratorFramework remoteCf = null;
        ExecutorService announcerExec = null;
        // Only set once start() has returned, so teardown never stops something that never started.
        Announcer startedAnnouncer = null;
        DruidClusterBridge startedBridge = null;
        try {
            localCluster = new TestingCluster(1);
            localCluster.start();
            localCf = newCurator(localCluster);
            localCf.start();

            remoteCluster = new TestingCluster(1);
            remoteCluster.start();
            remoteCf = newCurator(remoteCluster);
            remoteCf.start();

            ScheduledExecutorFactory executorFactory = ScheduledExecutors.createFactory(new Lifecycle());
            AtomicReference<LeaderLatch> leaderLatch =
                new AtomicReference<LeaderLatch>(new LeaderLatch(localCf, "/test"));
            BridgeZkCoordinator bridgeZkCoordinator = new BridgeZkCoordinator(
                jsonMapper, zkPathsConfig, new SegmentLoaderConfig(), bridgeMetadata, remoteCf,
                segmentPublisher, metadataSegmentManager, serverView);

            announcerExec = Executors.newSingleThreadExecutor();
            Announcer announcer = new Announcer(remoteCf, announcerExec);
            announcer.start();
            startedAnnouncer = announcer;
            announcer.announce(zkPathsConfig.getAnnouncementsPath() + "/" + me.getHostAndPort(), jsonMapper.writeValueAsBytes(me));

            DruidClusterBridge bridge = new DruidClusterBridge(
                jsonMapper, bridgeConfig, zkPathsConfig, bridgeMetadata, executorFactory, me, localCf,
                leaderLatch, bridgeZkCoordinator, announcer, segmentAnnouncer, inventoryView);
            bridge.start();
            startedBridge = bridge;

            for (int attempt = 0; !bridge.isLeader(); attempt++) {
                sleepOrGiveUp(attempt, "Unable to become leader");
            }
            for (int attempt = 0; remoteCf.checkExists().forPath(announcedPath) == null; attempt++) {
                sleepOrGiveUp(attempt, "Unable to announce");
            }
            for (int attempt = 0; !verifyUpdate(jsonMapper, announcedPath, remoteCf); attempt++) {
                sleepOrGiveUp(attempt, "No updates to bridge node occurred");
            }
        } finally {
            // Same order the success path has always used. The steps run sequentially, so a
            // failing step still skips the ones after it.
            if (startedAnnouncer != null) {
                startedAnnouncer.stop();
            }
            if (startedBridge != null) {
                startedBridge.stop();
            }
            if (remoteCf != null) {
                remoteCf.close();
            }
            if (remoteCluster != null) {
                remoteCluster.close();
            }
            if (localCf != null) {
                localCf.close();
            }
            if (localCluster != null) {
                localCluster.close();
            }
            // Nothing visible in this test shuts the announcer's executor down, so do it here,
            // last, once nothing can submit work to it any more.
            if (announcerExec != null) {
                announcerExec.shutdownNow();
            }
        }

        EasyMock.verify(inventoryView);
        EasyMock.verify(segmentPublisher);
        EasyMock.verify(metadataSegmentManager);
        EasyMock.verify(serverView);
    }

    /**
     * Builds (but does not start) a Curator client for the given test cluster, using the retry
     * policy and compression provider shared by both clients in this test.
     *
     * @param cluster the test cluster whose connect string the client should use
     * @return a new, not yet started client
     */
    private static CuratorFramework newCurator(TestingCluster cluster) {
        return CuratorFrameworkFactory.builder()
            .connectString(cluster.getConnectString())
            .retryPolicy(new ExponentialBackoffRetry(1, 10))
            .compressionProvider(new PotentiallyGzippedCompressionProvider(false))
            .build();
    }

    /**
     * One step of a polling loop: fails once the retry budget is used up, otherwise sleeps for
     * {@link #WAIT_SLEEP_MILLIS} before the caller re-checks its condition.
     *
     * @param attempt        zero-based number of polls already performed
     * @param failureMessage message of the {@link ISE} thrown when {@code attempt} exceeds
     *                       {@link #WAIT_MAX_RETRY}; used as a format string without arguments,
     *                       so it must not contain {@code %}
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void sleepOrGiveUp(int attempt, String failureMessage) throws InterruptedException {
        if (attempt > WAIT_MAX_RETRY) {
            throw new ISE(failureMessage);
        }
        Thread.sleep(WAIT_SLEEP_MILLIS);
    }

    /**
     * Reads the node at {@code str}, deserializes it as {@link DruidServerMetadata} and checks
     * whether it reports the expected max size.
     *
     * @param objectMapper     mapper used to deserialize the node data
     * @param str              ZooKeeper path of the node to read
     * @param curatorFramework client used to read the node
     * @return {@code true} if the node's max size equals the expected value
     * @throws Exception if the node cannot be read or its data cannot be deserialized
     */
    private boolean verifyUpdate(ObjectMapper objectMapper, String str, CuratorFramework curatorFramework) throws Exception {
        byte[] nodeData = curatorFramework.getData().forPath(str);
        DruidServerMetadata announced = objectMapper.readValue(nodeData, DruidServerMetadata.class);
        return EXPECTED_ANNOUNCED_MAX_SIZE == announced.getMaxSize();
    }
}
