package io.druid.segment.realtime.plumber;

import com.google.common.base.Predicate;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.Granularity;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.CachingClusteredClientTest;
import io.druid.client.FilteredServerView;
import io.druid.client.ServerView;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.ShardSpec;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.class */
public class RealtimePlumberSchoolTest {
    private final RejectionPolicyFactory rejectionPolicy;
    private RealtimePlumber plumber;
    private DataSegmentAnnouncer announcer;
    private SegmentPublisher segmentPublisher;
    private DataSegmentPusher dataSegmentPusher;
    private FilteredServerView serverView;
    private ServiceEmitter emitter;
    private RealtimeTuningConfig tuningConfig;
    private DataSchema schema;
    private FireDepartmentMetrics metrics;

    public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicyFactory) {
        this.rejectionPolicy = rejectionPolicyFactory;
    }

    @Parameterized.Parameters
    public static Collection<?> constructorFeeder() throws IOException {
        return Arrays.asList(new Object[]{new NoopRejectionPolicyFactory()}, new Object[]{new MessageTimeRejectionPolicyFactory()});
    }

    @Before
    public void setUp() throws Exception {
        Files.createTempDir().deleteOnExit();
        this.schema = new DataSchema(CachingClusteredClientTest.DATA_SOURCE, new InputRowParser() { // from class: io.druid.segment.realtime.plumber.RealtimePlumberSchoolTest.1
            public InputRow parse(Object obj) {
                return null;
            }

            public ParseSpec getParseSpec() {
                return new JSONParseSpec(new TimestampSpec("timestamp", "auto"), new DimensionsSpec((List) null, (List) null, (List) null));
            }

            public InputRowParser withParseSpec(ParseSpec parseSpec) {
                return null;
            }
        }, new AggregatorFactory[]{new CountAggregatorFactory("rows")}, new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, (List) null));
        this.announcer = (DataSegmentAnnouncer) EasyMock.createMock(DataSegmentAnnouncer.class);
        this.announcer.announceSegment((DataSegment) EasyMock.anyObject());
        EasyMock.expectLastCall().anyTimes();
        this.segmentPublisher = (SegmentPublisher) EasyMock.createNiceMock(SegmentPublisher.class);
        this.dataSegmentPusher = (DataSegmentPusher) EasyMock.createNiceMock(DataSegmentPusher.class);
        this.serverView = (FilteredServerView) EasyMock.createMock(FilteredServerView.class);
        this.serverView.registerSegmentCallback((Executor) EasyMock.anyObject(), (ServerView.SegmentCallback) EasyMock.anyObject(), (Predicate) EasyMock.anyObject());
        EasyMock.expectLastCall().anyTimes();
        this.emitter = (ServiceEmitter) EasyMock.createMock(ServiceEmitter.class);
        EasyMock.replay(new Object[]{this.announcer, this.segmentPublisher, this.dataSegmentPusher, this.serverView, this.emitter});
        this.tuningConfig = new RealtimeTuningConfig(1, (Period) null, (Period) null, (File) null, new IntervalStartVersioningPolicy(), this.rejectionPolicy, (Integer) null, (ShardSpec) null, (Boolean) null, (Boolean) null, (Integer) null);
        RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(this.emitter, new DefaultQueryRunnerFactoryConglomerate(Maps.newHashMap()), this.dataSegmentPusher, this.announcer, this.segmentPublisher, this.serverView, MoreExecutors.sameThreadExecutor());
        this.metrics = new FireDepartmentMetrics();
        this.plumber = realtimePlumberSchool.findPlumber(this.schema, this.tuningConfig, this.metrics);
    }

    @After
    public void tearDown() throws Exception {
        EasyMock.verify(new Object[]{this.announcer, this.segmentPublisher, this.dataSegmentPusher, this.serverView, this.emitter});
    }

    @Test(timeout = 60000)
    public void testPersist() throws Exception {
        final MutableBoolean mutableBoolean = new MutableBoolean(false);
        this.plumber.getSinks().put(0L, new Sink(new Interval(0L, TimeUnit.HOURS.toMillis(1L)), this.schema, this.tuningConfig, new DateTime("2014-12-01T12:34:56.789").toString()));
        this.plumber.startJob();
        InputRow inputRow = (InputRow) EasyMock.createNiceMock(InputRow.class);
        EasyMock.expect(Long.valueOf(inputRow.getTimestampFromEpoch())).andReturn(0L);
        EasyMock.expect(inputRow.getDimensions()).andReturn(new ArrayList());
        EasyMock.replay(new Object[]{inputRow});
        this.plumber.add(inputRow);
        this.plumber.persist(new Runnable() { // from class: io.druid.segment.realtime.plumber.RealtimePlumberSchoolTest.2
            @Override // java.lang.Runnable
            public void run() {
                mutableBoolean.setValue(true);
            }
        });
        while (!mutableBoolean.booleanValue()) {
            Thread.sleep(100L);
        }
        this.plumber.getSinks().clear();
        this.plumber.finishJob();
    }

    @Test(timeout = 60000)
    public void testPersistFails() throws Exception {
        final MutableBoolean mutableBoolean = new MutableBoolean(false);
        this.plumber.getSinks().put(0L, new Sink(new Interval(0L, TimeUnit.HOURS.toMillis(1L)), this.schema, this.tuningConfig, new DateTime("2014-12-01T12:34:56.789").toString()));
        this.plumber.startJob();
        InputRow inputRow = (InputRow) EasyMock.createNiceMock(InputRow.class);
        EasyMock.expect(Long.valueOf(inputRow.getTimestampFromEpoch())).andReturn(0L);
        EasyMock.expect(inputRow.getDimensions()).andReturn(new ArrayList());
        EasyMock.replay(new Object[]{inputRow});
        this.plumber.add(inputRow);
        this.plumber.persist(new Runnable() { // from class: io.druid.segment.realtime.plumber.RealtimePlumberSchoolTest.3
            @Override // java.lang.Runnable
            public void run() {
                mutableBoolean.setValue(true);
                throw new RuntimeException();
            }
        });
        while (!mutableBoolean.booleanValue()) {
            Thread.sleep(100L);
        }
        Assert.assertEquals(1L, this.metrics.failedPersists());
    }
}
