package io.confluent.parallelconsumer;

import com.google.common.truth.Truth;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import pl.tlinkowski.unij.api.UniLists;

@Timeout(60000)
/* loaded from: input_file:io/confluent/parallelconsumer/MockConsumerTestWithSaslAuthenticationException.class */
class MockConsumerTestWithSaslAuthenticationException {
    private static final Logger log = LoggerFactory.getLogger(MockConsumerTestWithSaslAuthenticationException.class);
    private final String topic = MockConsumerTestWithSaslAuthenticationException.class.getSimpleName();

    MockConsumerTestWithSaslAuthenticationException() {
    }

    @Test
    void mockConsumer() {
        final AtomicLong atomicLong = new AtomicLong(System.currentTimeMillis() + 20000);
        Consumer consumer = new MockConsumer<String, String>(OffsetResetStrategy.EARLIEST) { // from class: io.confluent.parallelconsumer.MockConsumerTestWithSaslAuthenticationException.1
            public synchronized ConsumerRecords<String, String> poll(Duration duration) {
                if (System.currentTimeMillis() >= atomicLong.get()) {
                    return super.poll(duration);
                }
                MockConsumerTestWithSaslAuthenticationException.log.info("Mocking failure before 20 seconds");
                throw new SaslAuthenticationException("Invalid username or password");
            }

            public synchronized void commitSync(Map<TopicPartition, OffsetAndMetadata> map) {
                if (System.currentTimeMillis() < atomicLong.get()) {
                    throw new SaslAuthenticationException("Invalid username or password");
                }
                super.commitSync(map);
            }
        };
        HashMap hashMap = new HashMap();
        TopicPartition topicPartition = new TopicPartition(this.topic, 0);
        hashMap.put(topicPartition, 0L);
        ParallelEoSStreamProcessor parallelEoSStreamProcessor = new ParallelEoSStreamProcessor(ParallelConsumerOptions.builder().consumer(consumer).saslAuthenticationRetryTimeout(Duration.ofSeconds(25L)).build());
        parallelEoSStreamProcessor.subscribe(UniLists.of(this.topic));
        consumer.rebalance(Collections.singletonList(topicPartition));
        parallelEoSStreamProcessor.onPartitionsAssigned(UniLists.of(topicPartition));
        consumer.updateBeginningOffsets(hashMap);
        addRecords(consumer);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        parallelEoSStreamProcessor.poll(pollContext -> {
            pollContext.forEach(recordContext -> {
                log.warn("Processing: {}", recordContext);
                concurrentLinkedQueue.add(recordContext);
            });
        });
        Awaitility.setDefaultTimeout(Duration.ofSeconds(50L));
        Awaitility.await().untilAsserted(() -> {
            Truth.assertThat(concurrentLinkedQueue).hasSize(3);
        });
        Awaitility.reset();
    }

    private void addRecords(MockConsumer<String, String> mockConsumer) {
        mockConsumer.addRecord(new ConsumerRecord(this.topic, 0, 0L, "key", "value"));
        mockConsumer.addRecord(new ConsumerRecord(this.topic, 0, 1L, "key", "value"));
        mockConsumer.addRecord(new ConsumerRecord(this.topic, 0, 2L, "key", "value"));
    }
}
