package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.class */
public class KStreamKStreamJoinTest {
    private static final String[] EMPTY = new String[0];
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer(), 0);
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    @Test
    public void shouldLogAndMeterOnSkippedRecordsWithNullValue() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
        KStream stream2 = streamsBuilder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
        ConsumerRecordFactory consumerRecordFactory = new ConsumerRecordFactory(new StringSerializer(), new IntegerSerializer());
        stream.join(stream2, (num, num2) -> {
            return Integer.valueOf(num.intValue() + num2.intValue());
        }, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()));
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister();
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            topologyTestDriver.pipeInput(consumerRecordFactory.create("left", "A", (Object) null));
            LogCaptureAppender.unregister(createAndRegister);
            MatcherAssert.assertThat(createAndRegister.getMessages(), CoreMatchers.hasItem("Skipping record due to null key or value. key=[A] value=[null] topic=[left] partition=[0] offset=[0]"));
            Assert.assertEquals(Double.valueOf(1.0d), StreamsTestUtils.getMetricByName(topologyTestDriver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
            if (topologyTestDriver != null) {
                if (0 == 0) {
                    topologyTestDriver.close();
                    return;
                }
                try {
                    topologyTestDriver.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testJoin() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {0, 1, 2, 3};
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        streamsBuilder.stream("topic1", this.consumed).join(streamsBuilder.stream("topic2", this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockProcessorSupplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).copartitionGroups();
        Assert.assertEquals(1L, copartitionGroups.size());
        Assert.assertEquals(new HashSet(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                MockProcessor theCapturedProcessor = mockProcessorSupplier.theCapturedProcessor();
                for (int i = 0; i < 2; i++) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(iArr[i]), "A" + iArr[i]));
                }
                theCapturedProcessor.checkAndClearProcessResult(EMPTY);
                for (int i2 = 0; i2 < 2; i2++) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(iArr[i2]), "a" + iArr[i2]));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
                for (int i3 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i3), "B" + i3));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:B0+a0 (ts: 0)", "1:B1+a1 (ts: 0)");
                for (int i4 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i4), "b" + i4));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+b0 (ts: 0)", "0:B0+b0 (ts: 0)", "1:A1+b1 (ts: 0)", "1:B1+b1 (ts: 0)", "2:B2+b2 (ts: 0)", "3:B3+b3 (ts: 0)");
                for (int i5 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i5), "C" + i5));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:C0+a0 (ts: 0)", "0:C0+b0 (ts: 0)", "1:C1+a1 (ts: 0)", "1:C1+b1 (ts: 0)", "2:C2+b2 (ts: 0)", "3:C3+b3 (ts: 0)");
                for (int i6 = 0; i6 < 2; i6++) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(iArr[i6]), "c" + iArr[i6]));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+c0 (ts: 0)", "0:B0+c0 (ts: 0)", "0:C0+c0 (ts: 0)", "1:A1+c1 (ts: 0)", "1:B1+c1 (ts: 0)", "1:C1+c1 (ts: 0)");
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testOuterJoin() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {0, 1, 2, 3};
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        streamsBuilder.stream("topic1", this.consumed).outerJoin(streamsBuilder.stream("topic2", this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockProcessorSupplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).copartitionGroups();
        Assert.assertEquals(1L, copartitionGroups.size());
        Assert.assertEquals(new HashSet(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                MockProcessor theCapturedProcessor = mockProcessorSupplier.theCapturedProcessor();
                for (int i = 0; i < 2; i++) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(iArr[i]), "A" + iArr[i]));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+null (ts: 0)", "1:A1+null (ts: 0)");
                for (int i2 = 0; i2 < 2; i2++) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(iArr[i2]), "a" + iArr[i2]));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
                for (int i3 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i3), "B" + i3));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:B0+a0 (ts: 0)", "1:B1+a1 (ts: 0)", "2:B2+null (ts: 0)", "3:B3+null (ts: 0)");
                for (int i4 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i4), "b" + i4));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+b0 (ts: 0)", "0:B0+b0 (ts: 0)", "1:A1+b1 (ts: 0)", "1:B1+b1 (ts: 0)", "2:B2+b2 (ts: 0)", "3:B3+b3 (ts: 0)");
                for (int i5 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i5), "C" + i5));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:C0+a0 (ts: 0)", "0:C0+b0 (ts: 0)", "1:C1+a1 (ts: 0)", "1:C1+b1 (ts: 0)", "2:C2+b2 (ts: 0)", "3:C3+b3 (ts: 0)");
                for (int i6 = 0; i6 < 2; i6++) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(iArr[i6]), "c" + iArr[i6]));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+c0 (ts: 0)", "0:B0+c0 (ts: 0)", "0:C0+c0 (ts: 0)", "1:A1+c1 (ts: 0)", "1:B1+c1 (ts: 0)", "1:C1+c1 (ts: 0)");
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testWindowing() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {0, 1, 2, 3};
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        streamsBuilder.stream("topic1", this.consumed).join(streamsBuilder.stream("topic2", this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockProcessorSupplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).copartitionGroups();
        Assert.assertEquals(1L, copartitionGroups.size());
        Assert.assertEquals(new HashSet(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                MockProcessor theCapturedProcessor = mockProcessorSupplier.theCapturedProcessor();
                for (int i = 0; i < 2; i++) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(iArr[i]), "A" + iArr[i], 0L));
                }
                theCapturedProcessor.checkAndClearProcessResult(EMPTY);
                for (int i2 = 0; i2 < 2; i2++) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(iArr[i2]), "a" + iArr[i2], 0L));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
                for (int i3 = 0; i3 < iArr.length; i3++) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(iArr[i3]), "B" + iArr[i3], 1000 + i3));
                }
                theCapturedProcessor.checkAndClearProcessResult(EMPTY);
                long j = 1000 + 100;
                for (int i4 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i4), "b" + i4, j));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:B0+b0 (ts: 1100)", "1:B1+b1 (ts: 1100)", "2:B2+b2 (ts: 1100)", "3:B3+b3 (ts: 1100)");
                long j2 = j + 1;
                for (int i5 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i5), "c" + i5, j2));
                }
                theCapturedProcessor.checkAndClearProcessResult("1:B1+c1 (ts: 1101)", "2:B2+c2 (ts: 1101)", "3:B3+c3 (ts: 1101)");
                long j3 = j2 + 1;
                for (int i6 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i6), "d" + i6, j3));
                }
                theCapturedProcessor.checkAndClearProcessResult("2:B2+d2 (ts: 1102)", "3:B3+d3 (ts: 1102)");
                long j4 = j3 + 1;
                for (int i7 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i7), "e" + i7, j4));
                }
                theCapturedProcessor.checkAndClearProcessResult("3:B3+e3 (ts: 1103)");
                long j5 = j4 + 1;
                for (int i8 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i8), "f" + i8, j5));
                }
                theCapturedProcessor.checkAndClearProcessResult(EMPTY);
                for (int i9 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i9), "g" + i9, 899L));
                }
                theCapturedProcessor.checkAndClearProcessResult(EMPTY);
                long j6 = 899 + 1;
                for (int i10 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i10), "h" + i10, j6));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:B0+h0 (ts: 1000)");
                long j7 = j6 + 1;
                for (int i11 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i11), "i" + i11, j7));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:B0+i0 (ts: 1000)", "1:B1+i1 (ts: 1001)");
                long j8 = j7 + 1;
                for (int i12 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i12), "j" + i12, j8));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:B0+j0 (ts: 1000)", "1:B1+j1 (ts: 1001)", "2:B2+j2 (ts: 1002)");
                long j9 = j8 + 1;
                for (int i13 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i13), "k" + i13, j9));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:B0+k0 (ts: 1000)", "1:B1+k1 (ts: 1001)", "2:B2+k2 (ts: 1002)", "3:B3+k3 (ts: 1003)");
                for (int i14 = 0; i14 < iArr.length; i14++) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(iArr[i14]), "l" + iArr[i14], 2000 + i14));
                }
                theCapturedProcessor.checkAndClearProcessResult(EMPTY);
                for (int i15 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i15), "C" + i15, 2100L));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:C0+l0 (ts: 2100)", "1:C1+l1 (ts: 2100)", "2:C2+l2 (ts: 2100)", "3:C3+l3 (ts: 2100)");
                long j10 = 2100 + 1;
                for (int i16 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i16), "D" + i16, j10));
                }
                theCapturedProcessor.checkAndClearProcessResult("1:D1+l1 (ts: 2101)", "2:D2+l2 (ts: 2101)", "3:D3+l3 (ts: 2101)");
                long j11 = j10 + 1;
                for (int i17 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i17), "E" + i17, j11));
                }
                theCapturedProcessor.checkAndClearProcessResult("2:E2+l2 (ts: 2102)", "3:E3+l3 (ts: 2102)");
                long j12 = j11 + 1;
                for (int i18 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i18), "F" + i18, j12));
                }
                theCapturedProcessor.checkAndClearProcessResult("3:F3+l3 (ts: 2103)");
                long j13 = j12 + 1;
                for (int i19 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i19), "G" + i19, j13));
                }
                theCapturedProcessor.checkAndClearProcessResult(EMPTY);
                for (int i20 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i20), "H" + i20, 1899L));
                }
                theCapturedProcessor.checkAndClearProcessResult(EMPTY);
                long j14 = 1899 + 1;
                for (int i21 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i21), "I" + i21, j14));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:I0+l0 (ts: 2000)");
                long j15 = j14 + 1;
                for (int i22 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i22), "J" + i22, j15));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:J0+l0 (ts: 2000)", "1:J1+l1 (ts: 2001)");
                long j16 = j15 + 1;
                for (int i23 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i23), "K" + i23, j16));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:K0+l0 (ts: 2000)", "1:K1+l1 (ts: 2001)", "2:K2+l2 (ts: 2002)");
                long j17 = j16 + 1;
                for (int i24 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(i24), "L" + i24, j17));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:L0+l0 (ts: 2000)", "1:L1+l1 (ts: 2001)", "2:L2+l2 (ts: 2002)", "3:L3+l3 (ts: 2003)");
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testAsymmetricWindowingAfter() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {0, 1, 2, 3};
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        streamsBuilder.stream("topic1", this.consumed).join(streamsBuilder.stream("topic2", this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(Duration.ofMillis(0L)).after(Duration.ofMillis(100L)), Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockProcessorSupplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).copartitionGroups();
        Assert.assertEquals(1L, copartitionGroups.size());
        Assert.assertEquals(new HashSet(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                MockProcessor theCapturedProcessor = mockProcessorSupplier.theCapturedProcessor();
                for (int i = 0; i < iArr.length; i++) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(iArr[i]), "A" + iArr[i], 1000 + i));
                }
                theCapturedProcessor.checkAndClearProcessResult(EMPTY);
                for (int i2 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i2), "a" + i2, 999L));
                }
                theCapturedProcessor.checkAndClearProcessResult(EMPTY);
                long j = 999 + 1;
                for (int i3 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i3), "b" + i3, j));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+b0 (ts: 1000)");
                long j2 = j + 1;
                for (int i4 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i4), "c" + i4, j2));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+c0 (ts: 1001)", "1:A1+c1 (ts: 1001)");
                long j3 = j2 + 1;
                for (int i5 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i5), "d" + i5, j3));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+d0 (ts: 1002)", "1:A1+d1 (ts: 1002)", "2:A2+d2 (ts: 1002)");
                long j4 = j3 + 1;
                for (int i6 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i6), "e" + i6, j4));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+e0 (ts: 1003)", "1:A1+e1 (ts: 1003)", "2:A2+e2 (ts: 1003)", "3:A3+e3 (ts: 1003)");
                for (int i7 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i7), "f" + i7, 1100L));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+f0 (ts: 1100)", "1:A1+f1 (ts: 1100)", "2:A2+f2 (ts: 1100)", "3:A3+f3 (ts: 1100)");
                long j5 = 1100 + 1;
                for (int i8 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i8), "g" + i8, j5));
                }
                theCapturedProcessor.checkAndClearProcessResult("1:A1+g1 (ts: 1101)", "2:A2+g2 (ts: 1101)", "3:A3+g3 (ts: 1101)");
                long j6 = j5 + 1;
                for (int i9 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i9), "h" + i9, j6));
                }
                theCapturedProcessor.checkAndClearProcessResult("2:A2+h2 (ts: 1102)", "3:A3+h3 (ts: 1102)");
                long j7 = j6 + 1;
                for (int i10 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i10), "i" + i10, j7));
                }
                theCapturedProcessor.checkAndClearProcessResult("3:A3+i3 (ts: 1103)");
                long j8 = j7 + 1;
                for (int i11 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i11), "j" + i11, j8));
                }
                theCapturedProcessor.checkAndClearProcessResult(EMPTY);
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testAsymmetricWindowingBefore() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {0, 1, 2, 3};
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        streamsBuilder.stream("topic1", this.consumed).join(streamsBuilder.stream("topic2", this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(Duration.ofMillis(0L)).before(Duration.ofMillis(100L)), Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockProcessorSupplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).copartitionGroups();
        Assert.assertEquals(1L, copartitionGroups.size());
        Assert.assertEquals(new HashSet(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        Throwable th = null;
        try {
            try {
                MockProcessor theCapturedProcessor = mockProcessorSupplier.theCapturedProcessor();
                for (int i = 0; i < iArr.length; i++) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic1", Integer.valueOf(iArr[i]), "A" + iArr[i], 1000 + i));
                }
                theCapturedProcessor.checkAndClearProcessResult(EMPTY);
                for (int i2 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i2), "a" + i2, 899L));
                }
                theCapturedProcessor.checkAndClearProcessResult(EMPTY);
                long j = 899 + 1;
                for (int i3 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i3), "b" + i3, j));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+b0 (ts: 1000)");
                long j2 = j + 1;
                for (int i4 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i4), "c" + i4, j2));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+c0 (ts: 1000)", "1:A1+c1 (ts: 1001)");
                long j3 = j2 + 1;
                for (int i5 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i5), "d" + i5, j3));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+d0 (ts: 1000)", "1:A1+d1 (ts: 1001)", "2:A2+d2 (ts: 1002)");
                long j4 = j3 + 1;
                for (int i6 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i6), "e" + i6, j4));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+e0 (ts: 1000)", "1:A1+e1 (ts: 1001)", "2:A2+e2 (ts: 1002)", "3:A3+e3 (ts: 1003)");
                for (int i7 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i7), "f" + i7, 1000L));
                }
                theCapturedProcessor.checkAndClearProcessResult("0:A0+f0 (ts: 1000)", "1:A1+f1 (ts: 1001)", "2:A2+f2 (ts: 1002)", "3:A3+f3 (ts: 1003)");
                long j5 = 1000 + 1;
                for (int i8 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i8), "g" + i8, j5));
                }
                theCapturedProcessor.checkAndClearProcessResult("1:A1+g1 (ts: 1001)", "2:A2+g2 (ts: 1002)", "3:A3+g3 (ts: 1003)");
                long j6 = j5 + 1;
                for (int i9 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i9), "h" + i9, j6));
                }
                theCapturedProcessor.checkAndClearProcessResult("2:A2+h2 (ts: 1002)", "3:A3+h3 (ts: 1003)");
                long j7 = j6 + 1;
                for (int i10 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i10), "i" + i10, j7));
                }
                theCapturedProcessor.checkAndClearProcessResult("3:A3+i3 (ts: 1003)");
                long j8 = j7 + 1;
                for (int i11 : iArr) {
                    topologyTestDriver.pipeInput(this.recordFactory.create("topic2", Integer.valueOf(i11), "j" + i11, j8));
                }
                theCapturedProcessor.checkAndClearProcessResult(EMPTY);
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }
}
