package io.druid.segment.realtime;

import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.client.CachingClusteredClientTest;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.FirehoseFactoryV2;
import io.druid.data.input.FirehoseV2;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.InputRowParser;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.BaseQuery;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.SegmentDescriptor;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryRunnerFactory;
import io.druid.query.groupby.GroupByQueryRunnerTest;
import io.druid.query.groupby.GroupByQueryRunnerTestHelper;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.MultipleSpecificSegmentSpec;
import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.IndexSpec;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.segment.realtime.plumber.RejectionPolicyFactory;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.segment.realtime.plumber.VersioningPolicy;
import io.druid.segment.transform.TransformSpec;
import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import io.druid.timeline.partition.LinearShardSpec;
import io.druid.timeline.partition.ShardSpec;
import io.druid.utils.Runnables;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:io/druid/segment/realtime/RealtimeManagerTest.class */
public class RealtimeManagerTest {
    private static QueryRunnerFactory factory;
    private static QueryRunnerFactoryConglomerate conglomerate;
    private static final List<TestInputRowHolder> rows = Arrays.asList(makeRow(DateTimes.of("9000-01-01").getMillis()), makeRow((RuntimeException) new ParseException("parse error", new Object[0])), null, makeRow(System.currentTimeMillis()));
    private RealtimeManager realtimeManager;
    private RealtimeManager realtimeManager2;
    private RealtimeManager realtimeManager3;
    private DataSchema schema;
    private DataSchema schema2;
    private TestPlumber plumber;
    private TestPlumber plumber2;
    private RealtimeTuningConfig tuningConfig_0;
    private RealtimeTuningConfig tuningConfig_1;
    private DataSchema schema3;

    /* loaded from: input_file:io/druid/segment/realtime/RealtimeManagerTest$SleepingFirehose.class */
    private static class SleepingFirehose implements Firehose {
        private boolean closed;

        private SleepingFirehose() {
        }

        public boolean hasMore() {
            try {
                Thread.sleep(1000L);
                return true;
            } catch (InterruptedException e) {
                throw Throwables.propagate(e);
            }
        }

        @Nullable
        public InputRow nextRow() {
            return null;
        }

        public Runnable commit() {
            return null;
        }

        public boolean isClosed() {
            return this.closed;
        }

        public void close() throws IOException {
            this.closed = true;
        }
    }

    /* loaded from: input_file:io/druid/segment/realtime/RealtimeManagerTest$TestFirehose.class */
    private static class TestFirehose implements Firehose {
        private final Iterator<TestInputRowHolder> rows;
        private boolean closed;

        private TestFirehose(Iterator<TestInputRowHolder> it) {
            this.rows = it;
        }

        public boolean hasMore() {
            return this.rows.hasNext();
        }

        @Nullable
        public InputRow nextRow() {
            TestInputRowHolder next = this.rows.next();
            if (next == null) {
                return null;
            }
            return next.getRow();
        }

        public Runnable commit() {
            return Runnables.getNoopRunnable();
        }

        public boolean isClosed() {
            return this.closed;
        }

        public void close() throws IOException {
            this.closed = true;
        }
    }

    /* loaded from: input_file:io/druid/segment/realtime/RealtimeManagerTest$TestFirehoseV2.class */
    private static class TestFirehoseV2 implements FirehoseV2 {
        private final Iterator<TestInputRowHolder> rows;
        private InputRow currRow;
        private boolean stop;
        private boolean closed;

        private TestFirehoseV2(Iterator<TestInputRowHolder> it) {
            this.rows = it;
        }

        private void nextMessage() {
            this.currRow = null;
            while (this.currRow == null) {
                TestInputRowHolder next = this.rows.next();
                this.currRow = next == null ? null : next.getRow();
            }
        }

        public void close() throws IOException {
            this.closed = true;
        }

        public boolean isClosed() {
            return this.closed;
        }

        public boolean advance() {
            this.stop = !this.rows.hasNext();
            if (this.stop) {
                return false;
            }
            nextMessage();
            return true;
        }

        public InputRow currRow() {
            return this.currRow;
        }

        public Committer makeCommitter() {
            return new Committer() { // from class: io.druid.segment.realtime.RealtimeManagerTest.TestFirehoseV2.1
                public Object getMetadata() {
                    return null;
                }

                public void run() {
                }
            };
        }

        public void start() throws Exception {
            nextMessage();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/druid/segment/realtime/RealtimeManagerTest$TestInputRowHolder.class */
    public static class TestInputRowHolder {
        private long timestamp;
        private RuntimeException exception;

        public TestInputRowHolder(long j, RuntimeException runtimeException) {
            this.timestamp = j;
            this.exception = runtimeException;
        }

        public InputRow getRow() {
            if (this.exception != null) {
                throw this.exception;
            }
            return new InputRow() { // from class: io.druid.segment.realtime.RealtimeManagerTest.TestInputRowHolder.1
                public List<String> getDimensions() {
                    return Arrays.asList("testDim");
                }

                public long getTimestampFromEpoch() {
                    return TestInputRowHolder.this.timestamp;
                }

                public DateTime getTimestamp() {
                    return DateTimes.utc(TestInputRowHolder.this.timestamp);
                }

                public List<String> getDimension(String str) {
                    return Lists.newArrayList();
                }

                public Number getMetric(String str) {
                    return 0;
                }

                public Object getRaw(String str) {
                    return null;
                }

                public int compareTo(Row row) {
                    return 0;
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/druid/segment/realtime/RealtimeManagerTest$TestPlumber.class */
    public static class TestPlumber implements Plumber {
        private final Sink sink;
        private volatile boolean startedJob;
        private volatile boolean finishedJob;
        private volatile int persistCount;
        private Map<Interval, QueryRunner> runners;

        private TestPlumber(Sink sink) {
            this.startedJob = false;
            this.finishedJob = false;
            this.persistCount = 0;
            this.sink = sink;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isStartedJob() {
            return this.startedJob;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isFinishedJob() {
            return this.finishedJob;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int getPersistCount() {
            return this.persistCount;
        }

        public Object startJob() {
            this.startedJob = true;
            return null;
        }

        public int add(InputRow inputRow, Supplier<Committer> supplier) throws IndexSizeExceededException {
            Sink sink;
            if (inputRow == null || (sink = getSink(inputRow.getTimestampFromEpoch())) == null) {
                return -1;
            }
            return sink.add(inputRow, false);
        }

        public Sink getSink(long j) {
            if (this.sink.getInterval().contains(j)) {
                return this.sink;
            }
            return null;
        }

        public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
            if (this.runners == null) {
                throw new UnsupportedOperationException();
            }
            BaseQuery baseQuery = (BaseQuery) query;
            if (baseQuery.getQuerySegmentSpec() instanceof MultipleIntervalSegmentSpec) {
                return RealtimeManagerTest.factory.getToolchest().mergeResults(RealtimeManagerTest.factory.mergeRunners(MoreExecutors.sameThreadExecutor(), Iterables.transform(baseQuery.getIntervals(), new Function<Interval, QueryRunner<T>>() { // from class: io.druid.segment.realtime.RealtimeManagerTest.TestPlumber.1
                    public QueryRunner<T> apply(Interval interval) {
                        return (QueryRunner) TestPlumber.this.runners.get(interval);
                    }
                })));
            }
            Assert.assertEquals(1L, query.getIntervals().size());
            SegmentDescriptor descriptor = ((BaseQuery) query).getQuerySegmentSpec().getDescriptor();
            return new SpecificSegmentQueryRunner(this.runners.get(descriptor.getInterval()), new SpecificSegmentSpec(descriptor));
        }

        public void persist(Committer committer) {
            this.persistCount++;
        }

        public void finishJob() {
            this.finishedJob = true;
        }

        public void setRunners(Map<Interval, QueryRunner> map) {
            this.runners = map;
        }
    }

    @BeforeClass
    public static void setupStatic() {
        factory = initFactory();
        conglomerate = new QueryRunnerFactoryConglomerate() { // from class: io.druid.segment.realtime.RealtimeManagerTest.1
            public <T, QueryType extends Query<T>> QueryRunnerFactory<T, QueryType> findFactory(QueryType querytype) {
                return RealtimeManagerTest.factory;
            }
        };
    }

    @Before
    public void setUp() throws Exception {
        DefaultObjectMapper defaultObjectMapper = new DefaultObjectMapper();
        this.schema = new DataSchema(CachingClusteredClientTest.DATA_SOURCE, (Map) null, new AggregatorFactory[]{new CountAggregatorFactory("rows")}, new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, (List) null), (TransformSpec) null, defaultObjectMapper);
        this.schema2 = new DataSchema("testV2", (Map) null, new AggregatorFactory[]{new CountAggregatorFactory("rows")}, new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, (List) null), (TransformSpec) null, defaultObjectMapper);
        RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(new FirehoseFactory() { // from class: io.druid.segment.realtime.RealtimeManagerTest.2
            public Firehose connect(InputRowParser inputRowParser, File file) throws IOException {
                return new TestFirehose(RealtimeManagerTest.rows.iterator());
            }
        }, new PlumberSchool() { // from class: io.druid.segment.realtime.RealtimeManagerTest.3
            public Plumber findPlumber(DataSchema dataSchema, RealtimeTuningConfig realtimeTuningConfig, FireDepartmentMetrics fireDepartmentMetrics) {
                return RealtimeManagerTest.this.plumber;
            }
        }, (FirehoseFactoryV2) null);
        RealtimeIOConfig realtimeIOConfig2 = new RealtimeIOConfig((FirehoseFactory) null, new PlumberSchool() { // from class: io.druid.segment.realtime.RealtimeManagerTest.4
            public Plumber findPlumber(DataSchema dataSchema, RealtimeTuningConfig realtimeTuningConfig, FireDepartmentMetrics fireDepartmentMetrics) {
                return RealtimeManagerTest.this.plumber2;
            }
        }, new FirehoseFactoryV2() { // from class: io.druid.segment.realtime.RealtimeManagerTest.5
            public FirehoseV2 connect(InputRowParser inputRowParser, Object obj) throws IOException, ParseException {
                return new TestFirehoseV2(RealtimeManagerTest.rows.iterator());
            }
        });
        RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(1, new Period("P1Y"), (Period) null, (File) null, (VersioningPolicy) null, (RejectionPolicyFactory) null, (Integer) null, (ShardSpec) null, (IndexSpec) null, (Boolean) null, 0, 0, (Boolean) null, (Long) null, (Long) null, (SegmentWriteOutMediumFactory) null);
        this.plumber = new TestPlumber(new Sink(Intervals.of("0/P5000Y"), this.schema, realtimeTuningConfig.getShardSpec(), DateTimes.nowUtc().toString(), realtimeTuningConfig.getMaxRowsInMemory(), realtimeTuningConfig.isReportParseExceptions()));
        this.realtimeManager = new RealtimeManager(Arrays.asList(new FireDepartment(this.schema, realtimeIOConfig, realtimeTuningConfig)), (QueryRunnerFactoryConglomerate) null, (DataSegmentServerAnnouncer) EasyMock.createNiceMock(DataSegmentServerAnnouncer.class));
        this.plumber2 = new TestPlumber(new Sink(Intervals.of("0/P5000Y"), this.schema2, realtimeTuningConfig.getShardSpec(), DateTimes.nowUtc().toString(), realtimeTuningConfig.getMaxRowsInMemory(), realtimeTuningConfig.isReportParseExceptions()));
        this.realtimeManager2 = new RealtimeManager(Arrays.asList(new FireDepartment(this.schema2, realtimeIOConfig2, realtimeTuningConfig)), (QueryRunnerFactoryConglomerate) null, (DataSegmentServerAnnouncer) EasyMock.createNiceMock(DataSegmentServerAnnouncer.class));
        this.tuningConfig_0 = new RealtimeTuningConfig(1, new Period("P1Y"), (Period) null, (File) null, (VersioningPolicy) null, (RejectionPolicyFactory) null, (Integer) null, new LinearShardSpec(0), (IndexSpec) null, (Boolean) null, 0, 0, (Boolean) null, (Long) null, (Long) null, (SegmentWriteOutMediumFactory) null);
        this.tuningConfig_1 = new RealtimeTuningConfig(1, new Period("P1Y"), (Period) null, (File) null, (VersioningPolicy) null, (RejectionPolicyFactory) null, (Integer) null, new LinearShardSpec(1), (IndexSpec) null, (Boolean) null, 0, 0, (Boolean) null, (Long) null, (Long) null, (SegmentWriteOutMediumFactory) null);
        this.schema3 = new DataSchema("testing", (Map) null, new AggregatorFactory[]{new CountAggregatorFactory("ignore")}, new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, (List) null), (TransformSpec) null, defaultObjectMapper);
        this.realtimeManager3 = new RealtimeManager(Arrays.asList(new FireDepartment(this.schema3, realtimeIOConfig, this.tuningConfig_0), new FireDepartment(this.schema3, realtimeIOConfig2, this.tuningConfig_1)), conglomerate, (DataSegmentServerAnnouncer) EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), (Map) null);
    }

    @After
    public void tearDown() throws Exception {
        this.realtimeManager.stop();
        this.realtimeManager2.stop();
        this.realtimeManager3.stop();
    }

    @Test
    public void testRun() throws Exception {
        this.realtimeManager.start();
        Stopwatch createStarted = Stopwatch.createStarted();
        while (this.realtimeManager.getMetrics(CachingClusteredClientTest.DATA_SOURCE).processed() != 1) {
            Thread.sleep(100L);
            if (createStarted.elapsed(TimeUnit.MILLISECONDS) > 1000) {
                throw new ISE("Realtime manager should have completed processing 2 events!", new Object[0]);
            }
        }
        Assert.assertEquals(1L, this.realtimeManager.getMetrics(CachingClusteredClientTest.DATA_SOURCE).processed());
        Assert.assertEquals(2L, this.realtimeManager.getMetrics(CachingClusteredClientTest.DATA_SOURCE).thrownAway());
        Assert.assertEquals(1L, this.realtimeManager.getMetrics(CachingClusteredClientTest.DATA_SOURCE).unparseable());
        Assert.assertTrue(this.plumber.isStartedJob());
        Assert.assertTrue(this.plumber.isFinishedJob());
        Assert.assertEquals(0L, this.plumber.getPersistCount());
    }

    @Test
    public void testRunV2() throws Exception {
        this.realtimeManager2.start();
        Stopwatch createStarted = Stopwatch.createStarted();
        while (this.realtimeManager2.getMetrics("testV2").processed() != 1) {
            Thread.sleep(100L);
            if (createStarted.elapsed(TimeUnit.MILLISECONDS) > 1000) {
                throw new ISE("Realtime manager should have completed processing 2 events!", new Object[0]);
            }
        }
        Assert.assertEquals(1L, this.realtimeManager2.getMetrics("testV2").processed());
        Assert.assertEquals(1L, this.realtimeManager2.getMetrics("testV2").thrownAway());
        Assert.assertEquals(2L, this.realtimeManager2.getMetrics("testV2").unparseable());
        Assert.assertTrue(this.plumber2.isStartedJob());
        Assert.assertTrue(this.plumber2.isFinishedJob());
        Assert.assertEquals(0L, this.plumber2.getPersistCount());
    }

    @Test(timeout = 5000)
    public void testNormalStop() throws IOException, InterruptedException {
        final TestFirehose testFirehose = new TestFirehose(rows.iterator());
        TestFirehoseV2 testFirehoseV2 = new TestFirehoseV2(rows.iterator());
        RealtimeManager realtimeManager = new RealtimeManager(Arrays.asList(new FireDepartment(this.schema3, new RealtimeIOConfig(new FirehoseFactory() { // from class: io.druid.segment.realtime.RealtimeManagerTest.6
            public Firehose connect(InputRowParser inputRowParser, File file) throws IOException {
                return testFirehose;
            }
        }, (dataSchema, realtimeTuningConfig, fireDepartmentMetrics) -> {
            return this.plumber;
        }, (FirehoseFactoryV2) null), this.tuningConfig_0), new FireDepartment(this.schema3, new RealtimeIOConfig((FirehoseFactory) null, (dataSchema2, realtimeTuningConfig2, fireDepartmentMetrics2) -> {
            return this.plumber2;
        }, (inputRowParser, obj) -> {
            return testFirehoseV2;
        }), this.tuningConfig_1)), conglomerate, (DataSegmentServerAnnouncer) EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), (Map) null);
        realtimeManager.start();
        while (realtimeManager.getMetrics("testing").processed() < 2) {
            Thread.sleep(100L);
        }
        realtimeManager.stop();
        Assert.assertTrue(testFirehose.isClosed());
        Assert.assertTrue(testFirehoseV2.isClosed());
        Assert.assertTrue(this.plumber.isFinishedJob());
        Assert.assertTrue(this.plumber2.isFinishedJob());
    }

    @Test(timeout = 5000)
    public void testStopByInterruption() throws IOException {
        final SleepingFirehose sleepingFirehose = new SleepingFirehose();
        RealtimeManager realtimeManager = new RealtimeManager(Collections.singletonList(new FireDepartment(this.schema, new RealtimeIOConfig(new FirehoseFactory() { // from class: io.druid.segment.realtime.RealtimeManagerTest.7
            public Firehose connect(InputRowParser inputRowParser, File file) throws IOException {
                return sleepingFirehose;
            }
        }, (dataSchema, realtimeTuningConfig, fireDepartmentMetrics) -> {
            return this.plumber;
        }, (FirehoseFactoryV2) null), this.tuningConfig_0)), conglomerate, (DataSegmentServerAnnouncer) EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), (Map) null);
        realtimeManager.start();
        realtimeManager.stop();
        Assert.assertTrue(sleepingFirehose.isClosed());
        Assert.assertFalse(this.plumber.isFinishedJob());
    }

    @Test(timeout = 10000)
    public void testQueryWithInterval() throws IOException, InterruptedException {
        List asList = Arrays.asList(GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", new Object[]{"alias", "automotive", "rows", 2L, "idx", 270L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", new Object[]{"alias", "business", "rows", 2L, "idx", 236L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", new Object[]{"alias", "entertainment", "rows", 2L, "idx", 316L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", new Object[]{"alias", "health", "rows", 2L, "idx", 240L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", new Object[]{"alias", "mezzanine", "rows", 6L, "idx", 5740L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", new Object[]{"alias", "news", "rows", 2L, "idx", 242L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", new Object[]{"alias", "premium", "rows", 6L, "idx", 5800L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", new Object[]{"alias", "technology", "rows", 2L, "idx", 156L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", new Object[]{"alias", "travel", "rows", 2L, "idx", 238L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", new Object[]{"alias", "automotive", "rows", 2L, "idx", 294L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", new Object[]{"alias", "business", "rows", 2L, "idx", 224L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", new Object[]{"alias", "entertainment", "rows", 2L, "idx", 332L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", new Object[]{"alias", "health", "rows", 2L, "idx", 226L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", new Object[]{"alias", "mezzanine", "rows", 6L, "idx", 4894L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", new Object[]{"alias", "news", "rows", 2L, "idx", 228L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", new Object[]{"alias", "premium", "rows", 6L, "idx", 5010L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", new Object[]{"alias", "technology", "rows", 2L, "idx", 194L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", new Object[]{"alias", "travel", "rows", 2L, "idx", 252L}));
        this.realtimeManager3.start();
        while (this.realtimeManager3.getFireChiefs("testing").values().stream().anyMatch(fireChief -> {
            Plumber plumber = fireChief.getPlumber();
            return plumber == null || !((TestPlumber) plumber).isStartedJob();
        })) {
            Thread.sleep(10L);
        }
        for (QueryRunner queryRunner : QueryRunnerTestHelper.makeQueryRunners(factory)) {
            GroupByQuery build = GroupByQuery.builder().setDataSource("testing").setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird).setDimensions(Lists.newArrayList(new DimensionSpec[]{new DefaultDimensionSpec("quality", "alias")})).setAggregatorSpecs(Arrays.asList(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index"))).setGranularity(QueryRunnerTestHelper.dayGran).build();
            this.plumber.setRunners(ImmutableMap.of(build.getIntervals().get(0), queryRunner));
            this.plumber2.setRunners(ImmutableMap.of(build.getIntervals().get(0), queryRunner));
            TestHelper.assertExpectedObjects(asList, GroupByQueryRunnerTestHelper.runQuery(factory, this.realtimeManager3.getQueryRunnerForIntervals(build, QueryRunnerTestHelper.firstToThird.getIntervals()), build), "");
        }
    }

    @Test(timeout = 10000)
    public void testQueryWithSegmentSpec() throws IOException, InterruptedException {
        List asList = Arrays.asList(GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", new Object[]{"alias", "automotive", "rows", 1L, "idx", 135L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", new Object[]{"alias", "business", "rows", 1L, "idx", 118L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", new Object[]{"alias", "entertainment", "rows", 1L, "idx", 158L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", new Object[]{"alias", "health", "rows", 1L, "idx", 120L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", new Object[]{"alias", "mezzanine", "rows", 3L, "idx", 2870L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", new Object[]{"alias", "news", "rows", 1L, "idx", 121L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", new Object[]{"alias", "premium", "rows", 3L, "idx", 2900L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", new Object[]{"alias", "technology", "rows", 1L, "idx", 78L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", new Object[]{"alias", "travel", "rows", 1L, "idx", 119L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", new Object[]{"alias", "automotive", "rows", 1L, "idx", 147L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", new Object[]{"alias", "business", "rows", 1L, "idx", 112L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", new Object[]{"alias", "entertainment", "rows", 1L, "idx", 166L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", new Object[]{"alias", "health", "rows", 1L, "idx", 113L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", new Object[]{"alias", "mezzanine", "rows", 3L, "idx", 2447L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", new Object[]{"alias", "news", "rows", 1L, "idx", 114L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", new Object[]{"alias", "premium", "rows", 3L, "idx", 2505L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", new Object[]{"alias", "technology", "rows", 1L, "idx", 97L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", new Object[]{"alias", "travel", "rows", 1L, "idx", 126L}));
        this.realtimeManager3.start();
        while (this.realtimeManager3.getFireChiefs("testing").values().stream().anyMatch(fireChief -> {
            Plumber plumber = fireChief.getPlumber();
            return plumber == null || !((TestPlumber) plumber).isStartedJob();
        })) {
            Thread.sleep(10L);
        }
        for (QueryRunner queryRunner : QueryRunnerTestHelper.makeQueryRunners(factory)) {
            GroupByQuery build = GroupByQuery.builder().setDataSource("testing").setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird).setDimensions(Lists.newArrayList(new DimensionSpec[]{new DefaultDimensionSpec("quality", "alias")})).setAggregatorSpecs(Arrays.asList(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index"))).setGranularity(QueryRunnerTestHelper.dayGran).build();
            this.plumber.setRunners(ImmutableMap.of(build.getIntervals().get(0), queryRunner));
            this.plumber2.setRunners(ImmutableMap.of(build.getIntervals().get(0), queryRunner));
            TestHelper.assertExpectedObjects(asList, GroupByQueryRunnerTestHelper.runQuery(factory, this.realtimeManager3.getQueryRunnerForSegments(build, ImmutableList.of(new SegmentDescriptor(Intervals.of("2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z"), "ver", 0))), build), "");
            TestHelper.assertExpectedObjects(asList, GroupByQueryRunnerTestHelper.runQuery(factory, this.realtimeManager3.getQueryRunnerForSegments(build, ImmutableList.of(new SegmentDescriptor(Intervals.of("2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z"), "ver", 1))), build), "");
        }
    }

    @Test(timeout = 10000)
    public void testQueryWithMultipleSegmentSpec() throws IOException, InterruptedException {
        List asList = Arrays.asList(GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", new Object[]{"alias", "business", "rows", 2L, "idx", 260L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", new Object[]{"alias", "health", "rows", 2L, "idx", 236L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", new Object[]{"alias", "mezzanine", "rows", 4L, "idx", 4556L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", new Object[]{"alias", "news", "rows", 2L, "idx", 284L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", new Object[]{"alias", "technology", "rows", 2L, "idx", 202L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", new Object[]{"alias", "automotive", "rows", 2L, "idx", 288L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", new Object[]{"alias", "entertainment", "rows", 2L, "idx", 326L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", new Object[]{"alias", "automotive", "rows", 2L, "idx", 312L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", new Object[]{"alias", "business", "rows", 2L, "idx", 248L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", new Object[]{"alias", "entertainment", "rows", 2L, "idx", 326L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", new Object[]{"alias", "health", "rows", 2L, "idx", 262L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", new Object[]{"alias", "mezzanine", "rows", 6L, "idx", 5126L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", new Object[]{"alias", "news", "rows", 2L, "idx", 254L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", new Object[]{"alias", "premium", "rows", 6L, "idx", 5276L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", new Object[]{"alias", "technology", "rows", 2L, "idx", 206L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", new Object[]{"alias", "travel", "rows", 2L, "idx", 260L}));
        List asList2 = Arrays.asList(GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", new Object[]{"alias", "business", "rows", 1L, "idx", 130L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", new Object[]{"alias", "health", "rows", 1L, "idx", 118L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", new Object[]{"alias", "mezzanine", "rows", 2L, "idx", 2278L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", new Object[]{"alias", "news", "rows", 1L, "idx", 142L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", new Object[]{"alias", "technology", "rows", 1L, "idx", 101L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", new Object[]{"alias", "automotive", "rows", 1L, "idx", 144L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", new Object[]{"alias", "entertainment", "rows", 1L, "idx", 163L}));
        List asList3 = Arrays.asList(GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", new Object[]{"alias", "automotive", "rows", 1L, "idx", 156L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", new Object[]{"alias", "business", "rows", 1L, "idx", 124L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", new Object[]{"alias", "entertainment", "rows", 1L, "idx", 163L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", new Object[]{"alias", "health", "rows", 1L, "idx", 131L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", new Object[]{"alias", "mezzanine", "rows", 3L, "idx", 2563L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", new Object[]{"alias", "news", "rows", 1L, "idx", 127L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", new Object[]{"alias", "premium", "rows", 3L, "idx", 2638L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", new Object[]{"alias", "technology", "rows", 1L, "idx", 103L}), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", new Object[]{"alias", "travel", "rows", 1L, "idx", 130L}));
        this.realtimeManager3.start();
        while (this.realtimeManager3.getFireChiefs("testing").values().stream().anyMatch(fireChief -> {
            Plumber plumber = fireChief.getPlumber();
            return plumber == null || !((TestPlumber) plumber).isStartedJob();
        })) {
            Thread.sleep(10L);
        }
        Interval of = Intervals.of("2011-03-26T00:00:00.000Z/2011-03-28T00:00:00.000Z");
        Interval of2 = Intervals.of("2011-03-28T00:00:00.000Z/2011-03-29T00:00:00.000Z");
        SegmentDescriptor segmentDescriptor = new SegmentDescriptor(of, "ver0", 0);
        SegmentDescriptor segmentDescriptor2 = new SegmentDescriptor(of2, "ver1", 0);
        SegmentDescriptor segmentDescriptor3 = new SegmentDescriptor(of, "ver0", 1);
        SegmentDescriptor segmentDescriptor4 = new SegmentDescriptor(of2, "ver1", 1);
        GroupByQuery build = GroupByQuery.builder().setDataSource("testing").setQuerySegmentSpec(new MultipleSpecificSegmentSpec(ImmutableList.of(segmentDescriptor, segmentDescriptor2, segmentDescriptor3, segmentDescriptor4))).setDimensions(Lists.newArrayList(new DimensionSpec[]{new DefaultDimensionSpec("quality", "alias")})).setAggregatorSpecs(Arrays.asList(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index"))).setGranularity(QueryRunnerTestHelper.dayGran).build();
        Map<Interval, QueryRunner> of3 = ImmutableMap.of(of, QueryRunnerTestHelper.makeQueryRunner(factory, "druid.sample.numeric.tsv.top", (String) null), of2, QueryRunnerTestHelper.makeQueryRunner(factory, "druid.sample.numeric.tsv.bottom", (String) null));
        this.plumber.setRunners(of3);
        this.plumber2.setRunners(of3);
        TestHelper.assertExpectedObjects(asList, GroupByQueryRunnerTestHelper.runQuery(factory, build.getQuerySegmentSpec().lookup(build, this.realtimeManager3), build), "");
        TestHelper.assertExpectedObjects(asList2, GroupByQueryRunnerTestHelper.runQuery(factory, this.realtimeManager3.getQueryRunnerForSegments(build, ImmutableList.of(segmentDescriptor)), build), "");
        TestHelper.assertExpectedObjects(asList3, GroupByQueryRunnerTestHelper.runQuery(factory, this.realtimeManager3.getQueryRunnerForSegments(build, ImmutableList.of(segmentDescriptor2)), build), "");
        TestHelper.assertExpectedObjects(asList2, GroupByQueryRunnerTestHelper.runQuery(factory, this.realtimeManager3.getQueryRunnerForSegments(build, ImmutableList.of(segmentDescriptor3)), build), "");
        TestHelper.assertExpectedObjects(asList3, GroupByQueryRunnerTestHelper.runQuery(factory, this.realtimeManager3.getQueryRunnerForSegments(build, ImmutableList.of(segmentDescriptor4)), build), "");
    }

    private static GroupByQueryRunnerFactory initFactory() {
        GroupByQueryConfig groupByQueryConfig = new GroupByQueryConfig();
        groupByQueryConfig.setMaxIntermediateRows(10000);
        return GroupByQueryRunnerTest.makeQueryRunnerFactory(groupByQueryConfig);
    }

    private static TestInputRowHolder makeRow(long j) {
        return new TestInputRowHolder(j, null);
    }

    private static TestInputRowHolder makeRow(RuntimeException runtimeException) {
        return new TestInputRowHolder(0L, runtimeException);
    }
}
