package org.apache.kafka.streams.kstream.internals;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableSourceTest.class */
/**
 * Tests for tables created through {@link StreamsBuilder#table}: forwarding of
 * records downstream, handling of null-key records, value-getter lookups, and
 * the presence or absence of the old value in emitted changes.
 *
 * <p>Every test driver is managed by try-with-resources so that it is closed
 * exactly once, whether the test body succeeds or throws.
 */
public class KTableSourceTest {
    private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
    private final ConsumerRecordFactory<String, String> recordFactory =
        new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L);
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    /**
     * Pipes six String/Integer records (two of them with a null value) into a
     * table and verifies that the processor attached to the table's stream
     * captured all of them, in order, together with their timestamps.
     */
    @Test
    public void testKTable() {
        final String topic1 = "topic1";
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, Integer> table1 =
            builder.table(topic1, Consumed.with(Serdes.String(), Serdes.Integer()));

        final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
        table1.toStream().process(supplier);

        final ConsumerRecordFactory<String, Integer> integerFactory =
            new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer(), 0L);

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            driver.pipeInput(integerFactory.create(topic1, "A", 1, 10L));
            driver.pipeInput(integerFactory.create(topic1, "B", 2, 11L));
            driver.pipeInput(integerFactory.create(topic1, "C", 3, 12L));
            driver.pipeInput(integerFactory.create(topic1, "D", 4, 13L));
            driver.pipeInput(integerFactory.create(topic1, "A", (Integer) null, 14L));
            driver.pipeInput(integerFactory.create(topic1, "B", (Integer) null, 15L));
        }

        // Checked after the driver has been closed, as in the original test flow.
        Assert.assertEquals(
            Arrays.asList(
                "A:1 (ts: 10)",
                "B:2 (ts: 11)",
                "C:3 (ts: 12)",
                "D:4 (ts: 13)",
                "A:null (ts: 14)",
                "B:null (ts: 15)"),
            supplier.theCapturedProcessor().processed);
    }

    /**
     * Pipes a record with a null key and verifies that the
     * {@code skipped-records-total} metric reads 1.0 and that the expected
     * "Skipping record due to null key" message was captured from the log.
     */
    @Test
    public void kTableShouldLogAndMeterOnSkippedRecords() {
        final String topic = "topic";
        final StreamsBuilder builder = new StreamsBuilder();
        builder.table(topic, stringConsumed);

        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        try {
            try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
                driver.pipeInput(recordFactory.create(topic, (String) null, "value"));

                Assert.assertEquals(
                    Double.valueOf(1.0d),
                    StreamsTestUtils.getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics")
                        .metricValue());
                MatcherAssert.assertThat(
                    appender.getMessages(),
                    CoreMatchers.hasItem("Skipping record due to null key. topic=[topic] partition=[0] offset=[0]"));
            }
        } finally {
            // Always detach the appender, even when the driver or an assertion
            // fails, so it cannot stay registered after this test.
            LogCaptureAppender.unregister(appender);
        }
    }

    /**
     * Verifies that the table's value getter returns, for each key, the value
     * and timestamp of the most recently piped record (including one whose
     * timestamp is lower than its predecessor's), and returns null for a key
     * once a null-valued record has been piped for it.
     */
    @Test
    public void testValueGetter() {
        final String topic1 = "topic1";
        final StreamsBuilder builder = new StreamsBuilder();

        // A wildcard for the middle type argument keeps the down-cast free of
        // unchecked warnings; only the key and value types are used below.
        final KTableImpl<String, ?, String> table1 =
            (KTableImpl<String, ?, String>) builder.table(topic1, stringConsumed, Materialized.as("store"));

        final Topology topology = builder.build();
        final KTableValueGetterSupplier<String, String> getterSupplier = table1.valueGetterSupplier();
        TopologyWrapper.getInternalTopologyBuilder(topology)
            .connectProcessorAndStateStores(table1.name, getterSupplier.storeNames());

        try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.build(), props)) {
            final KTableValueGetter<String, String> getter = getterSupplier.get();
            getter.init(driver.setCurrentNodeForProcessorContext(table1.name));

            driver.pipeInput(recordFactory.create(topic1, "A", "01", 10L));
            driver.pipeInput(recordFactory.create(topic1, "B", "01", 20L));
            driver.pipeInput(recordFactory.create(topic1, "C", "01", 15L));

            Assert.assertEquals(ValueAndTimestamp.make("01", 10L), getter.get("A"));
            Assert.assertEquals(ValueAndTimestamp.make("01", 20L), getter.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make("01", 15L), getter.get("C"));

            driver.pipeInput(recordFactory.create(topic1, "A", "02", 30L));
            driver.pipeInput(recordFactory.create(topic1, "B", "02", 5L));

            Assert.assertEquals(ValueAndTimestamp.make("02", 30L), getter.get("A"));
            Assert.assertEquals(ValueAndTimestamp.make("02", 5L), getter.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make("01", 15L), getter.get("C"));

            driver.pipeInput(recordFactory.create(topic1, "A", "03", 29L));

            Assert.assertEquals(ValueAndTimestamp.make("03", 29L), getter.get("A"));
            Assert.assertEquals(ValueAndTimestamp.make("02", 5L), getter.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make("01", 15L), getter.get("C"));

            // The (String) cast selects the (topic, key, value, timestamp) overload.
            driver.pipeInput(recordFactory.create(topic1, "A", (String) null, 50L));
            driver.pipeInput(recordFactory.create(topic1, "B", (String) null, 3L));

            Assert.assertNull(getter.get("A"));
            Assert.assertNull(getter.get("B"));
            Assert.assertEquals(ValueAndTimestamp.make("01", 15L), getter.get("C"));
        }
    }

    /**
     * Verifies that, when sending of old values has not been enabled, every
     * result captured by the processor attached below the table shows
     * {@code null} in the old-value position, for updates and for null-valued
     * records alike.
     */
    @Test
    public void testNotSendingOldValue() {
        final String topic1 = "topic1";
        final StreamsBuilder builder = new StreamsBuilder();
        final KTableImpl<String, ?, String> table1 =
            (KTableImpl<String, ?, String>) builder.table(topic1, stringConsumed);

        final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
        final Topology topology = builder.build().addProcessor("proc1", supplier, table1.name);

        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            final MockProcessor<String, Object> proc1 = supplier.theCapturedProcessor();

            driver.pipeInput(recordFactory.create(topic1, "A", "01", 10L));
            driver.pipeInput(recordFactory.create(topic1, "B", "01", 20L));
            driver.pipeInput(recordFactory.create(topic1, "C", "01", 15L));
            proc1.checkAndClearProcessResult(
                "A:(01<-null) (ts: 10)", "B:(01<-null) (ts: 20)", "C:(01<-null) (ts: 15)");

            driver.pipeInput(recordFactory.create(topic1, "A", "02", 8L));
            driver.pipeInput(recordFactory.create(topic1, "B", "02", 22L));
            proc1.checkAndClearProcessResult("A:(02<-null) (ts: 8)", "B:(02<-null) (ts: 22)");

            driver.pipeInput(recordFactory.create(topic1, "A", "03", 12L));
            proc1.checkAndClearProcessResult("A:(03<-null) (ts: 12)");

            driver.pipeInput(recordFactory.create(topic1, "A", (String) null, 15L));
            driver.pipeInput(recordFactory.create(topic1, "B", (String) null, 20L));
            proc1.checkAndClearProcessResult("A:(null<-null) (ts: 15)", "B:(null<-null) (ts: 20)");
        }
    }

    /**
     * Verifies that, after {@code enableSendingOldValues()} has been called on
     * the table, every captured result shows the key's previous value in the
     * old-value position ({@code null} for the first record of a key).
     */
    @Test
    public void testSendingOldValue() {
        final String topic1 = "topic1";
        final StreamsBuilder builder = new StreamsBuilder();
        final KTableImpl<String, ?, String> table1 =
            (KTableImpl<String, ?, String>) builder.table(topic1, stringConsumed);

        table1.enableSendingOldValues();
        Assert.assertTrue(table1.sendingOldValueEnabled());

        final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
        final Topology topology = builder.build().addProcessor("proc1", supplier, table1.name);

        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            final MockProcessor<String, Object> proc1 = supplier.theCapturedProcessor();

            driver.pipeInput(recordFactory.create(topic1, "A", "01", 10L));
            driver.pipeInput(recordFactory.create(topic1, "B", "01", 20L));
            driver.pipeInput(recordFactory.create(topic1, "C", "01", 15L));
            proc1.checkAndClearProcessResult(
                "A:(01<-null) (ts: 10)", "B:(01<-null) (ts: 20)", "C:(01<-null) (ts: 15)");

            driver.pipeInput(recordFactory.create(topic1, "A", "02", 8L));
            driver.pipeInput(recordFactory.create(topic1, "B", "02", 22L));
            proc1.checkAndClearProcessResult("A:(02<-01) (ts: 8)", "B:(02<-01) (ts: 22)");

            driver.pipeInput(recordFactory.create(topic1, "A", "03", 12L));
            proc1.checkAndClearProcessResult("A:(03<-02) (ts: 12)");

            driver.pipeInput(recordFactory.create(topic1, "A", (String) null, 15L));
            driver.pipeInput(recordFactory.create(topic1, "B", (String) null, 20L));
            proc1.checkAndClearProcessResult("A:(null<-03) (ts: 15)", "B:(null<-02) (ts: 20)");
        }
    }
}
