package org.apache.kafka.streams.kstream.internals;

import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableFilterTest.class */
public class KTableFilterTest {
    private final Consumed<String, Integer> consumed = Consumed.with(Serdes.String(), Serdes.Integer());
    private final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer(), 0);
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
    private final Predicate<String, Integer> predicate = (str, num) -> {
        return num.intValue() % 2 == 0;
    };

    @Before
    public void setUp() {
        this.props.setProperty("cache.max.bytes.buffering", "0");
    }

    private void doTestKTable(StreamsBuilder streamsBuilder, KTable<String, Integer> kTable, KTable<String, Integer> kTable2, String str) {
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        kTable.toStream().process(mockProcessorSupplier, new String[0]);
        kTable2.toStream().process(mockProcessorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            topologyTestDriver.pipeInput(this.recordFactory.create(str, "A", 1, 10L));
            topologyTestDriver.pipeInput(this.recordFactory.create(str, "B", 2, 5L));
            topologyTestDriver.pipeInput(this.recordFactory.create(str, "C", 3, 8L));
            topologyTestDriver.pipeInput(this.recordFactory.create(str, "D", 4, 14L));
            topologyTestDriver.pipeInput(this.recordFactory.create(str, "A", (Object) null, 18L));
            topologyTestDriver.pipeInput(this.recordFactory.create(str, "B", (Object) null, 15L));
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            List capturedProcessors = mockProcessorSupplier.capturedProcessors(2);
            ((MockProcessor) capturedProcessors.get(0)).checkAndClearProcessResult("A:null (ts: 10)", "B:2 (ts: 5)", "C:null (ts: 8)", "D:4 (ts: 14)", "A:null (ts: 18)", "B:null (ts: 15)");
            ((MockProcessor) capturedProcessors.get(1)).checkAndClearProcessResult("A:1 (ts: 10)", "B:null (ts: 5)", "C:3 (ts: 8)", "D:null (ts: 14)", "A:null (ts: 18)", "B:null (ts: 15)");
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldPassThroughWithoutMaterialization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable table = streamsBuilder.table("topic1", this.consumed);
        KTable<String, Integer> filter = table.filter(this.predicate);
        KTable<String, Integer> filterNot = table.filterNot(this.predicate);
        Assert.assertNull(table.queryableStoreName());
        Assert.assertNull(filter.queryableStoreName());
        Assert.assertNull(filterNot.queryableStoreName());
        doTestKTable(streamsBuilder, filter, filterNot, "topic1");
    }

    @Test
    public void shouldPassThroughOnMaterialization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable table = streamsBuilder.table("topic1", this.consumed);
        KTable<String, Integer> filter = table.filter(this.predicate, Materialized.as("store2"));
        KTable<String, Integer> filterNot = table.filterNot(this.predicate);
        Assert.assertNull(table.queryableStoreName());
        Assert.assertEquals("store2", filter.queryableStoreName());
        Assert.assertNull(filterNot.queryableStoreName());
        doTestKTable(streamsBuilder, filter, filterNot, "topic1");
    }

    private void doTestValueGetter(StreamsBuilder streamsBuilder, KTableImpl<String, Integer, Integer> kTableImpl, KTableImpl<String, Integer, Integer> kTableImpl2, String str) {
        Topology build = streamsBuilder.build();
        KTableValueGetterSupplier valueGetterSupplier = kTableImpl.valueGetterSupplier();
        KTableValueGetterSupplier valueGetterSupplier2 = kTableImpl2.valueGetterSupplier();
        InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(build);
        internalTopologyBuilder.connectProcessorAndStateStores(kTableImpl.name, valueGetterSupplier.storeNames());
        internalTopologyBuilder.connectProcessorAndStateStores(kTableImpl2.name, valueGetterSupplier2.storeNames());
        TopologyTestDriverWrapper topologyTestDriverWrapper = new TopologyTestDriverWrapper(build, this.props);
        Throwable th = null;
        try {
            KTableValueGetter kTableValueGetter = valueGetterSupplier.get();
            KTableValueGetter kTableValueGetter2 = valueGetterSupplier2.get();
            kTableValueGetter.init(topologyTestDriverWrapper.setCurrentNodeForProcessorContext(kTableImpl.name));
            kTableValueGetter2.init(topologyTestDriverWrapper.setCurrentNodeForProcessorContext(kTableImpl2.name));
            topologyTestDriverWrapper.pipeInput(this.recordFactory.create(str, "A", 1, 5L));
            topologyTestDriverWrapper.pipeInput(this.recordFactory.create(str, "B", 1, 10L));
            topologyTestDriverWrapper.pipeInput(this.recordFactory.create(str, "C", 1, 15L));
            Assert.assertNull(kTableValueGetter.get("A"));
            Assert.assertNull(kTableValueGetter.get("B"));
            Assert.assertNull(kTableValueGetter.get("C"));
            Assert.assertEquals(ValueAndTimestamp.make(1, 5L), kTableValueGetter2.get("A"));
            Assert.assertEquals(ValueAndTimestamp.make(1, 10L), kTableValueGetter2.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make(1, 15L), kTableValueGetter2.get("C"));
            topologyTestDriverWrapper.pipeInput(this.recordFactory.create(str, "A", 2, 10L));
            topologyTestDriverWrapper.pipeInput(this.recordFactory.create(str, "B", 2, 5L));
            Assert.assertEquals(ValueAndTimestamp.make(2, 10L), kTableValueGetter.get("A"));
            Assert.assertEquals(ValueAndTimestamp.make(2, 5L), kTableValueGetter.get("B"));
            Assert.assertNull(kTableValueGetter.get("C"));
            Assert.assertNull(kTableValueGetter2.get("A"));
            Assert.assertNull(kTableValueGetter2.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make(1, 15L), kTableValueGetter2.get("C"));
            topologyTestDriverWrapper.pipeInput(this.recordFactory.create(str, "A", 3, 15L));
            Assert.assertNull(kTableValueGetter.get("A"));
            Assert.assertEquals(ValueAndTimestamp.make(2, 5L), kTableValueGetter.get("B"));
            Assert.assertNull(kTableValueGetter.get("C"));
            Assert.assertEquals(ValueAndTimestamp.make(3, 15L), kTableValueGetter2.get("A"));
            Assert.assertNull(kTableValueGetter2.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make(1, 15L), kTableValueGetter2.get("C"));
            topologyTestDriverWrapper.pipeInput(this.recordFactory.create(str, "A", (Object) null, 10L));
            topologyTestDriverWrapper.pipeInput(this.recordFactory.create(str, "B", (Object) null, 20L));
            Assert.assertNull(kTableValueGetter.get("A"));
            Assert.assertNull(kTableValueGetter.get("B"));
            Assert.assertNull(kTableValueGetter.get("C"));
            Assert.assertNull(kTableValueGetter2.get("A"));
            Assert.assertNull(kTableValueGetter2.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make(1, 15L), kTableValueGetter2.get("C"));
            if (topologyTestDriverWrapper != null) {
                if (0 == 0) {
                    topologyTestDriverWrapper.close();
                    return;
                }
                try {
                    topologyTestDriverWrapper.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (topologyTestDriverWrapper != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriverWrapper.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriverWrapper.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldGetValuesOnMaterialization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl table = streamsBuilder.table("topic1", this.consumed);
        KTableImpl<String, Integer, Integer> kTableImpl = (KTableImpl) table.filter(this.predicate, Materialized.as("store2"));
        KTableImpl<String, Integer, Integer> kTableImpl2 = (KTableImpl) table.filterNot(this.predicate, Materialized.as("store3"));
        KTableImpl filterNot = table.filterNot(this.predicate);
        Assert.assertNull(table.queryableStoreName());
        Assert.assertEquals("store2", kTableImpl.queryableStoreName());
        Assert.assertEquals("store3", kTableImpl2.queryableStoreName());
        Assert.assertNull(filterNot.queryableStoreName());
        doTestValueGetter(streamsBuilder, kTableImpl, kTableImpl2, "topic1");
    }

    private void doTestNotSendingOldValue(StreamsBuilder streamsBuilder, KTableImpl<String, Integer, Integer> kTableImpl, KTableImpl<String, Integer, Integer> kTableImpl2, String str) {
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        streamsBuilder.build().addProcessor("proc1", mockProcessorSupplier, new String[]{kTableImpl.name});
        streamsBuilder.build().addProcessor("proc2", mockProcessorSupplier, new String[]{kTableImpl2.name});
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                topologyTestDriver.pipeInput(this.recordFactory.create(str, "A", 1, 5L));
                topologyTestDriver.pipeInput(this.recordFactory.create(str, "B", 1, 10L));
                topologyTestDriver.pipeInput(this.recordFactory.create(str, "C", 1, 15L));
                List capturedProcessors = mockProcessorSupplier.capturedProcessors(2);
                ((MockProcessor) capturedProcessors.get(0)).checkAndClearProcessResult("A:(1<-null) (ts: 5)", "B:(1<-null) (ts: 10)", "C:(1<-null) (ts: 15)");
                ((MockProcessor) capturedProcessors.get(1)).checkAndClearProcessResult("A:(null<-null) (ts: 5)", "B:(null<-null) (ts: 10)", "C:(null<-null) (ts: 15)");
                topologyTestDriver.pipeInput(this.recordFactory.create(str, "A", 2, 15L));
                topologyTestDriver.pipeInput(this.recordFactory.create(str, "B", 2, 8L));
                ((MockProcessor) capturedProcessors.get(0)).checkAndClearProcessResult("A:(2<-null) (ts: 15)", "B:(2<-null) (ts: 8)");
                ((MockProcessor) capturedProcessors.get(1)).checkAndClearProcessResult("A:(2<-null) (ts: 15)", "B:(2<-null) (ts: 8)");
                topologyTestDriver.pipeInput(this.recordFactory.create(str, "A", 3, 20L));
                ((MockProcessor) capturedProcessors.get(0)).checkAndClearProcessResult("A:(3<-null) (ts: 20)");
                ((MockProcessor) capturedProcessors.get(1)).checkAndClearProcessResult("A:(null<-null) (ts: 20)");
                topologyTestDriver.pipeInput(this.recordFactory.create(str, "A", (Object) null, 10L));
                topologyTestDriver.pipeInput(this.recordFactory.create(str, "B", (Object) null, 20L));
                ((MockProcessor) capturedProcessors.get(0)).checkAndClearProcessResult("A:(null<-null) (ts: 10)", "B:(null<-null) (ts: 20)");
                ((MockProcessor) capturedProcessors.get(1)).checkAndClearProcessResult("A:(null<-null) (ts: 10)", "B:(null<-null) (ts: 20)");
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldNotSendOldValuesWithoutMaterialization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl<String, Integer, Integer> kTableImpl = (KTableImpl) streamsBuilder.table("topic1", this.consumed);
        doTestNotSendingOldValue(streamsBuilder, kTableImpl, (KTableImpl) kTableImpl.filter(this.predicate), "topic1");
    }

    @Test
    public void shouldNotSendOldValuesOnMaterialization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl<String, Integer, Integer> kTableImpl = (KTableImpl) streamsBuilder.table("topic1", this.consumed);
        doTestNotSendingOldValue(streamsBuilder, kTableImpl, (KTableImpl) kTableImpl.filter(this.predicate, Materialized.as("store2")), "topic1");
    }

    private void doTestSendingOldValue(StreamsBuilder streamsBuilder, KTableImpl<String, Integer, Integer> kTableImpl, KTableImpl<String, Integer, Integer> kTableImpl2, String str) {
        kTableImpl2.enableSendingOldValues();
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        Topology build = streamsBuilder.build();
        build.addProcessor("proc1", mockProcessorSupplier, new String[]{kTableImpl.name});
        build.addProcessor("proc2", mockProcessorSupplier, new String[]{kTableImpl2.name});
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(build, this.props);
        Throwable th = null;
        try {
            topologyTestDriver.pipeInput(this.recordFactory.create(str, "A", 1, 5L));
            topologyTestDriver.pipeInput(this.recordFactory.create(str, "B", 1, 10L));
            topologyTestDriver.pipeInput(this.recordFactory.create(str, "C", 1, 15L));
            List capturedProcessors = mockProcessorSupplier.capturedProcessors(2);
            ((MockProcessor) capturedProcessors.get(0)).checkAndClearProcessResult("A:(1<-null) (ts: 5)", "B:(1<-null) (ts: 10)", "C:(1<-null) (ts: 15)");
            ((MockProcessor) capturedProcessors.get(1)).checkEmptyAndClearProcessResult();
            topologyTestDriver.pipeInput(this.recordFactory.create(str, "A", 2, 15L));
            topologyTestDriver.pipeInput(this.recordFactory.create(str, "B", 2, 8L));
            ((MockProcessor) capturedProcessors.get(0)).checkAndClearProcessResult("A:(2<-1) (ts: 15)", "B:(2<-1) (ts: 8)");
            ((MockProcessor) capturedProcessors.get(1)).checkAndClearProcessResult("A:(2<-null) (ts: 15)", "B:(2<-null) (ts: 8)");
            topologyTestDriver.pipeInput(this.recordFactory.create(str, "A", 3, 20L));
            ((MockProcessor) capturedProcessors.get(0)).checkAndClearProcessResult("A:(3<-2) (ts: 20)");
            ((MockProcessor) capturedProcessors.get(1)).checkAndClearProcessResult("A:(null<-2) (ts: 20)");
            topologyTestDriver.pipeInput(this.recordFactory.create(str, "A", (Object) null, 10L));
            topologyTestDriver.pipeInput(this.recordFactory.create(str, "B", (Object) null, 20L));
            ((MockProcessor) capturedProcessors.get(0)).checkAndClearProcessResult("A:(null<-3) (ts: 10)", "B:(null<-2) (ts: 20)");
            ((MockProcessor) capturedProcessors.get(1)).checkAndClearProcessResult("B:(null<-2) (ts: 20)");
            if (topologyTestDriver != null) {
                if (0 == 0) {
                    topologyTestDriver.close();
                    return;
                }
                try {
                    topologyTestDriver.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldSendOldValuesWhenEnabledWithoutMaterialization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl<String, Integer, Integer> kTableImpl = (KTableImpl) streamsBuilder.table("topic1", this.consumed);
        doTestSendingOldValue(streamsBuilder, kTableImpl, (KTableImpl) kTableImpl.filter(this.predicate), "topic1");
    }

    @Test
    public void shouldSendOldValuesWhenEnabledOnMaterialization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl<String, Integer, Integer> kTableImpl = (KTableImpl) streamsBuilder.table("topic1", this.consumed);
        doTestSendingOldValue(streamsBuilder, kTableImpl, (KTableImpl) kTableImpl.filter(this.predicate, Materialized.as("store2")), "topic1");
    }

    private void doTestSkipNullOnMaterialization(StreamsBuilder streamsBuilder, KTableImpl<String, String, String> kTableImpl, KTableImpl<String, String, String> kTableImpl2, String str) {
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        Topology build = streamsBuilder.build();
        build.addProcessor("proc1", mockProcessorSupplier, new String[]{kTableImpl.name});
        build.addProcessor("proc2", mockProcessorSupplier, new String[]{kTableImpl2.name});
        ConsumerRecordFactory consumerRecordFactory = new ConsumerRecordFactory(new StringSerializer(), new StringSerializer(), 0L);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(build, this.props);
        Throwable th = null;
        try {
            try {
                topologyTestDriver.pipeInput(consumerRecordFactory.create(str, "A", "reject", 5L));
                topologyTestDriver.pipeInput(consumerRecordFactory.create(str, "B", "reject", 10L));
                topologyTestDriver.pipeInput(consumerRecordFactory.create(str, "C", "reject", 20L));
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                List capturedProcessors = mockProcessorSupplier.capturedProcessors(2);
                ((MockProcessor) capturedProcessors.get(0)).checkAndClearProcessResult("A:(reject<-null) (ts: 5)", "B:(reject<-null) (ts: 10)", "C:(reject<-null) (ts: 20)");
                ((MockProcessor) capturedProcessors.get(1)).checkEmptyAndClearProcessResult();
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldSkipNullToRepartitionWithoutMaterialization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl<String, String, String> kTableImpl = (KTableImpl) streamsBuilder.table("topic1", Consumed.with(Serdes.String(), Serdes.String()));
        doTestSkipNullOnMaterialization(streamsBuilder, kTableImpl, (KTableImpl) kTableImpl.filter((str, str2) -> {
            return str2.equalsIgnoreCase("accept");
        }).groupBy(MockMapper.noOpKeyValueMapper()).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER), "topic1");
    }

    @Test
    public void shouldSkipNullToRepartitionOnMaterialization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTableImpl<String, String, String> kTableImpl = (KTableImpl) streamsBuilder.table("topic1", Consumed.with(Serdes.String(), Serdes.String()));
        doTestSkipNullOnMaterialization(streamsBuilder, kTableImpl, (KTableImpl) kTableImpl.filter((str, str2) -> {
            return str2.equalsIgnoreCase("accept");
        }, Materialized.as("store2")).groupBy(MockMapper.noOpKeyValueMapper()).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, Materialized.as("mock-result")), "topic1");
    }

    @Test
    public void testTypeVariance() {
        Predicate predicate = (number, obj) -> {
            return false;
        };
        new StreamsBuilder().table("empty").filter(predicate).filterNot(predicate).toStream().to("nirvana");
    }
}
