package org.apache.pinot.controller.helix.core.realtime;

import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.controller.util.SegmentCompletionUtils;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.zookeeper.data.Stat;
import org.joda.time.Interval;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.class */
public class PinotLLCRealtimeSegmentManagerTest {
    private static final String SCHEME = "file:";
    static final String SEGMENT_VERSION;
    static final int NUM_DOCS;
    private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "PinotLLCRealtimeSegmentManagerTest");
    private static final String RAW_TABLE_NAME = "testTable";
    private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
    private static final long RANDOM_SEED = System.currentTimeMillis();
    private static final Random RANDOM = new Random(RANDOM_SEED);
    static final LongMsgOffset PARTITION_OFFSET = new LongMsgOffset(RANDOM.nextInt(Integer.MAX_VALUE));
    static final long CURRENT_TIME_MS = System.currentTimeMillis();
    static final long START_TIME_MS = CURRENT_TIME_MS - TimeUnit.HOURS.toMillis(RANDOM.nextInt(24) + 24);
    static final long END_TIME_MS = START_TIME_MS + TimeUnit.HOURS.toMillis(RANDOM.nextInt(24) + 1);
    static final Interval INTERVAL = new Interval(START_TIME_MS, END_TIME_MS);
    static final String CRC = Long.toString(RANDOM.nextLong());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest$FakePinotLLCRealtimeSegmentManager.class */
    public static class FakePinotLLCRealtimeSegmentManager extends PinotLLCRealtimeSegmentManager {
        static final ControllerConf CONTROLLER_CONF = new ControllerConf();
        int _numReplicas;
        TableConfig _tableConfig;
        PartitionLevelStreamConfig _streamConfig;
        int _numInstances;
        InstancePartitions _consumingInstancePartitions;
        Map<String, LLCRealtimeSegmentZKMetadata> _segmentZKMetadataMap;
        Map<String, Integer> _segmentZKMetadataVersionMap;
        IdealState _idealState;
        int _numPartitions;
        boolean _exceededMaxSegmentCompletionTime;

        FakePinotLLCRealtimeSegmentManager() {
            super((PinotHelixResourceManager) Mockito.mock(PinotHelixResourceManager.class), CONTROLLER_CONF, (ControllerMetrics) Mockito.mock(ControllerMetrics.class));
            this._segmentZKMetadataMap = new HashMap();
            this._segmentZKMetadataVersionMap = new HashMap();
            this._exceededMaxSegmentCompletionTime = false;
        }

        void makeTableConfig() {
            this._tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(PinotLLCRealtimeSegmentManagerTest.RAW_TABLE_NAME).setNumReplicas(this._numReplicas).setLLC(true).setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
            this._streamConfig = new PartitionLevelStreamConfig(this._tableConfig.getTableName(), this._tableConfig.getIndexingConfig().getStreamConfigs());
        }

        void makeConsumingInstancePartitions() {
            ArrayList arrayList = new ArrayList(this._numInstances);
            for (int i = 0; i < this._numInstances; i++) {
                arrayList.add("Server_" + i);
            }
            this._consumingInstancePartitions = new InstancePartitions(InstancePartitionsType.CONSUMING.getInstancePartitionsName(PinotLLCRealtimeSegmentManagerTest.RAW_TABLE_NAME));
            this._consumingInstancePartitions.setInstances(0, 0, arrayList);
        }

        public void setUpNewTable() {
            setUpNewTable(this._tableConfig, new IdealState(PinotLLCRealtimeSegmentManagerTest.REALTIME_TABLE_NAME));
        }

        public void ensureAllPartitionsConsuming() {
            ensureAllPartitionsConsuming(this._tableConfig, this._streamConfig, this._idealState, this._numPartitions);
        }

        public TableConfig getTableConfig(String str) {
            return this._tableConfig;
        }

        InstancePartitions getConsumingInstancePartitions(TableConfig tableConfig) {
            return this._consumingInstancePartitions;
        }

        List<String> getAllSegments(String str) {
            return new ArrayList(this._segmentZKMetadataMap.keySet());
        }

        List<String> getLLCSegments(String str) {
            return new ArrayList(this._segmentZKMetadataMap.keySet());
        }

        LLCRealtimeSegmentZKMetadata getSegmentZKMetadata(String str, String str2, @Nullable Stat stat) {
            Preconditions.checkState(this._segmentZKMetadataMap.containsKey(str2));
            if (stat != null) {
                stat.setVersion(this._segmentZKMetadataVersionMap.get(str2).intValue());
            }
            return new LLCRealtimeSegmentZKMetadata(this._segmentZKMetadataMap.get(str2).toZNRecord());
        }

        void persistSegmentZKMetadata(String str, LLCRealtimeSegmentZKMetadata lLCRealtimeSegmentZKMetadata, int i) {
            String segmentName = lLCRealtimeSegmentZKMetadata.getSegmentName();
            int intValue = this._segmentZKMetadataVersionMap.getOrDefault(segmentName, -1).intValue();
            if (i != -1) {
                Preconditions.checkState(i == intValue);
            }
            this._segmentZKMetadataMap.put(segmentName, lLCRealtimeSegmentZKMetadata);
            this._segmentZKMetadataVersionMap.put(segmentName, Integer.valueOf(intValue + 1));
        }

        protected IdealState getIdealState(String str) {
            return this._idealState;
        }

        protected void setIdealState(String str, IdealState idealState) {
            this._idealState = idealState;
        }

        void updateIdealStateOnSegmentCompletion(String str, String str2, String str3, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> map) {
            updateInstanceStatesForNewConsumingSegment(this._idealState.getRecord().getMapFields(), str2, str3, segmentAssignment, map);
        }

        int getNumPartitions(StreamConfig streamConfig) {
            return this._numPartitions;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* renamed from: getPartitionOffset, reason: merged with bridge method [inline-methods] */
        public LongMsgOffset m25getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria, int i) {
            Assert.assertTrue(offsetCriteria.isSmallest());
            return PinotLLCRealtimeSegmentManagerTest.PARTITION_OFFSET;
        }

        boolean isExceededMaxSegmentCompletionTime(String str, String str2, long j) {
            return this._exceededMaxSegmentCompletionTime;
        }

        long getCurrentTimeMs() {
            return PinotLLCRealtimeSegmentManagerTest.CURRENT_TIME_MS;
        }

        static {
            CONTROLLER_CONF.setDataDir(PinotLLCRealtimeSegmentManagerTest.TEMP_DIR.toString());
        }
    }

    /* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest$FakePinotLLCRealtimeSegmentManagerII.class */
    private static class FakePinotLLCRealtimeSegmentManagerII extends FakePinotLLCRealtimeSegmentManager {
        final Scenario _scenario;

        /* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest$FakePinotLLCRealtimeSegmentManagerII$Scenario.class */
        enum Scenario {
            ZK_VERSION_CHANGED,
            METADATA_STATUS_CHANGED
        }

        FakePinotLLCRealtimeSegmentManagerII(Scenario scenario) {
            this._scenario = scenario;
        }

        @Override // org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManagerTest.FakePinotLLCRealtimeSegmentManager
        LLCRealtimeSegmentZKMetadata getSegmentZKMetadata(String str, String str2, @Nullable Stat stat) {
            LLCRealtimeSegmentZKMetadata segmentZKMetadata = super.getSegmentZKMetadata(str, str2, stat);
            switch (this._scenario) {
                case ZK_VERSION_CHANGED:
                    if (stat != null) {
                        persistSegmentZKMetadata(str, segmentZKMetadata, stat.getVersion());
                        break;
                    }
                    break;
                case METADATA_STATUS_CHANGED:
                    segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
                    break;
            }
            return segmentZKMetadata;
        }
    }

    @BeforeClass
    public void setUp() {
        System.out.println("Using random seed: " + RANDOM_SEED);
    }

    @AfterClass
    public void tearDown() throws IOException {
        FileUtils.deleteDirectory(TEMP_DIR);
    }

    private SegmentMetadataImpl mockSegmentMetadata() {
        SegmentMetadataImpl segmentMetadataImpl = (SegmentMetadataImpl) Mockito.mock(SegmentMetadataImpl.class);
        Mockito.when(segmentMetadataImpl.getTimeInterval()).thenReturn(INTERVAL);
        Mockito.when(segmentMetadataImpl.getCrc()).thenReturn(CRC);
        Mockito.when(segmentMetadataImpl.getVersion()).thenReturn(SEGMENT_VERSION);
        Mockito.when(Integer.valueOf(segmentMetadataImpl.getTotalDocs())).thenReturn(Integer.valueOf(NUM_DOCS));
        return segmentMetadataImpl;
    }

    @Test
    public void testSetUpNewTable() {
        testSetUpNewTable(2, 1, 4, true);
        testSetUpNewTable(2, 3, 0, false);
        testSetUpNewTable(2, 3, 4, false);
        testSetUpNewTable(2, 3, 8, false);
        testSetUpNewTable(8, 10, 4, false);
    }

    private void testSetUpNewTable(int i, int i2, int i3, boolean z) {
        FakePinotLLCRealtimeSegmentManager fakePinotLLCRealtimeSegmentManager = new FakePinotLLCRealtimeSegmentManager();
        fakePinotLLCRealtimeSegmentManager._numReplicas = i;
        fakePinotLLCRealtimeSegmentManager.makeTableConfig();
        fakePinotLLCRealtimeSegmentManager._numInstances = i2;
        fakePinotLLCRealtimeSegmentManager.makeConsumingInstancePartitions();
        fakePinotLLCRealtimeSegmentManager._numPartitions = i3;
        try {
            fakePinotLLCRealtimeSegmentManager.setUpNewTable();
            Assert.assertFalse(z);
            Map mapFields = fakePinotLLCRealtimeSegmentManager._idealState.getRecord().getMapFields();
            Assert.assertEquals(mapFields.size(), i3);
            Assert.assertEquals(fakePinotLLCRealtimeSegmentManager.getAllSegments(REALTIME_TABLE_NAME).size(), i3);
            for (int i4 = 0; i4 < i3; i4++) {
                String segmentName = new LLCSegmentName(RAW_TABLE_NAME, i4, 0, CURRENT_TIME_MS).getSegmentName();
                Map map = (Map) mapFields.get(segmentName);
                Assert.assertNotNull(map);
                Assert.assertEquals(map.size(), i);
                Iterator it = map.values().iterator();
                while (it.hasNext()) {
                    Assert.assertEquals((String) it.next(), "CONSUMING");
                }
                LLCRealtimeSegmentZKMetadata segmentZKMetadata = fakePinotLLCRealtimeSegmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentName, null);
                Assert.assertEquals(segmentZKMetadata.getStatus(), CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
                Assert.assertEquals(new LongMsgOffset(segmentZKMetadata.getStartOffset()).compareTo(PARTITION_OFFSET), 0);
                Assert.assertEquals(segmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
            }
        } catch (IllegalStateException e) {
            Assert.assertTrue(z);
        }
    }

    private void setUpNewTable(FakePinotLLCRealtimeSegmentManager fakePinotLLCRealtimeSegmentManager, int i, int i2, int i3) {
        fakePinotLLCRealtimeSegmentManager._numReplicas = i;
        fakePinotLLCRealtimeSegmentManager.makeTableConfig();
        fakePinotLLCRealtimeSegmentManager._numInstances = i2;
        fakePinotLLCRealtimeSegmentManager.makeConsumingInstancePartitions();
        fakePinotLLCRealtimeSegmentManager._numPartitions = i3;
        fakePinotLLCRealtimeSegmentManager.setUpNewTable();
    }

    @Test
    public void testCommitSegment() {
        FakePinotLLCRealtimeSegmentManager fakePinotLLCRealtimeSegmentManager = new FakePinotLLCRealtimeSegmentManager();
        setUpNewTable(fakePinotLLCRealtimeSegmentManager, 2, 5, 4);
        Map mapFields = fakePinotLLCRealtimeSegmentManager._idealState.getRecord().getMapFields();
        String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
        CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(segmentName, new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L);
        committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
        fakePinotLLCRealtimeSegmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
        Map map = (Map) mapFields.get(segmentName);
        Assert.assertNotNull(map);
        Assert.assertEquals(new HashSet(map.values()), Collections.singleton("ONLINE"));
        String segmentName2 = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
        Map map2 = (Map) mapFields.get(segmentName2);
        Assert.assertNotNull(map2);
        Assert.assertEquals(new HashSet(map2.values()), Collections.singleton("CONSUMING"));
        LLCRealtimeSegmentZKMetadata lLCRealtimeSegmentZKMetadata = fakePinotLLCRealtimeSegmentManager._segmentZKMetadataMap.get(segmentName);
        Assert.assertEquals(lLCRealtimeSegmentZKMetadata.getStatus(), CommonConstants.Segment.Realtime.Status.DONE);
        Assert.assertEquals(lLCRealtimeSegmentZKMetadata.getStartOffset(), PARTITION_OFFSET.toString());
        Assert.assertEquals(lLCRealtimeSegmentZKMetadata.getEndOffset(), new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString());
        Assert.assertEquals(lLCRealtimeSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
        Assert.assertEquals(lLCRealtimeSegmentZKMetadata.getTimeInterval(), INTERVAL);
        Assert.assertEquals(lLCRealtimeSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
        Assert.assertEquals(lLCRealtimeSegmentZKMetadata.getIndexVersion(), SEGMENT_VERSION);
        Assert.assertEquals(lLCRealtimeSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
        LLCRealtimeSegmentZKMetadata lLCRealtimeSegmentZKMetadata2 = fakePinotLLCRealtimeSegmentManager._segmentZKMetadataMap.get(segmentName2);
        Assert.assertEquals(lLCRealtimeSegmentZKMetadata2.getStatus(), CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
        Assert.assertEquals(lLCRealtimeSegmentZKMetadata2.getStartOffset(), new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString());
        Assert.assertEquals(lLCRealtimeSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
        ((Map.Entry) map2.entrySet().iterator().next()).setValue("OFFLINE");
        CommittingSegmentDescriptor committingSegmentDescriptor2 = new CommittingSegmentDescriptor(segmentName2, new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS + NUM_DOCS).toString(), 0L);
        committingSegmentDescriptor2.setSegmentMetadata(mockSegmentMetadata());
        fakePinotLLCRealtimeSegmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor2);
        Map map3 = (Map) mapFields.get(segmentName2);
        Assert.assertNotNull(map3);
        Assert.assertEquals(new HashSet(map3.values()), Collections.singleton("ONLINE"));
        Map map4 = (Map) mapFields.get(new LLCSegmentName(RAW_TABLE_NAME, 0, 2, CURRENT_TIME_MS).getSegmentName());
        Assert.assertNotNull(map4);
        Assert.assertEquals(new HashSet(map4.values()), Collections.singleton("CONSUMING"));
        try {
            fakePinotLLCRealtimeSegmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor2);
            Assert.fail();
        } catch (IllegalStateException e) {
        }
    }

    @Test
    public void testSetUpNewPartitions() {
        FakePinotLLCRealtimeSegmentManager fakePinotLLCRealtimeSegmentManager = new FakePinotLLCRealtimeSegmentManager();
        setUpNewTable(fakePinotLLCRealtimeSegmentManager, 2, 5, 0);
        testSetUpNewPartitions(fakePinotLLCRealtimeSegmentManager, false);
        fakePinotLLCRealtimeSegmentManager._numPartitions = 2;
        testSetUpNewPartitions(fakePinotLLCRealtimeSegmentManager, false);
        fakePinotLLCRealtimeSegmentManager._numPartitions = 4;
        testSetUpNewPartitions(fakePinotLLCRealtimeSegmentManager, false);
        for (int i = 0; i < 2; i++) {
            CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(new LLCSegmentName(RAW_TABLE_NAME, i, 0, CURRENT_TIME_MS).getSegmentName(), new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L);
            committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
            fakePinotLLCRealtimeSegmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
        }
        testSetUpNewPartitions(fakePinotLLCRealtimeSegmentManager, false);
        fakePinotLLCRealtimeSegmentManager._numPartitions = 6;
        testSetUpNewPartitions(fakePinotLLCRealtimeSegmentManager, false);
        testSetUpNewPartitions(fakePinotLLCRealtimeSegmentManager, false);
        fakePinotLLCRealtimeSegmentManager._numInstances = 1;
        fakePinotLLCRealtimeSegmentManager.makeConsumingInstancePartitions();
        testSetUpNewPartitions(fakePinotLLCRealtimeSegmentManager, false);
        fakePinotLLCRealtimeSegmentManager._numPartitions = 8;
        testSetUpNewPartitions(fakePinotLLCRealtimeSegmentManager, true);
        testSetUpNewPartitions(fakePinotLLCRealtimeSegmentManager, true);
        fakePinotLLCRealtimeSegmentManager._numInstances = 5;
        fakePinotLLCRealtimeSegmentManager.makeConsumingInstancePartitions();
        fakePinotLLCRealtimeSegmentManager._exceededMaxSegmentCompletionTime = true;
        testSetUpNewPartitions(fakePinotLLCRealtimeSegmentManager, false);
    }

    private void testSetUpNewPartitions(FakePinotLLCRealtimeSegmentManager fakePinotLLCRealtimeSegmentManager, boolean z) {
        Map<String, Map<String, String>> mapFields = fakePinotLLCRealtimeSegmentManager._idealState.getRecord().getMapFields();
        Map<String, Map<String, String>> cloneInstanceStatesMap = cloneInstanceStatesMap(mapFields);
        Map<String, LLCRealtimeSegmentZKMetadata> map = fakePinotLLCRealtimeSegmentManager._segmentZKMetadataMap;
        Map<String, LLCRealtimeSegmentZKMetadata> cloneSegmentZKMetadataMap = cloneSegmentZKMetadataMap(map);
        try {
            fakePinotLLCRealtimeSegmentManager.ensureAllPartitionsConsuming();
            int i = 0;
            for (Map.Entry<String, Map<String, String>> entry : cloneInstanceStatesMap.entrySet()) {
                String key = entry.getKey();
                Assert.assertTrue(mapFields.containsKey(key));
                Assert.assertEquals(mapFields.get(key), entry.getValue());
                Assert.assertTrue(cloneSegmentZKMetadataMap.containsKey(key));
                Assert.assertTrue(map.containsKey(key));
                Assert.assertEquals(map.get(key), cloneSegmentZKMetadataMap.get(key));
                i = Math.max(i, new LLCSegmentName(key).getPartitionId() + 1);
            }
            HashMap hashMap = new HashMap();
            Iterator<Map.Entry<String, Map<String, String>>> it = mapFields.entrySet().iterator();
            while (it.hasNext()) {
                String key2 = it.next().getKey();
                ((List) hashMap.computeIfAbsent(Integer.valueOf(new LLCSegmentName(key2).getPartitionId()), num -> {
                    return new ArrayList();
                })).add(key2);
            }
            for (int i2 = i; i2 < fakePinotLLCRealtimeSegmentManager._numPartitions; i2++) {
                List list = (List) hashMap.get(Integer.valueOf(i2));
                Assert.assertEquals(list.size(), 1);
                String str = (String) list.get(0);
                Assert.assertFalse(cloneInstanceStatesMap.containsKey(str));
                Map<String, String> map2 = mapFields.get(str);
                Assert.assertEquals(map2.size(), fakePinotLLCRealtimeSegmentManager._numReplicas);
                Iterator<String> it2 = map2.values().iterator();
                while (it2.hasNext()) {
                    Assert.assertEquals(it2.next(), "CONSUMING");
                }
                Assert.assertTrue(map.containsKey(str));
                LLCRealtimeSegmentZKMetadata lLCRealtimeSegmentZKMetadata = map.get(str);
                Assert.assertEquals(lLCRealtimeSegmentZKMetadata.getStatus(), CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
                Assert.assertEquals(lLCRealtimeSegmentZKMetadata.getStartOffset(), PARTITION_OFFSET.toString());
                Assert.assertEquals(lLCRealtimeSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
            }
        } catch (IllegalStateException e) {
            Assert.assertTrue(z);
            fakePinotLLCRealtimeSegmentManager._idealState.getRecord().setMapFields(cloneInstanceStatesMap);
        }
    }

    private Map<String, Map<String, String>> cloneInstanceStatesMap(Map<String, Map<String, String>> map) {
        TreeMap treeMap = new TreeMap();
        for (Map.Entry<String, Map<String, String>> entry : map.entrySet()) {
            treeMap.put(entry.getKey(), new TreeMap(entry.getValue()));
        }
        return treeMap;
    }

    private Map<String, LLCRealtimeSegmentZKMetadata> cloneSegmentZKMetadataMap(Map<String, LLCRealtimeSegmentZKMetadata> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, LLCRealtimeSegmentZKMetadata> entry : map.entrySet()) {
            hashMap.put(entry.getKey(), new LLCRealtimeSegmentZKMetadata(entry.getValue().toZNRecord()));
        }
        return hashMap;
    }

    @Test
    public void testRepairs() {
        FakePinotLLCRealtimeSegmentManager fakePinotLLCRealtimeSegmentManager = new FakePinotLLCRealtimeSegmentManager();
        setUpNewTable(fakePinotLLCRealtimeSegmentManager, 2, 5, 4);
        Map<String, Map<String, String>> mapFields = fakePinotLLCRealtimeSegmentManager._idealState.getRecord().getMapFields();
        String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
        removeNewConsumingSegment(mapFields, segmentName, null);
        testRepairs(fakePinotLLCRealtimeSegmentManager);
        removeNewConsumingSegment(mapFields, segmentName, null);
        Assert.assertNotNull(fakePinotLLCRealtimeSegmentManager._segmentZKMetadataMap.remove(segmentName));
        testRepairs(fakePinotLLCRealtimeSegmentManager);
        for (int i = 0; i < 2; i++) {
            CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(new LLCSegmentName(RAW_TABLE_NAME, i, 0, CURRENT_TIME_MS).getSegmentName(), new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L);
            committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
            fakePinotLLCRealtimeSegmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
        }
        String segmentName2 = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
        String segmentName3 = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
        removeNewConsumingSegment(mapFields, segmentName2, segmentName3);
        testRepairs(fakePinotLLCRealtimeSegmentManager);
        removeNewConsumingSegment(mapFields, segmentName2, segmentName3);
        Assert.assertNotNull(fakePinotLLCRealtimeSegmentManager._segmentZKMetadataMap.remove(segmentName2));
        testRepairs(fakePinotLLCRealtimeSegmentManager);
        FakePinotLLCRealtimeSegmentManager fakePinotLLCRealtimeSegmentManager2 = new FakePinotLLCRealtimeSegmentManager();
        setUpNewTable(fakePinotLLCRealtimeSegmentManager2, 2, 5, 4);
        Map<String, Map<String, String>> mapFields2 = fakePinotLLCRealtimeSegmentManager2._idealState.getRecord().getMapFields();
        turnNewConsumingSegmentOffline(mapFields2, new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName());
        testRepairs(fakePinotLLCRealtimeSegmentManager2);
        turnNewConsumingSegmentOffline(mapFields2, new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName());
        testRepairs(fakePinotLLCRealtimeSegmentManager2);
        int i2 = 0;
        while (i2 < 2) {
            CommittingSegmentDescriptor committingSegmentDescriptor2 = new CommittingSegmentDescriptor(new LLCSegmentName(RAW_TABLE_NAME, i2, i2 == 0 ? 2 : 0, CURRENT_TIME_MS).getSegmentName(), new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L);
            committingSegmentDescriptor2.setSegmentMetadata(mockSegmentMetadata());
            fakePinotLLCRealtimeSegmentManager2.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor2);
            i2++;
        }
        String segmentName4 = new LLCSegmentName(RAW_TABLE_NAME, 0, 3, CURRENT_TIME_MS).getSegmentName();
        String segmentName5 = new LLCSegmentName(RAW_TABLE_NAME, 0, 2, CURRENT_TIME_MS).getSegmentName();
        removeNewConsumingSegment(mapFields2, segmentName4, segmentName5);
        testRepairs(fakePinotLLCRealtimeSegmentManager2);
        removeNewConsumingSegment(mapFields2, segmentName4, segmentName5);
        Assert.assertNotNull(fakePinotLLCRealtimeSegmentManager2._segmentZKMetadataMap.remove(segmentName4));
        testRepairs(fakePinotLLCRealtimeSegmentManager2);
        turnNewConsumingSegmentOffline(mapFields2, new LLCSegmentName(RAW_TABLE_NAME, 0, 3, CURRENT_TIME_MS).getSegmentName());
        testRepairs(fakePinotLLCRealtimeSegmentManager2);
        turnNewConsumingSegmentOffline(mapFields2, new LLCSegmentName(RAW_TABLE_NAME, 0, 4, CURRENT_TIME_MS).getSegmentName());
        testRepairs(fakePinotLLCRealtimeSegmentManager2);
    }

    private void removeNewConsumingSegment(Map<String, Map<String, String>> map, String str, @Nullable String str2) {
        Map<String, String> remove = map.remove(str);
        Assert.assertNotNull(remove);
        Assert.assertEquals(new HashSet(remove.values()), Collections.singleton("CONSUMING"));
        if (str2 != null) {
            Map<String, String> map2 = map.get(str2);
            Assert.assertNotNull(map2);
            for (Map.Entry<String, String> entry : map2.entrySet()) {
                Assert.assertEquals(entry.getValue(), "ONLINE");
                entry.setValue("CONSUMING");
            }
        }
    }

    private void turnNewConsumingSegmentOffline(Map<String, Map<String, String>> map, String str) {
        Map<String, String> map2 = map.get(str);
        Assert.assertNotNull(map2);
        for (Map.Entry<String, String> entry : map2.entrySet()) {
            Assert.assertEquals(entry.getValue(), "CONSUMING");
            entry.setValue("OFFLINE");
        }
    }

    private void testRepairs(FakePinotLLCRealtimeSegmentManager fakePinotLLCRealtimeSegmentManager) {
        Map<String, Map<String, String>> cloneInstanceStatesMap = cloneInstanceStatesMap(fakePinotLLCRealtimeSegmentManager._idealState.getRecord().getMapFields());
        fakePinotLLCRealtimeSegmentManager._exceededMaxSegmentCompletionTime = false;
        fakePinotLLCRealtimeSegmentManager.ensureAllPartitionsConsuming();
        verifyNoChangeToOldEntries(fakePinotLLCRealtimeSegmentManager, cloneInstanceStatesMap);
        fakePinotLLCRealtimeSegmentManager._exceededMaxSegmentCompletionTime = true;
        fakePinotLLCRealtimeSegmentManager.ensureAllPartitionsConsuming();
        verifyRepairs(fakePinotLLCRealtimeSegmentManager);
    }

    private void verifyNoChangeToOldEntries(FakePinotLLCRealtimeSegmentManager fakePinotLLCRealtimeSegmentManager, Map<String, Map<String, String>> map) {
        Map mapFields = fakePinotLLCRealtimeSegmentManager._idealState.getRecord().getMapFields();
        for (Map.Entry<String, Map<String, String>> entry : map.entrySet()) {
            String key = entry.getKey();
            Assert.assertTrue(mapFields.containsKey(key));
            Assert.assertEquals(mapFields.get(key), entry.getValue());
        }
    }

    private void verifyRepairs(FakePinotLLCRealtimeSegmentManager fakePinotLLCRealtimeSegmentManager) {
        Map mapFields = fakePinotLLCRealtimeSegmentManager._idealState.getRecord().getMapFields();
        Assert.assertEquals(mapFields.keySet(), fakePinotLLCRealtimeSegmentManager._segmentZKMetadataMap.keySet());
        ArrayList arrayList = new ArrayList(fakePinotLLCRealtimeSegmentManager._numPartitions);
        for (int i = 0; i < fakePinotLLCRealtimeSegmentManager._numPartitions; i++) {
            arrayList.add(new TreeMap());
        }
        for (Map.Entry entry : mapFields.entrySet()) {
            String str = (String) entry.getKey();
            Map map = (Map) entry.getValue();
            if (map.containsValue("ONLINE") || map.containsValue("CONSUMING")) {
                LLCSegmentName lLCSegmentName = new LLCSegmentName(str);
                Map map2 = (Map) arrayList.get(lLCSegmentName.getPartitionId());
                int sequenceNumber = lLCSegmentName.getSequenceNumber();
                Assert.assertFalse(map2.containsKey(Integer.valueOf(sequenceNumber)));
                map2.put(Integer.valueOf(sequenceNumber), str);
            }
        }
        for (int i2 = 0; i2 < fakePinotLLCRealtimeSegmentManager._numPartitions; i2++) {
            ArrayList arrayList2 = new ArrayList(((Map) arrayList.get(i2)).values());
            Assert.assertFalse(arrayList2.isEmpty());
            int size = arrayList2.size();
            String str2 = (String) arrayList2.get(size - 1);
            Map map3 = (Map) mapFields.get(str2);
            Assert.assertTrue(map3.containsValue("CONSUMING"));
            Assert.assertFalse(map3.containsValue("ONLINE"));
            Assert.assertEquals(fakePinotLLCRealtimeSegmentManager._segmentZKMetadataMap.get(str2).getStatus(), CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
            for (int i3 = 0; i3 < size - 1; i3++) {
                String str3 = (String) arrayList2.get(i3);
                Assert.assertEquals(new HashSet(((Map) mapFields.get(str3)).values()), Collections.singleton("ONLINE"));
                LLCRealtimeSegmentZKMetadata lLCRealtimeSegmentZKMetadata = fakePinotLLCRealtimeSegmentManager._segmentZKMetadataMap.get(str3);
                Assert.assertEquals(lLCRealtimeSegmentZKMetadata.getStatus(), CommonConstants.Segment.Realtime.Status.DONE);
                Assert.assertEquals(lLCRealtimeSegmentZKMetadata.getStartOffset(), new LongMsgOffset(PARTITION_OFFSET.getOffset() + (i3 * NUM_DOCS)).toString());
                Assert.assertEquals(lLCRealtimeSegmentZKMetadata.getEndOffset(), fakePinotLLCRealtimeSegmentManager._segmentZKMetadataMap.get(arrayList2.get(i3 + 1)).getStartOffset());
            }
        }
    }

    @Test(expectedExceptions = {IllegalStateException.class})
    public void testPreExistingSegments() {
        FakePinotLLCRealtimeSegmentManager fakePinotLLCRealtimeSegmentManager = new FakePinotLLCRealtimeSegmentManager();
        fakePinotLLCRealtimeSegmentManager._numReplicas = 2;
        fakePinotLLCRealtimeSegmentManager.makeTableConfig();
        fakePinotLLCRealtimeSegmentManager._numInstances = 5;
        fakePinotLLCRealtimeSegmentManager.makeConsumingInstancePartitions();
        fakePinotLLCRealtimeSegmentManager._numPartitions = 4;
        fakePinotLLCRealtimeSegmentManager._segmentZKMetadataMap.put(new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName(), new LLCRealtimeSegmentZKMetadata());
        fakePinotLLCRealtimeSegmentManager.setUpNewTable();
    }

    @Test
    public void testCommitSegmentWhenControllerWentThroughGC() {
        FakePinotLLCRealtimeSegmentManagerII fakePinotLLCRealtimeSegmentManagerII = new FakePinotLLCRealtimeSegmentManagerII(FakePinotLLCRealtimeSegmentManagerII.Scenario.ZK_VERSION_CHANGED);
        setUpNewTable(fakePinotLLCRealtimeSegmentManagerII, 2, 5, 4);
        FakePinotLLCRealtimeSegmentManagerII fakePinotLLCRealtimeSegmentManagerII2 = new FakePinotLLCRealtimeSegmentManagerII(FakePinotLLCRealtimeSegmentManagerII.Scenario.METADATA_STATUS_CHANGED);
        setUpNewTable(fakePinotLLCRealtimeSegmentManagerII2, 2, 5, 4);
        CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName(), new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L);
        committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
        try {
            fakePinotLLCRealtimeSegmentManagerII.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
            Assert.fail();
        } catch (IllegalStateException e) {
        }
        try {
            fakePinotLLCRealtimeSegmentManagerII2.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
            Assert.fail();
        } catch (IllegalStateException e2) {
        }
    }

    @Test
    public void testCommitSegmentFile() throws Exception {
        PinotFSFactory.init(new PinotConfiguration());
        File file = new File(TEMP_DIR, RAW_TABLE_NAME);
        String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
        String generateSegmentFileName = SegmentCompletionUtils.generateSegmentFileName(segmentName);
        File file2 = new File(file, generateSegmentFileName);
        FileUtils.write(file2, "temporary file contents");
        FakePinotLLCRealtimeSegmentManager fakePinotLLCRealtimeSegmentManager = new FakePinotLLCRealtimeSegmentManager();
        CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(segmentName, PARTITION_OFFSET.toString(), 0L, SCHEME + file + "/" + generateSegmentFileName);
        fakePinotLLCRealtimeSegmentManager.commitSegmentFile(REALTIME_TABLE_NAME, committingSegmentDescriptor);
        Assert.assertEquals(committingSegmentDescriptor.getSegmentLocation(), URIUtils.getUri(file.toString(), new String[]{URIUtils.encode(segmentName)}).toString());
        Assert.assertFalse(file2.exists());
    }

    @Test
    public void testSegmentAlreadyThereAndExtraneousFilesDeleted() throws Exception {
        PinotFSFactory.init(new PinotConfiguration());
        File file = new File(TEMP_DIR, RAW_TABLE_NAME);
        String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
        String segmentName2 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, CURRENT_TIME_MS).getSegmentName();
        String generateSegmentFileName = SegmentCompletionUtils.generateSegmentFileName(segmentName);
        String generateSegmentFileName2 = SegmentCompletionUtils.generateSegmentFileName(segmentName);
        String generateSegmentFileName3 = SegmentCompletionUtils.generateSegmentFileName(segmentName2);
        File file2 = new File(file, generateSegmentFileName);
        File file3 = new File(file, generateSegmentFileName2);
        File file4 = new File(file, generateSegmentFileName3);
        FileUtils.write(file2, "temporary file contents");
        FileUtils.write(file3, "temporary file contents");
        FileUtils.write(file4, "temporary file contents");
        FakePinotLLCRealtimeSegmentManager fakePinotLLCRealtimeSegmentManager = new FakePinotLLCRealtimeSegmentManager();
        CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(segmentName, PARTITION_OFFSET.toString(), 0L, SCHEME + file + "/" + generateSegmentFileName);
        fakePinotLLCRealtimeSegmentManager.commitSegmentFile(REALTIME_TABLE_NAME, committingSegmentDescriptor);
        Assert.assertEquals(committingSegmentDescriptor.getSegmentLocation(), URIUtils.getUri(file.toString(), new String[]{URIUtils.encode(segmentName)}).toString());
        Assert.assertFalse(file2.exists());
        Assert.assertFalse(file3.exists());
        Assert.assertTrue(file4.exists());
    }

    @Test
    public void testStopSegmentManager() throws Exception {
        FakePinotLLCRealtimeSegmentManager fakePinotLLCRealtimeSegmentManager = new FakePinotLLCRealtimeSegmentManager();
        fakePinotLLCRealtimeSegmentManager._numReplicas = 2;
        fakePinotLLCRealtimeSegmentManager.makeTableConfig();
        fakePinotLLCRealtimeSegmentManager._numInstances = 5;
        fakePinotLLCRealtimeSegmentManager.makeConsumingInstancePartitions();
        fakePinotLLCRealtimeSegmentManager._numPartitions = 4;
        fakePinotLLCRealtimeSegmentManager.stop();
        try {
            fakePinotLLCRealtimeSegmentManager.setUpNewTable(fakePinotLLCRealtimeSegmentManager._tableConfig, new IdealState(REALTIME_TABLE_NAME));
            Assert.fail();
        } catch (IllegalStateException e) {
        }
        try {
            fakePinotLLCRealtimeSegmentManager.removeLLCSegments(new IdealState(REALTIME_TABLE_NAME));
            Assert.fail();
        } catch (IllegalStateException e2) {
        }
        try {
            fakePinotLLCRealtimeSegmentManager.commitSegmentFile(REALTIME_TABLE_NAME, (CommittingSegmentDescriptor) Mockito.mock(CommittingSegmentDescriptor.class));
            Assert.fail();
        } catch (IllegalStateException e3) {
        }
        try {
            fakePinotLLCRealtimeSegmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, (CommittingSegmentDescriptor) Mockito.mock(CommittingSegmentDescriptor.class));
            Assert.fail();
        } catch (IllegalStateException e4) {
        }
        try {
            fakePinotLLCRealtimeSegmentManager.segmentStoppedConsuming(new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS), "Server_0");
            Assert.fail();
        } catch (IllegalStateException e5) {
        }
        try {
            fakePinotLLCRealtimeSegmentManager.ensureAllPartitionsConsuming(fakePinotLLCRealtimeSegmentManager._tableConfig, fakePinotLLCRealtimeSegmentManager._streamConfig);
            Assert.fail();
        } catch (IllegalStateException e6) {
        }
    }

    @Test
    public void testCommitSegmentMetadata() {
        FakePinotLLCRealtimeSegmentManager fakePinotLLCRealtimeSegmentManager = new FakePinotLLCRealtimeSegmentManager();
        setUpNewTable(fakePinotLLCRealtimeSegmentManager, 2, 5, 4);
        String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
        CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(segmentName, new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "http://control_vip/segments/segment1");
        committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
        fakePinotLLCRealtimeSegmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
        Assert.assertEquals(fakePinotLLCRealtimeSegmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentName, null).getDownloadUrl(), "http://control_vip/segments/segment1");
        String segmentName2 = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
        CommittingSegmentDescriptor committingSegmentDescriptor2 = new CommittingSegmentDescriptor(segmentName2, new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, "peer:///segment1");
        committingSegmentDescriptor2.setSegmentMetadata(mockSegmentMetadata());
        fakePinotLLCRealtimeSegmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor2);
        Assert.assertEquals(fakePinotLLCRealtimeSegmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentName2, null).getDownloadUrl(), "");
    }

    static {
        SEGMENT_VERSION = RANDOM.nextBoolean() ? SegmentVersion.v1.toString() : SegmentVersion.v3.toString();
        NUM_DOCS = RANDOM.nextInt(Integer.MAX_VALUE) + 1;
    }
}
