package io.confluent.parallelconsumer.examples.streams;

import io.confluent.parallelconsumer.integrationTests.BrokerIntegrationTest;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test for {@link StreamsApp}: runs the app against the broker supplied by
 * {@link BrokerIntegrationTest}, publishes three records to the input topic and waits until the
 * app's {@code messageCount} reaches three.
 */
public class StreamsAppTest extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(StreamsAppTest.class);

    /**
     * {@link StreamsApp} variant wired to the clients and broker address owned by the enclosing
     * test. It is deliberately a non-static inner class because it reads the enclosing
     * instance's {@code kcu}.
     *
     * <p>NOTE(review): the three methods below presumably override hooks declared in
     * {@code StreamsApp}; add {@code @Override} once that is confirmed against the superclass.
     */
    class StreamsAppUnderTest extends StreamsApp {

        /** Returns the consumer held by the enclosing test's {@code kcu}. */
        Consumer<String, String> getKafkaConsumer() {
            return StreamsAppTest.this.kcu.consumer;
        }

        /** Returns a new producer created via the enclosing test's {@code kcu}. */
        Producer<String, String> getKafkaProducer() {
            return StreamsAppTest.this.kcu.createNewProducer(false);
        }

        /** Returns the bootstrap servers string of the shared Kafka container. */
        String getServerConfig() {
            return BrokerIntegrationTest.kafkaContainer.getBootstrapServers();
        }
    }

    /**
     * Sends three records to {@code StreamsApp.inputTopic} and asserts that the running app
     * eventually reports a message count of three.
     */
    @Test
    public void test() {
        log.info("Test start");
        ensureTopic(StreamsApp.inputTopic, 1);
        ensureTopic(StreamsApp.outputTopicName, 1);

        StreamsAppUnderTest streamsAppUnderTest = new StreamsAppUnderTest();
        streamsAppUnderTest.run();

        // try-with-resources closes the producer first; the finally block then closes the app
        // exactly once, whether the assertions passed or failed.
        try (KafkaProducer<String, String> producer = this.kcu.createNewProducer(false)) {
            producer.send(new ProducerRecord<>(StreamsApp.inputTopic, "a key 1", "a value"));
            producer.send(new ProducerRecord<>(StreamsApp.inputTopic, "a key 2", "a value"));
            producer.send(new ProducerRecord<>(StreamsApp.inputTopic, "a key 3", "a value"));

            // Poll until the app has counted all three records (Awaitility's default timeout applies).
            Awaitility.await().untilAsserted(() -> {
                Assertions.assertThat(streamsAppUnderTest.messageCount.get()).isEqualTo(3);
            });
        } finally {
            streamsAppUnderTest.close();
        }
    }
}
