package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.class */
/**
 * Tests count, reduce and aggregate on a session-windowed stream built from a
 * single String/String input topic with a 500 ms session gap.
 *
 * <p>Every data-driven test feeds the same five records (see
 * {@link #processData(TopologyTestDriver)}) and expects three sessions:
 * key "1" over [10, 15], key "1" over [600, 600] and key "2" over [599, 600].
 */
public class SessionWindowedKStreamImplTest {
    private static final String TOPIC = "input";

    private final StreamsBuilder builder = new StreamsBuilder();
    private final ConsumerRecordFactory<String, String> recordFactory =
        new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    /** Merges two session aggregates by joining them with "+"; the key is ignored. */
    private final Merger<String, String> sessionMerger = (aggKey, aggOne, aggTwo) -> aggOne + "+" + aggTwo;

    private SessionWindowedKStream<String, String> stream;

    /** Builds the session-windowed stream under test on top of {@link #TOPIC}. */
    @Before
    public void before() {
        stream = builder
            .stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(SessionWindows.with(Duration.ofMillis(500L)));
    }

    /** Runs the count scenario with the record cache size set to zero. */
    @Test
    public void shouldCountSessionWindowedWithCachingDisabled() {
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        shouldCountSessionWindowed();
    }

    /** Runs the count scenario with the cache settings left as provided by the test config. */
    @Test
    public void shouldCountSessionWindowedWithCachingEnabled() {
        shouldCountSessionWindowed();
    }

    /**
     * Counts records per session and checks the last value and timestamp
     * captured downstream for each of the three expected sessions.
     */
    private void shouldCountSessionWindowed() {
        final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
        stream.count().toStream().process(supplier);

        // Close the driver exactly once, before asserting, so that a failing
        // assertion can never trigger a second close().
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
        }

        final Map<Windowed<String>, ValueAndTimestamp<Long>> result =
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey;

        MatcherAssert.assertThat(result.size(), CoreMatchers.equalTo(3));
        MatcherAssert.assertThat(result.get(session("1", 10L, 15L)), CoreMatchers.equalTo(ValueAndTimestamp.make(2L, 15L)));
        MatcherAssert.assertThat(result.get(session("2", 599L, 600L)), CoreMatchers.equalTo(ValueAndTimestamp.make(2L, 600L)));
        MatcherAssert.assertThat(result.get(session("1", 600L, 600L)), CoreMatchers.equalTo(ValueAndTimestamp.make(1L, 600L)));
    }

    /** Reduces values per session with the string adder and checks the captured results. */
    @Test
    public void shouldReduceWindowed() {
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        stream.reduce(MockReducer.STRING_ADDER).toStream().process(supplier);

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
        }

        final Map<Windowed<String>, ValueAndTimestamp<String>> result =
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey;

        MatcherAssert.assertThat(result.size(), CoreMatchers.equalTo(3));
        MatcherAssert.assertThat(result.get(session("1", 10L, 15L)), CoreMatchers.equalTo(ValueAndTimestamp.make("1+2", 15L)));
        MatcherAssert.assertThat(result.get(session("2", 599L, 600L)), CoreMatchers.equalTo(ValueAndTimestamp.make("1+2", 600L)));
        MatcherAssert.assertThat(result.get(session("1", 600L, 600L)), CoreMatchers.equalTo(ValueAndTimestamp.make("3", 600L)));
    }

    /** Aggregates values per session (merging sessions with {@link #sessionMerger}) and checks the captured results. */
    @Test
    public void shouldAggregateSessionWindowed() {
        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
        stream
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                sessionMerger,
                Materialized.with(Serdes.String(), Serdes.String()))
            .toStream()
            .process(supplier);

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
        }

        final Map<Windowed<String>, ValueAndTimestamp<String>> result =
            supplier.theCapturedProcessor().lastValueAndTimestampPerKey;

        MatcherAssert.assertThat(result.size(), CoreMatchers.equalTo(3));
        MatcherAssert.assertThat(result.get(session("1", 10L, 15L)), CoreMatchers.equalTo(ValueAndTimestamp.make("0+0+1+2", 15L)));
        MatcherAssert.assertThat(result.get(session("2", 599L, 600L)), CoreMatchers.equalTo(ValueAndTimestamp.make("0+0+1+2", 600L)));
        MatcherAssert.assertThat(result.get(session("1", 600L, 600L)), CoreMatchers.equalTo(ValueAndTimestamp.make("0+3", 600L)));
    }

    /** Verifies that the count is queryable from the named session store "count-store". */
    @Test
    public void shouldMaterializeCount() {
        stream.count(Materialized.as("count-store"));

        // The store is read through the driver, so the assertion stays inside the try block.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
            final SessionStore<String, Long> store = driver.getSessionStore("count-store");
            MatcherAssert.assertThat(
                StreamsTestUtils.toList(store.fetch("1", "2")),
                CoreMatchers.equalTo(Arrays.asList(
                    KeyValue.pair(session("1", 10L, 15L), 2L),
                    KeyValue.pair(session("1", 600L, 600L), 1L),
                    KeyValue.pair(session("2", 599L, 600L), 2L))));
        }
    }

    /** Verifies that the reduced values are queryable from the named session store "reduced". */
    @Test
    public void shouldMaterializeReduced() {
        stream.reduce(MockReducer.STRING_ADDER, Materialized.as("reduced"));

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
            final SessionStore<String, String> store = driver.getSessionStore("reduced");
            MatcherAssert.assertThat(
                StreamsTestUtils.toList(store.fetch("1", "2")),
                CoreMatchers.equalTo(Arrays.asList(
                    KeyValue.pair(session("1", 10L, 15L), "1+2"),
                    KeyValue.pair(session("1", 600L, 600L), "3"),
                    KeyValue.pair(session("2", 599L, 600L), "1+2"))));
        }
    }

    /** Verifies that the aggregates are queryable from the named session store "aggregated". */
    @Test
    public void shouldMaterializeAggregated() {
        // The explicit type arguments are required: without them the value type
        // would be inferred as Object and withValueSerde(Serde<String>) is rejected.
        stream.aggregate(
            MockInitializer.STRING_INIT,
            MockAggregator.TOSTRING_ADDER,
            sessionMerger,
            Materialized.<String, String, SessionStore<Bytes, byte[]>>as("aggregated").withValueSerde(Serdes.String()));

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
            final SessionStore<String, String> store = driver.getSessionStore("aggregated");
            MatcherAssert.assertThat(
                StreamsTestUtils.toList(store.fetch("1", "2")),
                CoreMatchers.equalTo(Arrays.asList(
                    KeyValue.pair(session("1", 10L, 15L), "0+0+1+2"),
                    KeyValue.pair(session("1", 600L, 600L), "0+3"),
                    KeyValue.pair(session("2", 599L, 600L), "0+0+1+2"))));
        }
    }

    // The null arguments below are cast to a concrete parameterized type so the
    // intended overload is selected explicitly and no raw types are involved.

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
        stream.aggregate((Initializer<String>) null, MockAggregator.TOSTRING_ADDER, sessionMerger);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
        stream.aggregate(MockInitializer.STRING_INIT, (Aggregator<String, String, String>) null, sessionMerger);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnAggregateIfMergerIsNull() {
        stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Merger<String, String>) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
        stream.reduce((Reducer<String>) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
        stream.aggregate(
            (Initializer<String>) null,
            MockAggregator.TOSTRING_ADDER,
            sessionMerger,
            Materialized.as("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
        stream.aggregate(
            MockInitializer.STRING_INIT,
            (Aggregator<String, String, String>) null,
            sessionMerger,
            Materialized.as("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnMaterializedAggregateIfMergerIsNull() {
        stream.aggregate(
            MockInitializer.STRING_INIT,
            MockAggregator.TOSTRING_ADDER,
            (Merger<String, String>) null,
            Materialized.as("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
        stream.aggregate(
            MockInitializer.STRING_INIT,
            MockAggregator.TOSTRING_ADDER,
            sessionMerger,
            (Materialized<String, String, SessionStore<Bytes, byte[]>>) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
        stream.reduce((Reducer<String>) null, Materialized.as("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
        stream.reduce(MockReducer.STRING_ADDER, (Materialized<String, String, SessionStore<Bytes, byte[]>>) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
        stream.count((Materialized<String, Long, SessionStore<Bytes, byte[]>>) null);
    }

    /**
     * Pipes the shared fixture into the driver: key "1" at timestamps 10, 15 and
     * 600, then key "2" at 600 followed by 599 (i.e. one out-of-order record).
     *
     * @param topologyTestDriver driver wrapping the topology under test
     */
    private void processData(TopologyTestDriver topologyTestDriver) {
        topologyTestDriver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10L));
        topologyTestDriver.pipeInput(recordFactory.create(TOPIC, "1", "2", 15L));
        topologyTestDriver.pipeInput(recordFactory.create(TOPIC, "1", "3", 600L));
        topologyTestDriver.pipeInput(recordFactory.create(TOPIC, "2", "1", 600L));
        topologyTestDriver.pipeInput(recordFactory.create(TOPIC, "2", "2", 599L));
    }

    /** Builds the windowed key for {@code key} over the session window [{@code start}, {@code end}]. */
    private static Windowed<String> session(final String key, final long start, final long end) {
        return new Windowed<>(key, new SessionWindow(start, end));
    }
}
