package io.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.class */
public class StreamAppenderatorDriverTest {
    private static final String DATA_SOURCE = "foo";
    private static final String VERSION = "abc123";
    private static final int MAX_ROWS_IN_MEMORY = 100;
    private static final int MAX_ROWS_PER_SEGMENT = 3;
    private static final long PUBLISH_TIMEOUT = 10000;
    private static final long HANDOFF_CONDITION_TIMEOUT = 1000;
    private SegmentAllocator allocator;
    private AppenderatorTester appenderatorTester;
    private TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory;
    private StreamAppenderatorDriver driver;
    private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
    private static final List<InputRow> ROWS = Arrays.asList(new MapBasedInputRow(DateTimes.of("2000"), ImmutableList.of("dim1"), ImmutableMap.of("dim1", "foo", "met1", "1")), new MapBasedInputRow(DateTimes.of("2000T01"), ImmutableList.of("dim1"), ImmutableMap.of("dim1", "foo", "met1", Double.valueOf(2.0d))), new MapBasedInputRow(DateTimes.of("2000T01"), ImmutableList.of("dim2"), ImmutableMap.of("dim2", "bar", "met1", Double.valueOf(2.0d))));

    /* loaded from: input_file:io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest$TestCommitterSupplier.class */
    static class TestCommitterSupplier<T> implements Supplier<Committer> {
        private final AtomicReference<T> metadata = new AtomicReference<>();

        public void setMetadata(T t) {
            this.metadata.set(t);
        }

        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public Committer m64get() {
            final T t = this.metadata.get();
            return new Committer() { // from class: io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestCommitterSupplier.1
                public Object getMetadata() {
                    return t;
                }

                public void run() {
                }
            };
        }
    }

    /* loaded from: input_file:io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest$TestSegmentAllocator.class */
    static class TestSegmentAllocator implements SegmentAllocator {
        private final String dataSource;
        private final Granularity granularity;
        private final Map<Long, AtomicInteger> counters = Maps.newHashMap();

        public TestSegmentAllocator(String str, Granularity granularity) {
            this.dataSource = str;
            this.granularity = granularity;
        }

        public SegmentIdentifier allocate(InputRow inputRow, String str, String str2, boolean z) throws IOException {
            SegmentIdentifier segmentIdentifier;
            synchronized (this.counters) {
                DateTime bucketStart = this.granularity.bucketStart(inputRow.getTimestamp());
                long millis = bucketStart.getMillis();
                if (!this.counters.containsKey(Long.valueOf(millis))) {
                    this.counters.put(Long.valueOf(millis), new AtomicInteger());
                }
                segmentIdentifier = new SegmentIdentifier(this.dataSource, this.granularity.bucket(bucketStart), StreamAppenderatorDriverTest.VERSION, new NumberedShardSpec(this.counters.get(Long.valueOf(millis)).getAndIncrement(), 0));
            }
            return segmentIdentifier;
        }
    }

    /* loaded from: input_file:io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest$TestSegmentHandoffNotifierFactory.class */
    static class TestSegmentHandoffNotifierFactory implements SegmentHandoffNotifierFactory {
        private boolean handoffEnabled = true;
        private long handoffDelay;

        public void disableHandoff() {
            this.handoffEnabled = false;
        }

        public void setHandoffDelay(long j) {
            this.handoffDelay = j;
        }

        public SegmentHandoffNotifier createSegmentHandoffNotifier(String str) {
            return new SegmentHandoffNotifier() { // from class: io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentHandoffNotifierFactory.1
                public boolean registerSegmentHandoffCallback(SegmentDescriptor segmentDescriptor, Executor executor, Runnable runnable) {
                    if (!TestSegmentHandoffNotifierFactory.this.handoffEnabled) {
                        return true;
                    }
                    if (TestSegmentHandoffNotifierFactory.this.handoffDelay > 0) {
                        try {
                            Thread.sleep(TestSegmentHandoffNotifierFactory.this.handoffDelay);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                    executor.execute(runnable);
                    return true;
                }

                public void start() {
                }

                public void close() {
                }
            };
        }
    }

    @Before
    public void setUp() {
        this.appenderatorTester = new AppenderatorTester(100);
        this.allocator = new TestSegmentAllocator("foo", Granularities.HOUR);
        this.segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory();
        this.driver = new StreamAppenderatorDriver(this.appenderatorTester.getAppenderator(), this.allocator, this.segmentHandoffNotifierFactory, new TestUsedSegmentChecker(this.appenderatorTester), OBJECT_MAPPER, new FireDepartmentMetrics());
    }

    @After
    public void tearDown() throws Exception {
        this.driver.clear();
        this.driver.close();
    }

    @Test(timeout = 2000)
    public void testSimple() throws Exception {
        TestCommitterSupplier testCommitterSupplier = new TestCommitterSupplier();
        Assert.assertNull(this.driver.startJob());
        for (int i = 0; i < ROWS.size(); i++) {
            testCommitterSupplier.setMetadata(Integer.valueOf(i + 1));
            Assert.assertTrue(this.driver.add(ROWS.get(i), "dummy", testCommitterSupplier, false, true).isOk());
        }
        SegmentsAndMetadata segmentsAndMetadata = (SegmentsAndMetadata) this.driver.publish(makeOkPublisher(), testCommitterSupplier.m64get(), ImmutableList.of("dummy")).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
        while (this.driver.getSegments().containsKey("dummy")) {
            Thread.sleep(100L);
        }
        SegmentsAndMetadata segmentsAndMetadata2 = (SegmentsAndMetadata) this.driver.registerHandoff(segmentsAndMetadata).get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ImmutableSet.of(new SegmentIdentifier("foo", Intervals.of("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0)), new SegmentIdentifier("foo", Intervals.of("2000T01/PT1H"), VERSION, new NumberedShardSpec(0, 0))), asIdentifiers(segmentsAndMetadata2.getSegments()));
        Assert.assertEquals(Integer.valueOf(MAX_ROWS_PER_SEGMENT), segmentsAndMetadata2.getCommitMetadata());
    }

    @Test
    public void testMaxRowsPerSegment() throws Exception {
        TestCommitterSupplier testCommitterSupplier = new TestCommitterSupplier();
        Assert.assertNull(this.driver.startJob());
        for (int i = 0; i < 9; i++) {
            testCommitterSupplier.setMetadata(Integer.valueOf(i + 1));
            AppenderatorDriverAddResult add = this.driver.add(new MapBasedInputRow(DateTimes.of("2000T01"), ImmutableList.of("dim2"), ImmutableMap.of("dim2", StringUtils.format("bar-%d", new Object[]{Integer.valueOf(i)}), "met1", Double.valueOf(2.0d))), "dummy", testCommitterSupplier, false, true);
            Assert.assertTrue(add.isOk());
            if (add.getNumRowsInSegment() > MAX_ROWS_PER_SEGMENT) {
                this.driver.moveSegmentOut("dummy", ImmutableList.of(add.getSegmentIdentifier()));
            }
        }
        SegmentsAndMetadata segmentsAndMetadata = (SegmentsAndMetadata) this.driver.publish(makeOkPublisher(), testCommitterSupplier.m64get(), ImmutableList.of("dummy")).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
        while (this.driver.getSegments().containsKey("dummy")) {
            Thread.sleep(100L);
        }
        SegmentsAndMetadata segmentsAndMetadata2 = (SegmentsAndMetadata) this.driver.registerHandoff(segmentsAndMetadata).get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
        Assert.assertEquals(3L, segmentsAndMetadata2.getSegments().size());
        Assert.assertEquals(9, segmentsAndMetadata2.getCommitMetadata());
    }

    @Test(timeout = 5000, expected = TimeoutException.class)
    public void testHandoffTimeout() throws Exception {
        TestCommitterSupplier testCommitterSupplier = new TestCommitterSupplier();
        this.segmentHandoffNotifierFactory.disableHandoff();
        Assert.assertNull(this.driver.startJob());
        for (int i = 0; i < ROWS.size(); i++) {
            testCommitterSupplier.setMetadata(Integer.valueOf(i + 1));
            Assert.assertTrue(this.driver.add(ROWS.get(i), "dummy", testCommitterSupplier, false, true).isOk());
        }
        SegmentsAndMetadata segmentsAndMetadata = (SegmentsAndMetadata) this.driver.publish(makeOkPublisher(), testCommitterSupplier.m64get(), ImmutableList.of("dummy")).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
        while (this.driver.getSegments().containsKey("dummy")) {
            Thread.sleep(100L);
        }
        this.driver.registerHandoff(segmentsAndMetadata).get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testPublishPerRow() throws IOException, InterruptedException, TimeoutException, ExecutionException {
        TestCommitterSupplier testCommitterSupplier = new TestCommitterSupplier();
        Assert.assertNull(this.driver.startJob());
        testCommitterSupplier.setMetadata(1);
        Assert.assertTrue(this.driver.add(ROWS.get(0), "dummy", testCommitterSupplier, false, true).isOk());
        SegmentsAndMetadata segmentsAndMetadata = (SegmentsAndMetadata) this.driver.publishAndRegisterHandoff(makeOkPublisher(), testCommitterSupplier.m64get(), ImmutableList.of("dummy")).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ImmutableSet.of(new SegmentIdentifier("foo", Intervals.of("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0))), asIdentifiers(segmentsAndMetadata.getSegments()));
        Assert.assertEquals(1, segmentsAndMetadata.getCommitMetadata());
        for (int i = 1; i < ROWS.size(); i++) {
            testCommitterSupplier.setMetadata(Integer.valueOf(i + 1));
            Assert.assertTrue(this.driver.add(ROWS.get(i), "dummy", testCommitterSupplier, false, true).isOk());
            SegmentsAndMetadata segmentsAndMetadata2 = (SegmentsAndMetadata) this.driver.publishAndRegisterHandoff(makeOkPublisher(), testCommitterSupplier.m64get(), ImmutableList.of("dummy")).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
            Assert.assertEquals(ImmutableSet.of(new SegmentIdentifier("foo", Intervals.of("2000T01/PT1H"), VERSION, new NumberedShardSpec(i - 1, 0))), asIdentifiers(segmentsAndMetadata2.getSegments()));
            Assert.assertEquals(Integer.valueOf(i + 1), segmentsAndMetadata2.getCommitMetadata());
        }
        this.driver.persist(testCommitterSupplier.m64get());
        SegmentsAndMetadata segmentsAndMetadata3 = (SegmentsAndMetadata) this.driver.publishAndRegisterHandoff(makeOkPublisher(), testCommitterSupplier.m64get(), ImmutableList.of("dummy")).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ImmutableSet.of(), asIdentifiers(segmentsAndMetadata3.getSegments()));
        Assert.assertEquals(Integer.valueOf(MAX_ROWS_PER_SEGMENT), segmentsAndMetadata3.getCommitMetadata());
    }

    @Test
    public void testIncrementalHandoff() throws Exception {
        TestCommitterSupplier testCommitterSupplier = new TestCommitterSupplier();
        Assert.assertNull(this.driver.startJob());
        testCommitterSupplier.setMetadata(1);
        Assert.assertTrue(this.driver.add(ROWS.get(0), "sequence_0", testCommitterSupplier, false, true).isOk());
        for (int i = 1; i < ROWS.size(); i++) {
            testCommitterSupplier.setMetadata(Integer.valueOf(i + 1));
            Assert.assertTrue(this.driver.add(ROWS.get(i), "sequence_1", testCommitterSupplier, false, true).isOk());
        }
        ListenableFuture publishAndRegisterHandoff = this.driver.publishAndRegisterHandoff(makeOkPublisher(), testCommitterSupplier.m64get(), ImmutableList.of("sequence_0"));
        ListenableFuture publishAndRegisterHandoff2 = this.driver.publishAndRegisterHandoff(makeOkPublisher(), testCommitterSupplier.m64get(), ImmutableList.of("sequence_1"));
        SegmentsAndMetadata segmentsAndMetadata = (SegmentsAndMetadata) publishAndRegisterHandoff.get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
        SegmentsAndMetadata segmentsAndMetadata2 = (SegmentsAndMetadata) publishAndRegisterHandoff2.get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ImmutableSet.of(new SegmentIdentifier("foo", Intervals.of("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0))), asIdentifiers(segmentsAndMetadata.getSegments()));
        Assert.assertEquals(ImmutableSet.of(new SegmentIdentifier("foo", Intervals.of("2000T01/PT1H"), VERSION, new NumberedShardSpec(0, 0))), asIdentifiers(segmentsAndMetadata2.getSegments()));
        Assert.assertEquals(Integer.valueOf(MAX_ROWS_PER_SEGMENT), segmentsAndMetadata.getCommitMetadata());
        Assert.assertEquals(Integer.valueOf(MAX_ROWS_PER_SEGMENT), segmentsAndMetadata2.getCommitMetadata());
    }

    private Set<SegmentIdentifier> asIdentifiers(Iterable<DataSegment> iterable) {
        return ImmutableSet.copyOf(Iterables.transform(iterable, new Function<DataSegment, SegmentIdentifier>() { // from class: io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.1
            public SegmentIdentifier apply(DataSegment dataSegment) {
                return SegmentIdentifier.fromDataSegment(dataSegment);
            }
        }));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static TransactionalSegmentPublisher makeOkPublisher() {
        return new TransactionalSegmentPublisher() { // from class: io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.2
            public boolean publishSegments(Set<DataSegment> set, Object obj) throws IOException {
                return true;
            }
        };
    }
}
