package io.druid.segment.realtime;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import io.druid.client.CachingClusteredClientTest;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.segment.realtime.plumber.RejectionPolicyFactory;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.segment.realtime.plumber.VersioningPolicy;
import io.druid.timeline.partition.ShardSpec;
import io.druid.utils.Runnables;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/druid/segment/realtime/RealtimeManagerTest.class */
/**
 * Drives a {@link RealtimeManager} with a canned two-row firehose and a recording plumber.
 *
 * <p>{@link #setUp()} feeds one row stamped in the year 9000 and one row stamped "now" into a
 * plumber whose single sink covers {@code 0/P5000Y}. The year-9000 row lies outside that interval,
 * so {@link TestPlumber#add(InputRow)} returns {@code -1} for it; {@link #testRun()} expects the
 * manager to report that row as thrown away and the other one as processed.
 */
public class RealtimeManagerTest {
    /** Data source name shared by the schema under test and the metrics lookups. */
    private static final String DATA_SOURCE = CachingClusteredClientTest.DATA_SOURCE;

    /** Pause between two checks of the manager's progress. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    /** Once this much time has elapsed (checked after each pause) the test gives up waiting. */
    private static final long TIMEOUT_MILLIS = 1000L;

    private RealtimeManager realtimeManager;
    private DataSchema schema;
    private TestPlumber plumber;

    /** Firehose that simply replays the rows of an iterator handed in at construction time. */
    private static class TestFirehose implements Firehose {
        private final Iterator<InputRow> rows;

        private TestFirehose(Iterator<InputRow> rows) {
            this.rows = rows;
        }

        /** @return {@code true} while the backing iterator still has rows */
        public boolean hasMore() {
            return rows.hasNext();
        }

        /** @return the next row of the backing iterator */
        public InputRow nextRow() {
            return rows.next();
        }

        /** @return a no-op runnable; this firehose has nothing to commit */
        public Runnable commit() {
            return Runnables.getNoopRunnable();
        }

        /** Nothing to release. */
        public void close() throws IOException {
        }
    }

    /**
     * Plumber stub backed by a single {@link Sink} that records which lifecycle calls it received.
     *
     * <p>The flags are {@code volatile} and the persist counter is an {@link AtomicInteger}
     * because they are written by whichever thread the manager uses to drive the plumber and read
     * by the test thread.
     */
    private static class TestPlumber implements Plumber {
        private final Sink sink;
        private final AtomicInteger persistCount = new AtomicInteger();
        private volatile boolean startedJob;
        private volatile boolean finishedJob;

        private TestPlumber(Sink sink) {
            this.sink = sink;
        }

        /** @return whether {@link #startJob()} has been called */
        private boolean isStartedJob() {
            return startedJob;
        }

        /** @return whether {@link #finishJob()} has been called */
        private boolean isFinishedJob() {
            return finishedJob;
        }

        /** @return how many times {@link #persist(Runnable)} has been called */
        private int getPersistCount() {
            return persistCount.get();
        }

        public void startJob() {
            startedJob = true;
        }

        /**
         * Adds the row to the sink if its timestamp falls inside the sink's interval.
         *
         * @param inputRow row to add; may be {@code null}
         * @return whatever {@link Sink#add} returns, or {@code -1} when the row is {@code null}
         *         or no sink covers its timestamp
         * @throws IndexSizeExceededException propagated from the sink
         */
        public int add(InputRow inputRow) throws IndexSizeExceededException {
            if (inputRow == null) {
                return -1;
            }
            final Sink target = getSink(inputRow.getTimestampFromEpoch());
            if (target == null) {
                return -1;
            }
            return target.add(inputRow);
        }

        /**
         * @param j timestamp in milliseconds since the epoch
         * @return the single sink when its interval contains {@code j}, otherwise {@code null}
         */
        public Sink getSink(long j) {
            return sink.getInterval().contains(j) ? sink : null;
        }

        /** Querying is not part of what this stub supports. */
        public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
            throw new UnsupportedOperationException();
        }

        /**
         * Counts the call. The supplied runnable is intentionally not run; the test only cares
         * about how often a persist was requested.
         */
        public void persist(Runnable runnable) {
            // incrementAndGet instead of ++ on a volatile int: the latter is a non-atomic
            // read-modify-write.
            persistCount.incrementAndGet();
        }

        public void finishJob() {
            finishedJob = true;
        }
    }

    /**
     * Builds the schema, the stub firehose/plumber pair and the manager under test.
     *
     * @throws Exception declared for parity with the JUnit fixture signature
     */
    @Before
    public void setUp() throws Exception {
        // One row far in the future (outside the sink interval built below) and one row for "now".
        final List<InputRow> rows = Arrays.asList(
            makeRow(new DateTime("9000-01-01").getMillis()),
            makeRow(new DateTime().getMillis())
        );

        schema = new DataSchema(
            DATA_SOURCE,
            (InputRowParser) null,
            new AggregatorFactory[]{new CountAggregatorFactory("rows")},
            new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, (List) null)
        );

        final RealtimeIOConfig ioConfig = new RealtimeIOConfig(
            new FirehoseFactory() {
                public Firehose connect(InputRowParser inputRowParser) throws IOException {
                    return new TestFirehose(rows.iterator());
                }
            },
            new PlumberSchool() {
                public Plumber findPlumber(
                    DataSchema dataSchema,
                    RealtimeTuningConfig realtimeTuningConfig,
                    FireDepartmentMetrics fireDepartmentMetrics
                ) {
                    // Read lazily: the field is only assigned further down in setUp().
                    return RealtimeManagerTest.this.plumber;
                }
            }
        );

        // NOTE(review): the leading arguments are presumably "max rows in memory = 1" and a
        // one-year intermediate persist period -- confirm against RealtimeTuningConfig.
        final RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
            1,
            new Period("P1Y"),
            (Period) null,
            (File) null,
            (VersioningPolicy) null,
            (RejectionPolicyFactory) null,
            (Integer) null,
            (ShardSpec) null,
            (Boolean) null,
            (Boolean) null,
            (Integer) null
        );

        plumber = new TestPlumber(
            new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString())
        );

        realtimeManager = new RealtimeManager(
            Arrays.asList(new FireDepartment(schema, ioConfig, tuningConfig)),
            (QueryRunnerFactoryConglomerate) null,
            (ExecutorService) null
        );
    }

    /**
     * Starts the manager, waits for it to work through both rows and checks the resulting metrics
     * and plumber lifecycle calls.
     *
     * @throws Exception if the wait is interrupted; an {@code ISE} is thrown when the manager has
     *                   not finished within the timeout
     */
    @Test
    public void testRun() throws Exception {
        realtimeManager.start();

        // The manager is polled because it does its work asynchronously. Waiting only for the
        // processed counter is not enough: the assertions below also depend on finishJob() having
        // been called, which presumably happens after the last row was handled. Wait for both so
        // the assertions do not race against the worker.
        final Stopwatch stopwatch = Stopwatch.createStarted();
        while (realtimeManager.getMetrics(DATA_SOURCE).processed() != 1 || !plumber.isFinishedJob()) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
            if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > TIMEOUT_MILLIS) {
                throw new ISE("Realtime manager should have completed processing 2 events!");
            }
        }

        Assert.assertEquals(1L, realtimeManager.getMetrics(DATA_SOURCE).processed());
        Assert.assertEquals(1L, realtimeManager.getMetrics(DATA_SOURCE).thrownAway());
        Assert.assertTrue(plumber.isStartedJob());
        Assert.assertTrue(plumber.isFinishedJob());
        Assert.assertEquals(1, plumber.getPersistCount());
    }

    /**
     * Creates a minimal row carrying only a timestamp.
     *
     * @param j timestamp in milliseconds since the epoch
     * @return a row that advertises a single dimension name ({@code "testDim"}) but returns empty
     *         or zero values for every dimension and metric lookup
     */
    private InputRow makeRow(final long j) {
        return new InputRow() {
            public List<String> getDimensions() {
                return Arrays.asList("testDim");
            }

            public long getTimestampFromEpoch() {
                return j;
            }

            public DateTime getTimestamp() {
                return new DateTime(j);
            }

            public List<String> getDimension(String str) {
                return Lists.newArrayList();
            }

            public float getFloatMetric(String str) {
                return 0.0f;
            }

            public long getLongMetric(String str) {
                return 0L;
            }

            public Object getRaw(String str) {
                return null;
            }

            // All rows compare as equal; ordering is irrelevant to this test.
            public int compareTo(Row row) {
                return 0;
            }
        };
    }
}
