package org.apache.pinot.controller.helix.core.retention;

import com.yammer.metrics.core.MetricsRegistry;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.lineage.LineageEntry;
import org.apache.pinot.common.lineage.LineageEntryState;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.tenant.TenantRole;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.class */
/**
 * Verifies how {@link RetentionManager#processTable} treats segments that are referenced by the
 * table's {@link SegmentLineage}.
 *
 * <p>The test starts ZooKeeper and a controller, registers one fake broker and one fake server,
 * creates an OFFLINE table with a 1-day retention, and then drives the retention manager by hand
 * while rewriting the lineage stored in the property store.
 *
 * <p>NOTE(review): this class starts ZooKeeper and a controller in {@link #setUp()} but never stops
 * them; the matching stop methods of {@code ControllerTest} are not visible here - consider adding
 * an {@code @AfterClass} teardown.
 */
public class SegmentLineageCleanupTest extends ControllerTest {
    private static final int BASE_SERVER_ADMIN_PORT = 10000;
    private static final int NUM_INSTANCES = 1;
    private static final long MAX_TIMEOUT_IN_MILLISECOND = 10000;
    private static final long POLL_INTERVAL_IN_MILLISECOND = 500L;
    private static final String OFFLINE_TABLE_NAME = "testTable_OFFLINE";
    private static final String BROKER_TENANT_NAME = "brokerTenant";
    private static final String SERVER_TENANT_NAME = "serverTenant";
    private static final String RETENTION_TIME_UNIT = "DAYS";
    private static final String RETENTION_TIME_VALUE = "1";

    /** Number of "segment_N" segments uploaded by the test. */
    private static final int NUM_SOURCE_SEGMENTS = 5;
    /** Number of "merged_N" segments uploaded by the test. */
    private static final int NUM_MERGED_SEGMENTS = 2;
    /** Lineage entry ids; kept separate from unrelated constants that happen to share the value. */
    private static final String FIRST_LINEAGE_ENTRY_ID = "0";
    private static final String SECOND_LINEAGE_ENTRY_ID = "1";

    private RetentionManager _retentionManager;

    /**
     * Brings up ZooKeeper, the controller, one fake broker and one fake server, creates the tenants
     * and the OFFLINE test table, and builds the {@link RetentionManager} under test.
     *
     * @throws Exception if any part of the cluster bootstrap fails
     */
    @BeforeClass
    public void setUp() throws Exception {
        startZk();
        Map<String, Object> controllerProperties = getDefaultControllerConfiguration();
        // Tenant isolation is switched off so that the tenants can be created explicitly below.
        controllerProperties.put("cluster.tenant.isolation.enable", false);
        startController(controllerProperties);

        addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, false);
        addFakeServerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, false, BASE_SERVER_ADMIN_PORT);

        _helixResourceManager.createServerTenant(
                new Tenant(TenantRole.SERVER, SERVER_TENANT_NAME, NUM_INSTANCES, NUM_INSTANCES, 0));
        Assert.assertTrue(_helixResourceManager
                .createBrokerTenant(new Tenant(TenantRole.BROKER, BROKER_TENANT_NAME, NUM_INSTANCES, 0, 0))
                .isSuccessful());
        enableResourceConfigForLeadControllerResource(true);

        // The mocked lead-controller manager reports this controller as leader for every table.
        LeadControllerManager leadControllerManager = Mockito.mock(LeadControllerManager.class);
        Mockito.when(leadControllerManager.isLeaderForTable(Mockito.anyString())).thenReturn(true);

        ControllerConf controllerConf = new ControllerConf();
        controllerConf.setRetentionControllerFrequencyInSeconds(0);
        controllerConf.setDeletedSegmentsRetentionInDays(0);
        ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
        _retentionManager =
                new RetentionManager(_helixResourceManager, leadControllerManager, controllerConf, controllerMetrics);

        _helixResourceManager.addTable(new TableConfigBuilder(TableType.OFFLINE)
                .setTableName(OFFLINE_TABLE_NAME)
                .setBrokerTenant(BROKER_TENANT_NAME)
                .setServerTenant(SERVER_TENANT_NAME)
                .setNumReplicas(NUM_INSTANCES)
                .setRetentionTimeUnit(RETENTION_TIME_UNIT)
                .setRetentionTimeValue(RETENTION_TIME_VALUE)
                .build());
    }

    /**
     * Walks the lineage through four stages and checks the segment list and the stored lineage
     * after each retention run:
     * <ol>
     *   <li>IN_PROGRESS entry with a current timestamp: all 7 segments are expected to remain.</li>
     *   <li>Same entry marked COMPLETED: "segment_0" and "segment_1" are expected to be removed.</li>
     *   <li>"merged_0" deleted manually: the lineage is expected to end up with no entries.</li>
     *   <li>IN_PROGRESS entry dated two days back: "merged_1" is expected to be removed while the
     *       lineage still holds that single entry.</li>
     * </ol>
     */
    @Test
    public void testSegmentLineageCleanup() throws IOException, InterruptedException {
        // Upload segment_0..segment_4 followed by merged_0..merged_1.
        for (int i = 0; i < NUM_SOURCE_SEGMENTS; i++) {
            _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
                    SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "segment_" + i), "downloadUrl");
        }
        for (int i = 0; i < NUM_MERGED_SEGMENTS; i++) {
            _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
                    SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "merged_" + i), "downloadUrl");
        }
        Assert.assertEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME).size(), 7);

        // Stage 1: a fresh IN_PROGRESS entry; the segment count is expected to stay at 7.
        long nowMs = System.currentTimeMillis();
        SegmentLineage lineage = new SegmentLineage(OFFLINE_TABLE_NAME);
        lineage.addLineageEntry(FIRST_LINEAGE_ENTRY_ID,
                new LineageEntry(Arrays.asList("segment_0", "segment_1"), Arrays.asList("merged_0"),
                        LineageEntryState.IN_PROGRESS, nowMs));
        SegmentLineageAccessHelper.writeSegmentLineage(_helixResourceManager.getPropertyStore(), lineage, -1);
        _retentionManager.processTable(OFFLINE_TABLE_NAME);
        waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 7);
        Assert.assertEquals(_helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME).size(), 7);

        // Stage 2: the entry becomes COMPLETED; its source segments are expected to disappear.
        lineage.updateLineageEntry(FIRST_LINEAGE_ENTRY_ID,
                new LineageEntry(Arrays.asList("segment_0", "segment_1"), Arrays.asList("merged_0"),
                        LineageEntryState.COMPLETED, nowMs));
        SegmentLineageAccessHelper.writeSegmentLineage(_helixResourceManager.getPropertyStore(), lineage, -1);
        _retentionManager.processTable(OFFLINE_TABLE_NAME);
        waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 5);
        List<String> segmentsAfterCompletion = _helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME);
        Assert.assertEquals(segmentsAfterCompletion.size(), 5);
        Assert.assertTrue(Collections.disjoint(segmentsAfterCompletion, Arrays.asList("segment_0", "segment_1")));

        // Stage 3: delete the merged segment by hand; the lineage is expected to become empty.
        _helixResourceManager.deleteSegment(OFFLINE_TABLE_NAME, "merged_0");
        waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 4);
        _retentionManager.processTable(OFFLINE_TABLE_NAME);
        waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 4);
        List<String> segmentsAfterManualDelete = _helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME);
        Assert.assertEquals(segmentsAfterManualDelete.size(), 4);
        Assert.assertTrue(Collections.disjoint(segmentsAfterManualDelete,
                Arrays.asList("segment_0", "segment_1", "merged_0")));
        SegmentLineage storedLineage =
                SegmentLineageAccessHelper.getSegmentLineage(_helixResourceManager.getPropertyStore(), OFFLINE_TABLE_NAME);
        Assert.assertEquals(storedLineage.getLineageEntryIds().size(), 0);

        // Stage 4: an IN_PROGRESS entry stamped two days in the past. "merged_2" was never uploaded,
        // so only "merged_1" can actually be removed, leaving segment_2..segment_4.
        long twoDaysAgoMs = nowMs - TimeUnit.DAYS.toMillis(2L);
        storedLineage.addLineageEntry(SECOND_LINEAGE_ENTRY_ID,
                new LineageEntry(Arrays.asList("segment_2", "segment_3"), Arrays.asList("merged_1", "merged_2"),
                        LineageEntryState.IN_PROGRESS, twoDaysAgoMs));
        SegmentLineageAccessHelper.writeSegmentLineage(_helixResourceManager.getPropertyStore(), storedLineage, -1);
        _retentionManager.processTable(OFFLINE_TABLE_NAME);
        waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 3);
        List<String> segmentsAfterStaleEntry = _helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME);
        Assert.assertEquals(segmentsAfterStaleEntry.size(), 3);
        Assert.assertTrue(Collections.disjoint(segmentsAfterStaleEntry, Arrays.asList("merged_1", "merged_2")));
        Assert.assertEquals(
                SegmentLineageAccessHelper.getSegmentLineage(_helixResourceManager.getPropertyStore(), OFFLINE_TABLE_NAME)
                        .getLineageEntryIds().size(), 1);
    }

    /**
     * Polls the segment list of a table until it holds exactly the expected number of segments.
     *
     * <p>Returns immediately when the count already matches. The count is always re-read after the
     * last sleep before the timeout is raised, so a deletion that lands during the final polling
     * interval is not reported as a failure.
     *
     * @param tableNameWithType table whose segment list is polled
     * @param expectedNumSegments segment count to wait for
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     * @throws RuntimeException if the count still differs after {@link #MAX_TIMEOUT_IN_MILLISECOND}
     */
    private void waitForSegmentsToDelete(String tableNameWithType, int expectedNumSegments)
            throws InterruptedException {
        long deadlineMs = System.currentTimeMillis() + MAX_TIMEOUT_IN_MILLISECOND;
        while (_helixResourceManager.getSegmentsFor(tableNameWithType).size() != expectedNumSegments) {
            if (System.currentTimeMillis() >= deadlineMs) {
                throw new RuntimeException("Timeout while waiting for segments to be deleted");
            }
            Thread.sleep(POLL_INTERVAL_IN_MILLISECOND);
        }
    }
}
