package io.trino.server;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.MoreExecutors;
import io.trino.Session;
import io.trino.cost.StatsAndCosts;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.metadata.Metadata;
import io.trino.metadata.MetadataManager;
import io.trino.operator.StageExecutionDescriptor;
import io.trino.server.DynamicFilterService;
import io.trino.spi.QueryId;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.TestingColumnHandle;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.TypeOperators;
import io.trino.spi.type.VarcharType;
import io.trino.sql.DynamicFilters;
import io.trino.sql.analyzer.TypeSignatureTranslator;
import io.trino.sql.planner.Partitioning;
import io.trino.sql.planner.PartitioningHandle;
import io.trino.sql.planner.PartitioningScheme;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.Symbol;
import io.trino.sql.planner.SymbolAllocator;
import io.trino.sql.planner.SystemPartitioningHandle;
import io.trino.sql.planner.plan.DynamicFilterId;
import io.trino.sql.planner.plan.ExchangeNode;
import io.trino.sql.planner.plan.FilterNode;
import io.trino.sql.planner.plan.JoinNode;
import io.trino.sql.planner.plan.PlanFragmentId;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.RemoteSourceNode;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.sql.tree.Cast;
import io.trino.sql.tree.SymbolReference;
import io.trino.testing.TestingHandles;
import io.trino.testing.TestingMetadata;
import io.trino.testing.TestingSession;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Tests for {@link DynamicFilterService}.
 *
 * <p>Decompiler note: recovered from {@code io/trino/server/TestDynamicFilterService.class}.
 */
public class TestDynamicFilterService {
    // Session shared by all tests: passed to registerQuery/getDynamicFilteringStats and used to
    // render the expected domain strings.
    private static final Session session = TestingSession.testSessionBuilder().build();

    // Collaborators handed to every DynamicFilterService constructed by the tests below.
    private final Metadata metadata = MetadataManager.createTestMetadataManager();
    private final TypeOperators typeOperators = new TypeOperators();

    /**
     * Registers a single dynamic filter, reports domains from three tasks of one stage, and checks
     * that no summary is exposed after the first two reports while the third report yields a
     * summary containing all three reported values together with updated statistics.
     */
    @Test
    public void testDynamicFilterSummaryCompletion() {
        DynamicFilterService service = new DynamicFilterService(this.metadata, this.typeOperators, MoreExecutors.newDirectExecutorService());
        DynamicFilterId filterId = new DynamicFilterId("df");
        QueryId queryId = new QueryId("query");
        StageId stageId = new StageId(queryId, 0);

        service.registerQuery(queryId, session, ImmutableSet.of(filterId), ImmutableSet.of(filterId), ImmutableSet.of());
        // The task count passed here matches the number of reports after which completion is expected below.
        service.stageCannotScheduleMoreTasks(stageId, 3);

        // Nothing has been reported yet.
        Assert.assertFalse(service.getSummary(queryId, filterId).isPresent());
        DynamicFilterService.DynamicFiltersStats initialStats = service.getDynamicFilteringStats(queryId, session);
        Assert.assertEquals(initialStats.getTotalDynamicFilters(), 1);
        Assert.assertEquals(initialStats.getDynamicFiltersCompleted(), 0);
        Assert.assertEquals(initialStats.getLazyDynamicFilters(), 1);
        Assert.assertEquals(initialStats.getReplicatedDynamicFilters(), 0);

        // First task report: still incomplete.
        service.addTaskDynamicFilters(
                new TaskId(stageId, 0),
                ImmutableMap.of(filterId, Domain.singleValue(IntegerType.INTEGER, 1L)));
        Assert.assertFalse(service.getSummary(queryId, filterId).isPresent());
        Assert.assertEquals(service.getDynamicFilteringStats(queryId, session).getDynamicFiltersCompleted(), 0);

        // Second task report: still incomplete.
        service.addTaskDynamicFilters(
                new TaskId(stageId, 1),
                ImmutableMap.of(filterId, Domain.singleValue(IntegerType.INTEGER, 2L)));
        Assert.assertFalse(service.getSummary(queryId, filterId).isPresent());
        Assert.assertEquals(service.getDynamicFilteringStats(queryId, session).getDynamicFiltersCompleted(), 0);

        // Third task report: the summary becomes available and holds every reported value.
        service.addTaskDynamicFilters(
                new TaskId(stageId, 2),
                ImmutableMap.of(filterId, Domain.singleValue(IntegerType.INTEGER, 3L)));
        Optional<Domain> summary = service.getSummary(queryId, filterId);
        Assert.assertTrue(summary.isPresent());
        Assert.assertEquals(summary.get(), Domain.multipleValues(IntegerType.INTEGER, ImmutableList.of(1L, 2L, 3L)));

        DynamicFilterService.DynamicFiltersStats finalStats = service.getDynamicFilteringStats(queryId, session);
        Assert.assertEquals(finalStats.getDynamicFiltersCompleted(), 1);
        Assert.assertEquals(finalStats.getLazyDynamicFilters(), 1);
        Assert.assertEquals(finalStats.getReplicatedDynamicFilters(), 0);
        Assert.assertEquals(
                finalStats.getDynamicFilterDomainStats(),
                ImmutableList.of(new DynamicFilterService.DynamicFilterDomainStats(filterId, getExpectedDomainString(1L, 3L), 3, 0)));
    }

    /**
     * Drives three dynamic filters (two mapped to {@code probeColumnA}, one to {@code probeColumnB})
     * to completion one stage at a time. After every task report the test checks the current
     * predicate, the completion/awaitable flags, the blocking future obtained before the report,
     * and the number of completed filters in the statistics.
     */
    @Test
    public void testDynamicFilter() {
        DynamicFilterService service = new DynamicFilterService(this.metadata, this.typeOperators, MoreExecutors.newDirectExecutorService());
        DynamicFilterId filterId1 = new DynamicFilterId("df1");
        DynamicFilterId filterId2 = new DynamicFilterId("df2");
        DynamicFilterId filterId3 = new DynamicFilterId("df3");
        SymbolAllocator symbolAllocator = new SymbolAllocator();
        Symbol symbol1 = symbolAllocator.newSymbol("DF_SYMBOL1", IntegerType.INTEGER);
        Symbol symbol2 = symbolAllocator.newSymbol("DF_SYMBOL2", IntegerType.INTEGER);
        Symbol symbol3 = symbolAllocator.newSymbol("DF_SYMBOL3", IntegerType.INTEGER);
        SymbolReference reference1 = symbol1.toSymbolReference();
        SymbolReference reference2 = symbol2.toSymbolReference();
        SymbolReference reference3 = symbol3.toSymbolReference();
        QueryId queryId = new QueryId("query");
        StageId stage1 = new StageId(queryId, 1);
        StageId stage2 = new StageId(queryId, 2);
        StageId stage3 = new StageId(queryId, 3);

        service.registerQuery(
                queryId,
                session,
                ImmutableSet.of(filterId1, filterId2, filterId3),
                ImmutableSet.of(filterId1, filterId2, filterId3),
                ImmutableSet.of());
        // Each stage is given a task count of two; two reports per filter are sent below.
        service.stageCannotScheduleMoreTasks(stage1, 2);
        service.stageCannotScheduleMoreTasks(stage2, 2);
        service.stageCannotScheduleMoreTasks(stage3, 2);

        // Filter covering all three dynamic filter ids.
        DynamicFilter fullFilter = service.createDynamicFilter(
                queryId,
                ImmutableList.of(
                        new DynamicFilters.Descriptor(filterId1, reference1),
                        new DynamicFilters.Descriptor(filterId2, reference2),
                        new DynamicFilters.Descriptor(filterId3, reference3)),
                ImmutableMap.of(
                        symbol1, new TestingColumnHandle("probeColumnA"),
                        symbol2, new TestingColumnHandle("probeColumnA"),
                        symbol3, new TestingColumnHandle("probeColumnB")),
                symbolAllocator.getTypes());
        Assert.assertTrue(fullFilter.getCurrentPredicate().isAll());
        Assert.assertFalse(fullFilter.isComplete());
        Assert.assertTrue(fullFilter.isAwaitable());

        DynamicFilterService.DynamicFiltersStats initialStats = service.getDynamicFilteringStats(queryId, session);
        Assert.assertEquals(initialStats.getTotalDynamicFilters(), 3);
        Assert.assertEquals(initialStats.getDynamicFiltersCompleted(), 0);
        Assert.assertEquals(initialStats.getLazyDynamicFilters(), 3);
        Assert.assertEquals(initialStats.getReplicatedDynamicFilters(), 0);

        // ---- df1: first report leaves everything unchanged ----
        CompletableFuture<?> blockedOnFirst = fullFilter.isBlocked();
        Assert.assertFalse(blockedOnFirst.isDone());
        service.addTaskDynamicFilters(
                new TaskId(stage1, 0),
                ImmutableMap.of(filterId1, Domain.singleValue(IntegerType.INTEGER, 1L)));
        Assert.assertTrue(fullFilter.getCurrentPredicate().isAll());
        Assert.assertFalse(fullFilter.isComplete());
        Assert.assertTrue(fullFilter.isAwaitable());
        Assert.assertFalse(blockedOnFirst.isDone());

        // ---- df1: second report narrows probeColumnA to {1, 2} and releases the future ----
        service.addTaskDynamicFilters(
                new TaskId(stage1, 1),
                ImmutableMap.of(filterId1, Domain.singleValue(IntegerType.INTEGER, 2L)));
        Assert.assertEquals(
                fullFilter.getCurrentPredicate(),
                TupleDomain.withColumnDomains(ImmutableMap.of(
                        new TestingColumnHandle("probeColumnA"),
                        Domain.multipleValues(IntegerType.INTEGER, ImmutableList.of(1L, 2L)))));
        Assert.assertTrue(blockedOnFirst.isDone());
        Assert.assertFalse(blockedOnFirst.isCompletedExceptionally());
        Assert.assertEquals(service.getDynamicFilteringStats(queryId, session).getDynamicFiltersCompleted(), 1);
        Assert.assertFalse(fullFilter.isComplete());
        Assert.assertTrue(fullFilter.isAwaitable());

        // ---- df2: first report leaves the predicate at {1, 2} ----
        CompletableFuture<?> blockedOnSecond = fullFilter.isBlocked();
        Assert.assertFalse(blockedOnSecond.isDone());
        service.addTaskDynamicFilters(
                new TaskId(stage2, 0),
                ImmutableMap.of(filterId2, Domain.singleValue(IntegerType.INTEGER, 2L)));
        Assert.assertEquals(
                fullFilter.getCurrentPredicate(),
                TupleDomain.withColumnDomains(ImmutableMap.of(
                        new TestingColumnHandle("probeColumnA"),
                        Domain.multipleValues(IntegerType.INTEGER, ImmutableList.of(1L, 2L)))));
        Assert.assertFalse(fullFilter.isComplete());
        Assert.assertTrue(fullFilter.isAwaitable());
        Assert.assertFalse(blockedOnSecond.isDone());
        Assert.assertEquals(service.getDynamicFilteringStats(queryId, session).getDynamicFiltersCompleted(), 1);

        // ---- df2: second report; both filters target probeColumnA, so only the value 2 remains ----
        service.addTaskDynamicFilters(
                new TaskId(stage2, 1),
                ImmutableMap.of(filterId2, Domain.singleValue(IntegerType.INTEGER, 3L)));
        Assert.assertEquals(
                fullFilter.getCurrentPredicate(),
                TupleDomain.withColumnDomains(ImmutableMap.of(
                        new TestingColumnHandle("probeColumnA"),
                        Domain.singleValue(IntegerType.INTEGER, 2L))));
        Assert.assertTrue(blockedOnSecond.isDone());
        Assert.assertFalse(blockedOnSecond.isCompletedExceptionally());
        Assert.assertEquals(service.getDynamicFilteringStats(queryId, session).getDynamicFiltersCompleted(), 2);
        Assert.assertFalse(fullFilter.isComplete());
        Assert.assertTrue(fullFilter.isAwaitable());

        CompletableFuture<?> blockedOnThird = fullFilter.isBlocked();
        Assert.assertFalse(blockedOnThird.isDone());

        // A filter created now over df1 and df2 only is complete from the start.
        DynamicFilter partialFilter = service.createDynamicFilter(
                queryId,
                ImmutableList.of(
                        new DynamicFilters.Descriptor(filterId1, reference1),
                        new DynamicFilters.Descriptor(filterId2, reference2)),
                ImmutableMap.of(
                        symbol1, new TestingColumnHandle("probeColumnA"),
                        symbol2, new TestingColumnHandle("probeColumnA")),
                symbolAllocator.getTypes());
        Assert.assertTrue(partialFilter.isComplete());
        Assert.assertFalse(partialFilter.isAwaitable());
        Assert.assertTrue(partialFilter.isBlocked().isDone());
        Assert.assertEquals(
                partialFilter.getCurrentPredicate(),
                TupleDomain.withColumnDomains(ImmutableMap.of(
                        new TestingColumnHandle("probeColumnA"),
                        Domain.singleValue(IntegerType.INTEGER, 2L))));

        // ---- df3: first (empty) report does not change anything yet ----
        service.addTaskDynamicFilters(
                new TaskId(stage3, 0),
                ImmutableMap.of(filterId3, Domain.none(IntegerType.INTEGER)));
        Assert.assertEquals(
                fullFilter.getCurrentPredicate(),
                TupleDomain.withColumnDomains(ImmutableMap.of(
                        new TestingColumnHandle("probeColumnA"),
                        Domain.singleValue(IntegerType.INTEGER, 2L))));
        Assert.assertFalse(fullFilter.isComplete());
        Assert.assertTrue(fullFilter.isAwaitable());
        Assert.assertFalse(blockedOnThird.isDone());
        Assert.assertEquals(service.getDynamicFilteringStats(queryId, session).getDynamicFiltersCompleted(), 2);

        // ---- df3: second (empty) report turns the whole predicate into none ----
        service.addTaskDynamicFilters(
                new TaskId(stage3, 1),
                ImmutableMap.of(filterId3, Domain.none(IntegerType.INTEGER)));
        Assert.assertEquals(fullFilter.getCurrentPredicate(), TupleDomain.none());
        Assert.assertTrue(blockedOnThird.isDone());
        Assert.assertFalse(blockedOnThird.isCompletedExceptionally());

        DynamicFilterService.DynamicFiltersStats finalStats = service.getDynamicFilteringStats(queryId, session);
        Assert.assertEquals(finalStats.getDynamicFiltersCompleted(), 3);
        Assert.assertEquals(finalStats.getLazyDynamicFilters(), 3);
        Assert.assertEquals(finalStats.getReplicatedDynamicFilters(), 0);
        // Compared as sets, so the order of the per-filter stats is not asserted.
        Assert.assertEquals(
                ImmutableSet.copyOf(finalStats.getDynamicFilterDomainStats()),
                ImmutableSet.of(
                        new DynamicFilterService.DynamicFilterDomainStats(filterId1, getExpectedDomainString(1L, 2L), 2, 0),
                        new DynamicFilterService.DynamicFilterDomainStats(filterId2, getExpectedDomainString(2L, 3L), 2, 0),
                        new DynamicFilterService.DynamicFilterDomainStats(filterId3, Domain.none(IntegerType.INTEGER).toString(session.toConnectorSession()), 0, 0)));

        Assert.assertTrue(fullFilter.isComplete());
        Assert.assertFalse(fullFilter.isAwaitable());
        Assert.assertTrue(fullFilter.isBlocked().isDone());
    }

    /**
     * Checks that a single task reporting {@code Domain.all} completes the dynamic filter even
     * though the stage was given a task count of two and only one task has reported.
     */
    @Test
    public void testShortCircuitOnAllTupleDomain() {
        DynamicFilterService service = new DynamicFilterService(this.metadata, this.typeOperators, MoreExecutors.newDirectExecutorService());
        DynamicFilterId filterId = new DynamicFilterId("df1");
        SymbolAllocator symbolAllocator = new SymbolAllocator();
        Symbol probeSymbol = symbolAllocator.newSymbol("DF_SYMBOL1", IntegerType.INTEGER);
        SymbolReference probeReference = probeSymbol.toSymbolReference();
        QueryId queryId = new QueryId("query");
        StageId stageId = new StageId(queryId, 1);

        service.registerQuery(queryId, session, ImmutableSet.of(filterId), ImmutableSet.of(filterId), ImmutableSet.of());
        service.stageCannotScheduleMoreTasks(stageId, 2);

        DynamicFilter filter = service.createDynamicFilter(
                queryId,
                ImmutableList.of(new DynamicFilters.Descriptor(filterId, probeReference)),
                ImmutableMap.of(probeSymbol, new TestingColumnHandle("probeColumnA")),
                symbolAllocator.getTypes());

        // Before any report: unconstrained, incomplete and blocked.
        Assert.assertTrue(filter.getCurrentPredicate().isAll());
        Assert.assertFalse(filter.isComplete());
        Assert.assertFalse(filter.isBlocked().isDone());

        // One "all" domain is enough to finish the filter.
        service.addTaskDynamicFilters(
                new TaskId(stageId, 1),
                ImmutableMap.of(filterId, Domain.all(IntegerType.INTEGER)));
        Assert.assertTrue(filter.getCurrentPredicate().isAll());
        Assert.assertTrue(filter.isComplete());
        Assert.assertTrue(filter.isBlocked().isDone());
    }

    /**
     * Uses a descriptor whose expression casts an INTEGER symbol to BIGINT, reports a BIGINT
     * domain, and checks that the predicate exposed for the column is expressed in INTEGER.
     */
    @Test
    public void testDynamicFilterCoercion() {
        DynamicFilterService service = new DynamicFilterService(this.metadata, this.typeOperators, MoreExecutors.newDirectExecutorService());
        DynamicFilterId filterId = new DynamicFilterId("df1");
        SymbolAllocator symbolAllocator = new SymbolAllocator();
        Symbol probeSymbol = symbolAllocator.newSymbol("DF_SYMBOL1", IntegerType.INTEGER);
        Cast castToBigint = new Cast(probeSymbol.toSymbolReference(), TypeSignatureTranslator.toSqlType(BigintType.BIGINT));
        QueryId queryId = new QueryId("query");
        StageId stageId = new StageId(queryId, 1);

        service.registerQuery(queryId, session, ImmutableSet.of(filterId), ImmutableSet.of(filterId), ImmutableSet.of());
        service.stageCannotScheduleMoreTasks(stageId, 1);

        DynamicFilter filter = service.createDynamicFilter(
                queryId,
                ImmutableList.of(new DynamicFilters.Descriptor(filterId, castToBigint)),
                ImmutableMap.of(probeSymbol, new TestingColumnHandle("probeColumnA")),
                symbolAllocator.getTypes());
        Assert.assertFalse(filter.isComplete());
        Assert.assertTrue(filter.getCurrentPredicate().isAll());

        // The reported domain is BIGINT; the expected predicate below is INTEGER.
        service.addTaskDynamicFilters(
                new TaskId(stageId, 0),
                ImmutableMap.of(filterId, Domain.multipleValues(BigintType.BIGINT, ImmutableList.of(1L, 2L, 3L))));
        Assert.assertTrue(filter.isComplete());
        Assert.assertEquals(
                filter.getCurrentPredicate(),
                TupleDomain.withColumnDomains(ImmutableMap.of(
                        new TestingColumnHandle("probeColumnA"),
                        Domain.multipleValues(IntegerType.INTEGER, ImmutableList.of(1L, 2L, 3L)))));
    }

    /**
     * Registers a filter in the last filter set of {@code registerQuery} (counted as replicated by
     * the statistics asserted below) and checks that it is never awaitable and that a single task
     * report completes it without any {@code stageCannotScheduleMoreTasks} call.
     */
    @Test
    public void testReplicatedDynamicFilter() {
        DynamicFilterService service = new DynamicFilterService(this.metadata, this.typeOperators, MoreExecutors.newDirectExecutorService());
        DynamicFilterId filterId = new DynamicFilterId("df1");
        SymbolAllocator symbolAllocator = new SymbolAllocator();
        Symbol probeSymbol = symbolAllocator.newSymbol("DF_SYMBOL1", IntegerType.INTEGER);
        SymbolReference probeReference = probeSymbol.toSymbolReference();
        QueryId queryId = new QueryId("query");
        StageId stageId = new StageId(queryId, 1);

        service.registerQuery(queryId, session, ImmutableSet.of(filterId), ImmutableSet.of(), ImmutableSet.of(filterId));

        DynamicFilter filter = service.createDynamicFilter(
                queryId,
                ImmutableList.of(new DynamicFilters.Descriptor(filterId, probeReference)),
                ImmutableMap.of(probeSymbol, new TestingColumnHandle("probeColumnA")),
                symbolAllocator.getTypes());
        Assert.assertTrue(filter.getCurrentPredicate().isAll());

        DynamicFilterService.DynamicFiltersStats statsBefore = service.getDynamicFilteringStats(queryId, session);
        Assert.assertEquals(statsBefore.getTotalDynamicFilters(), 1);
        Assert.assertEquals(statsBefore.getDynamicFiltersCompleted(), 0);
        Assert.assertEquals(statsBefore.getReplicatedDynamicFilters(), 1);
        Assert.assertEquals(statsBefore.getLazyDynamicFilters(), 0);

        // Incomplete, yet not awaitable and not blocking.
        Assert.assertFalse(filter.isComplete());
        Assert.assertFalse(filter.isAwaitable());
        Assert.assertTrue(filter.isBlocked().isDone());

        // A single task report finishes the filter.
        service.addTaskDynamicFilters(
                new TaskId(stageId, 0),
                ImmutableMap.of(filterId, Domain.singleValue(IntegerType.INTEGER, 1L)));
        Assert.assertEquals(
                filter.getCurrentPredicate(),
                TupleDomain.withColumnDomains(ImmutableMap.of(
                        new TestingColumnHandle("probeColumnA"),
                        Domain.singleValue(IntegerType.INTEGER, 1L))));
        Assert.assertTrue(filter.isComplete());
        Assert.assertFalse(filter.isAwaitable());

        DynamicFilterService.DynamicFiltersStats statsAfter = service.getDynamicFilteringStats(queryId, session);
        Assert.assertEquals(statsAfter.getTotalDynamicFilters(), 1);
        Assert.assertEquals(statsAfter.getDynamicFiltersCompleted(), 1);
        Assert.assertEquals(statsAfter.getReplicatedDynamicFilters(), 1);
        Assert.assertEquals(statsAfter.getLazyDynamicFilters(), 0);
        Assert.assertEquals(
                statsAfter.getDynamicFilterDomainStats(),
                ImmutableList.of(new DynamicFilterService.DynamicFilterDomainStats(
                        filterId,
                        Domain.singleValue(IntegerType.INTEGER, 1L).toString(session.toConnectorSession()),
                        1,
                        0)));
    }

    /**
     * Reports a task domain before the stage's task count is known and checks that the filter
     * stays incomplete until {@code stageCannotScheduleMoreTasks} is called with a count of one,
     * at which point the already-reported domain becomes the predicate.
     */
    @Test
    public void testStageCannotScheduleMoreTasks() {
        DynamicFilterService service = new DynamicFilterService(this.metadata, this.typeOperators, MoreExecutors.newDirectExecutorService());
        DynamicFilterId filterId = new DynamicFilterId("df1");
        SymbolAllocator symbolAllocator = new SymbolAllocator();
        Symbol probeSymbol = symbolAllocator.newSymbol("DF_SYMBOL1", IntegerType.INTEGER);
        SymbolReference probeReference = probeSymbol.toSymbolReference();
        QueryId queryId = new QueryId("query");
        StageId stageId = new StageId(queryId, 1);

        service.registerQuery(queryId, session, ImmutableSet.of(filterId), ImmutableSet.of(filterId), ImmutableSet.of());

        DynamicFilter filter = service.createDynamicFilter(
                queryId,
                ImmutableList.of(new DynamicFilters.Descriptor(filterId, probeReference)),
                ImmutableMap.of(probeSymbol, new TestingColumnHandle("probeColumnA")),
                symbolAllocator.getTypes());
        Assert.assertTrue(filter.getCurrentPredicate().isAll());
        Assert.assertFalse(filter.isComplete());
        CompletableFuture<?> blocked = filter.isBlocked();
        Assert.assertFalse(blocked.isDone());

        // The report arrives first; nothing changes while the task count is still unknown.
        service.addTaskDynamicFilters(
                new TaskId(stageId, 0),
                ImmutableMap.of(filterId, Domain.singleValue(IntegerType.INTEGER, 1L)));
        Assert.assertTrue(filter.getCurrentPredicate().isAll());
        Assert.assertFalse(filter.isComplete());
        Assert.assertFalse(blocked.isDone());

        // Announcing the task count completes the filter with the earlier report.
        service.stageCannotScheduleMoreTasks(stageId, 1);
        Assert.assertEquals(
                filter.getCurrentPredicate(),
                TupleDomain.withColumnDomains(ImmutableMap.of(
                        new TestingColumnHandle("probeColumnA"),
                        Domain.singleValue(IntegerType.INTEGER, 1L))));
        Assert.assertTrue(filter.isComplete());
        Assert.assertTrue(blocked.isDone());
        Assert.assertFalse(blocked.isCompletedExceptionally());
    }

    /**
     * Checks that calling {@code cancel(false)} on the future returned by {@code isBlocked()}
     * returns {@code false} and does not disturb the filter: it stays blocked and incomplete until
     * the second task reports, and then completes with both reported values.
     */
    @Test
    public void testDynamicFilterCancellation() {
        DynamicFilterService service = new DynamicFilterService(this.metadata, this.typeOperators, MoreExecutors.newDirectExecutorService());
        DynamicFilterId filterId = new DynamicFilterId("df");
        SymbolAllocator symbolAllocator = new SymbolAllocator();
        Symbol probeSymbol = symbolAllocator.newSymbol("DF_SYMBOL1", IntegerType.INTEGER);
        SymbolReference probeReference = probeSymbol.toSymbolReference();
        QueryId queryId = new QueryId("query");
        StageId stageId = new StageId(queryId, 0);

        service.registerQuery(queryId, session, ImmutableSet.of(filterId), ImmutableSet.of(filterId), ImmutableSet.of());
        service.stageCannotScheduleMoreTasks(stageId, 2);

        TestingColumnHandle probeColumn = new TestingColumnHandle("probeColumnA");
        DynamicFilter filter = service.createDynamicFilter(
                queryId,
                ImmutableList.of(new DynamicFilters.Descriptor(filterId, probeReference)),
                ImmutableMap.of(probeSymbol, probeColumn),
                symbolAllocator.getTypes());
        Assert.assertFalse(filter.isBlocked().isDone());
        Assert.assertFalse(filter.isComplete());
        Assert.assertEquals(filter.getCurrentPredicate(), TupleDomain.all());

        // First of two reports: predicate still unconstrained.
        service.addTaskDynamicFilters(
                new TaskId(stageId, 0),
                ImmutableMap.of(filterId, Domain.singleValue(IntegerType.INTEGER, 1L)));
        Assert.assertEquals(filter.getCurrentPredicate(), TupleDomain.all());

        // The cancellation attempt is rejected and leaves the filter untouched.
        CompletableFuture<?> blocked = filter.isBlocked();
        Assert.assertFalse(blocked.isDone());
        Assert.assertFalse(blocked.cancel(false));
        Assert.assertFalse(filter.isBlocked().isDone());
        Assert.assertFalse(filter.isComplete());

        // Second report completes the filter and the previously obtained future.
        service.addTaskDynamicFilters(
                new TaskId(stageId, 1),
                ImmutableMap.of(filterId, Domain.singleValue(IntegerType.INTEGER, 2L)));
        Assert.assertTrue(blocked.isDone());
        Assert.assertTrue(filter.isComplete());
        Assert.assertEquals(
                filter.getCurrentPredicate(),
                TupleDomain.withColumnDomains(ImmutableMap.of(
                        probeColumn,
                        Domain.multipleValues(IntegerType.INTEGER, ImmutableList.of(1L, 2L)))));
    }

    /**
     * Registers two dynamic filters but lists only the first in the second filter set of
     * {@code registerQuery}, then checks that only the filter built on the first id is awaitable.
     */
    @Test
    public void testIsAwaitable() {
        DynamicFilterService service = new DynamicFilterService(this.metadata, this.typeOperators, MoreExecutors.newDirectExecutorService());
        DynamicFilterId listedFilterId = new DynamicFilterId("df1");
        DynamicFilterId unlistedFilterId = new DynamicFilterId("df2");
        SymbolAllocator symbolAllocator = new SymbolAllocator();
        Symbol probeSymbol = symbolAllocator.newSymbol("symbol", IntegerType.INTEGER);
        TestingColumnHandle probeColumn = new TestingColumnHandle("probeColumnA");
        QueryId queryId = new QueryId("query");

        service.registerQuery(
                queryId,
                session,
                ImmutableSet.of(listedFilterId, unlistedFilterId),
                ImmutableSet.of(listedFilterId),
                ImmutableSet.of());

        DynamicFilter listedFilter = service.createDynamicFilter(
                queryId,
                ImmutableList.of(new DynamicFilters.Descriptor(listedFilterId, probeSymbol.toSymbolReference())),
                ImmutableMap.of(probeSymbol, probeColumn),
                symbolAllocator.getTypes());
        DynamicFilter unlistedFilter = service.createDynamicFilter(
                queryId,
                ImmutableList.of(new DynamicFilters.Descriptor(unlistedFilterId, probeSymbol.toSymbolReference())),
                ImmutableMap.of(probeSymbol, probeColumn),
                symbolAllocator.getTypes());

        Assert.assertTrue(listedFilter.isAwaitable());
        Assert.assertFalse(unlistedFilter.isAwaitable());
    }

    /**
     * Maps one dynamic filter id to two symbols backed by different column handles and checks
     * that the reported domain shows up for both columns in the resulting predicate.
     */
    @Test
    public void testMultipleColumnMapping() {
        DynamicFilterService service = new DynamicFilterService(this.metadata, this.typeOperators, MoreExecutors.newDirectExecutorService());
        DynamicFilterId filterId = new DynamicFilterId("df1");
        SymbolAllocator symbolAllocator = new SymbolAllocator();
        Symbol symbolA = symbolAllocator.newSymbol("DF_SYMBOL1", IntegerType.INTEGER);
        Symbol symbolB = symbolAllocator.newSymbol("DF_SYMBOL2", IntegerType.INTEGER);
        SymbolReference referenceA = symbolA.toSymbolReference();
        SymbolReference referenceB = symbolB.toSymbolReference();
        QueryId queryId = new QueryId("query");
        StageId stageId = new StageId(queryId, 1);

        service.registerQuery(queryId, session, ImmutableSet.of(filterId), ImmutableSet.of(filterId), ImmutableSet.of());
        service.stageCannotScheduleMoreTasks(stageId, 1);

        TestingColumnHandle columnA = new TestingColumnHandle("probeColumnA");
        TestingColumnHandle columnB = new TestingColumnHandle("probeColumnB");
        // Both descriptors share the same filter id but reference different symbols.
        DynamicFilter filter = service.createDynamicFilter(
                queryId,
                ImmutableList.of(
                        new DynamicFilters.Descriptor(filterId, referenceA),
                        new DynamicFilters.Descriptor(filterId, referenceB)),
                ImmutableMap.of(symbolA, columnA, symbolB, columnB),
                symbolAllocator.getTypes());

        Domain reported = Domain.singleValue(IntegerType.INTEGER, 1L);
        service.addTaskDynamicFilters(new TaskId(stageId, 0), ImmutableMap.of(filterId, reported));

        Assert.assertEquals(
                filter.getCurrentPredicate(),
                TupleDomain.withColumnDomains(ImmutableMap.of(columnA, reported, columnB, reported)));
    }

    /**
     * Checks {@code DynamicFilterService.getSourceStageInnerLazyDynamicFilters} against three plan
     * variants: only the source-distributed fragment with a replicated remote source yields the
     * filter id; changing either the partitioning or the exchange type yields an empty set.
     */
    @Test
    public void testSourceStageInnerLazyDynamicFilters() {
        DynamicFilterId filterId = new DynamicFilterId("filterId");

        PlanFragment sourceReplicated = createPlan(filterId, SystemPartitioningHandle.SOURCE_DISTRIBUTION, ExchangeNode.Type.REPLICATE);
        Assert.assertEquals(
                DynamicFilterService.getSourceStageInnerLazyDynamicFilters(sourceReplicated),
                ImmutableSet.of(filterId));

        PlanFragment fixedHashReplicated = createPlan(filterId, SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION, ExchangeNode.Type.REPLICATE);
        Assert.assertEquals(
                DynamicFilterService.getSourceStageInnerLazyDynamicFilters(fixedHashReplicated),
                ImmutableSet.of());

        PlanFragment sourceRepartitioned = createPlan(filterId, SystemPartitioningHandle.SOURCE_DISTRIBUTION, ExchangeNode.Type.REPARTITION);
        Assert.assertEquals(
                DynamicFilterService.getSourceStageInnerLazyDynamicFilters(sourceRepartitioned),
                ImmutableSet.of());
    }

    /**
     * Builds a plan fragment rooted at an inner join between a table scan wrapped in a dynamic
     * filter expression and a remote source node.
     *
     * @param dynamicFilterId id used in the filter expression over the scan and as the key of the join's dynamic filter map
     * @param partitioningHandle partitioning handle passed to the returned fragment
     * @param type exchange type passed to the remote source node
     * @return the assembled plan fragment
     */
    private static PlanFragment createPlan(DynamicFilterId dynamicFilterId, PartitioningHandle partitioningHandle, ExchangeNode.Type type) {
        Symbol scanSymbol = new Symbol("column");
        Symbol buildSymbol = new Symbol("buildColumn");
        PlanNodeId scanId = new PlanNodeId("plan_id");

        TableScanNode scan = TableScanNode.newInstance(
                scanId,
                TestingHandles.TEST_TABLE_HANDLE,
                ImmutableList.of(scanSymbol),
                ImmutableMap.of(scanSymbol, new TestingMetadata.TestingColumnHandle("column")),
                false,
                Optional.empty());

        // Filter over the scan carrying the dynamic filter expression on the scanned symbol.
        FilterNode filteredScan = new FilterNode(
                new PlanNodeId("filter_node_id"),
                scan,
                DynamicFilters.createDynamicFilterExpression(
                        MetadataManager.createTestMetadataManager(),
                        dynamicFilterId,
                        VarcharType.VARCHAR,
                        scanSymbol.toSymbolReference()));

        RemoteSourceNode remoteSource = new RemoteSourceNode(
                new PlanNodeId("remote_id"),
                new PlanFragmentId("plan_fragment_id"),
                ImmutableList.of(buildSymbol),
                Optional.empty(),
                type);

        // Inner join whose dynamic filter map ties the filter id to the remote source's symbol.
        JoinNode join = new JoinNode(
                new PlanNodeId("join_id"),
                JoinNode.Type.INNER,
                filteredScan,
                remoteSource,
                ImmutableList.of(),
                scan.getOutputSymbols(),
                remoteSource.getOutputSymbols(),
                false,
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                ImmutableMap.of(dynamicFilterId, buildSymbol),
                Optional.empty());

        PartitioningScheme outputScheme = new PartitioningScheme(
                Partitioning.create(SystemPartitioningHandle.SINGLE_DISTRIBUTION, ImmutableList.of()),
                ImmutableList.of(scanSymbol));

        return new PlanFragment(
                new PlanFragmentId("plan_id"),
                join,
                ImmutableMap.of(scanSymbol, VarcharType.VARCHAR),
                partitioningHandle,
                ImmutableList.of(scanId),
                outputScheme,
                StageExecutionDescriptor.ungroupedExecution(),
                StatsAndCosts.empty(),
                Optional.empty());
    }

    /**
     * Renders, for the shared test session, the string form of a non-nullable INTEGER domain
     * consisting of the single range {@code [j, j2]} with both bounds inclusive.
     *
     * @param j lower bound of the range (inclusive)
     * @param j2 upper bound of the range (inclusive)
     * @return the domain's string representation for the test connector session
     */
    private static String getExpectedDomainString(long j, long j2) {
        Range inclusiveRange = Range.range(IntegerType.INTEGER, j, true, j2, true);
        Domain expectedDomain = Domain.create(ValueSet.ofRanges(inclusiveRange), false);
        return expectedDomain.toString(session.toConnectorSession());
    }
}
