package org.apache.kafka.server.log.remote.metadata.storage;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/**
 * Unit tests for {@link ConsumerTask}, run against a Mockito-spied {@link MockConsumer}.
 * (Decompiled from org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.class.)
 */
public class ConsumerTaskTest {
    /** Number of metadata-topic partitions used by the partitioner and by the offsets seeded in these tests. */
    private final int numMetadataTopicPartitions = 5;
    /** Resolves a user {@link TopicIdPartition} to the index of the metadata partition it is stored in. */
    private final RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions);
    /** Records which handler callbacks were invoked so the tests can assert on them. */
    private final DummyEventHandler handler = new DummyEventHandler();
    /** One {@link TopicPartition} per metadata partition index {@code 0 .. numMetadataTopicPartitions - 1}. */
    private final Set<TopicPartition> remoteLogPartitions = IntStream.range(0, numMetadataTopicPartitions)
            .mapToObj(ConsumerTask::toRemoteLogPartition)
            .collect(Collectors.toSet());
    /** Topic id shared by every partition produced by {@code getIdPartitions}. */
    private final Uuid topicId = Uuid.randomUuid();
    /** Serializer used to build the record payloads fed to the mock consumer. */
    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
    /** Task under test; re-created before every test. */
    private ConsumerTask consumerTask;
    /** Spied mock consumer handed to the task; re-created before every test. */
    private MockConsumer<byte[], byte[]> consumer;
    /** Thread wrapping {@link #consumerTask}; created before every test but started by the test itself. */
    private Thread thread;

    /**
     * Test double for {@link RemotePartitionMetadataEventHandler} that only records which callbacks were
     * invoked.
     *
     * <p>The tests inspect this state from the JUnit thread while the {@link ConsumerTask} holding the
     * handler runs on a separate thread, so the counter is {@code volatile} and the maps are synchronized
     * wrappers; otherwise the polling assertions have no visibility guarantee.
     */
    private static class DummyEventHandler extends RemotePartitionMetadataEventHandler {
        /**
         * Number of segment-metadata events received. Assumes a single writing thread (the one running the
         * task — TODO confirm), which is why a non-atomic {@code ++} on a volatile field is sufficient.
         */
        private volatile int metadataCounter;
        /** Partitions for which {@link #markInitialized} was called. */
        private final Map<TopicIdPartition, Boolean> isPartitionInitialized;
        /** Partitions for which {@link #maybeLoadPartition} was called. */
        private final Map<TopicIdPartition, Boolean> isPartitionLoaded;
        /** Partitions for which {@link #clearTopicPartition} was called. */
        private final Map<TopicIdPartition, Boolean> isPartitionCleared;

        private DummyEventHandler() {
            this.metadataCounter = 0;
            this.isPartitionInitialized = Collections.synchronizedMap(new HashMap<>());
            this.isPartitionLoaded = Collections.synchronizedMap(new HashMap<>());
            this.isPartitionCleared = Collections.synchronizedMap(new HashMap<>());
        }

        /** Counts the event; the metadata itself is ignored. */
        protected void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
            this.metadataCounter++;
        }

        /** Intentionally a no-op: no test asserts on segment-metadata updates. */
        protected void handleRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) {
        }

        /** Intentionally a no-op: no test asserts on partition-delete metadata. */
        protected void handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) {
        }

        /** Remembers that the given partition was cleared. */
        public void clearTopicPartition(TopicIdPartition topicIdPartition) {
            this.isPartitionCleared.put(topicIdPartition, true);
        }

        /** Remembers that the given partition was marked as initialized. */
        public void markInitialized(TopicIdPartition topicIdPartition) {
            this.isPartitionInitialized.put(topicIdPartition, true);
        }

        /** Always reports {@code true}, regardless of whether {@link #markInitialized} was called. */
        public boolean isInitialized(TopicIdPartition topicIdPartition) {
            return true;
        }

        /** Remembers that a load was requested for the given partition. */
        public void maybeLoadPartition(TopicIdPartition topicIdPartition) {
            this.isPartitionLoaded.put(topicIdPartition, true);
        }
    }

    /**
     * Creates a fresh spied {@link MockConsumer} whose beginning offset is 0 for every metadata partition,
     * a new {@link ConsumerTask} reading from it, and a thread wrapping that task. The thread is not
     * started here; each test starts it when it is ready.
     */
    @BeforeEach
    public void beforeEach() {
        this.consumer = Mockito.spy(new MockConsumer<>(OffsetResetStrategy.EARLIEST));
        this.consumer.updateBeginningOffsets(
                this.remoteLogPartitions.stream().collect(Collectors.toMap(Function.identity(), partition -> 0L)));
        // NOTE(review): 10L and 300000L are presumably interval settings in milliseconds — confirm against
        // the ConsumerTask constructor, which is not visible from this file.
        this.consumerTask = new ConsumerTask(this.handler, this.partitioner, this.consumer, 10L, 300000L, new SystemTime());
        this.thread = new Thread(this.consumerTask);
    }

    /**
     * Closes the task and checks that its thread terminates within ten seconds.
     * Does nothing when no thread was created.
     */
    @AfterEach
    public void afterEach() throws InterruptedException {
        if (thread == null) {
            return;
        }
        Assertions.assertDoesNotThrow(() -> consumerTask.close(), "Close method threw exception");
        thread.join(10000L);
        boolean stillRunning = thread.isAlive();
        Assertions.assertFalse(stillRunning, "Consumer task thread is still alive");
    }

    /** Closing a started task that was never given any partition must not throw. */
    @Test
    public void testCloseOnNoAssignment() throws InterruptedException {
        thread.start();
        // Short pause between starting the thread and closing the task.
        Thread.sleep(10L);
        Assertions.assertDoesNotThrow(() -> consumerTask.close(), "Close method threw exception");
    }

    /** Calling {@code close()} twice in a row on a started task must complete without an exception. */
    @Test
    public void testIdempotentClose() {
        thread.start();
        for (int attempt = 0; attempt < 2; attempt++) {
            consumerTask.close();
        }
    }

    /**
     * Two {@code UserTopicIdPartition}s built from the same partition must compare equal even when their
     * {@code isInitialized} / {@code isAssigned} flags differ.
     */
    @Test
    public void testUserTopicIdPartitionEquals() {
        TopicIdPartition tpId = new TopicIdPartition(topicId, new TopicPartition("sample", 0));
        ConsumerTask.UserTopicIdPartition flagged =
                new ConsumerTask.UserTopicIdPartition(tpId, partitioner.metadataPartition(tpId));
        ConsumerTask.UserTopicIdPartition untouched =
                new ConsumerTask.UserTopicIdPartition(tpId, partitioner.metadataPartition(tpId));

        // Flip the flags on one instance only.
        flagged.isInitialized = true;
        flagged.isAssigned = true;

        Assertions.assertFalse(untouched.isInitialized);
        Assertions.assertFalse(untouched.isAssigned);
        Assertions.assertEquals(flagged, untouched);
    }

    /**
     * Partitions handed to the task before it starts must end up assigned, their metadata partition must
     * be assigned too, and the handler must have been asked to load each of them.
     */
    @Test
    public void testAddAssignmentsForPartitions() throws InterruptedException {
        List<TopicIdPartition> idPartitions = getIdPartitions("sample", 3);
        // Several user partitions can resolve to the same metadata partition, hence the merge function.
        this.consumer.updateEndOffsets(idPartitions.stream()
                .map(userPartition -> ConsumerTask.toRemoteLogPartition(this.partitioner.metadataPartition(userPartition)))
                .collect(Collectors.toMap(Function.identity(), metadataPartition -> 0L, (first, second) -> second)));
        this.consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
        this.thread.start();
        for (TopicIdPartition idPartition : idPartitions) {
            TestUtils.waitForCondition(() -> this.consumerTask.isUserPartitionAssigned(idPartition),
                    "Timed out waiting for " + idPartition + " to be assigned");
            Assertions.assertTrue(this.consumerTask.isMetadataPartitionAssigned(this.partitioner.metadataPartition(idPartition)));
            // Compare against Boolean.TRUE so a missing entry is an assertion failure rather than an NPE on unboxing.
            Assertions.assertTrue(Boolean.TRUE.equals(this.handler.isPartitionLoaded.get(idPartition)),
                    "Handler was not asked to load " + idPartition);
        }
    }

    /**
     * After a record has been read, removing one partition must unassign exactly that partition, leave
     * the others assigned, and trigger {@code clearTopicPartition} on the handler for it.
     */
    @Test
    public void testRemoveAssignmentsForPartitions() throws InterruptedException {
        List<TopicIdPartition> allPartitions = getIdPartitions("sample", 3);
        // Several user partitions can resolve to the same metadata partition, hence the merge function.
        this.consumer.updateEndOffsets(allPartitions.stream()
                .map(userPartition -> ConsumerTask.toRemoteLogPartition(this.partitioner.metadataPartition(userPartition)))
                .collect(Collectors.toMap(Function.identity(), metadataPartition -> 0L, (first, second) -> second)));
        this.consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
        this.thread.start();

        TopicIdPartition tpId = allPartitions.get(0);
        TestUtils.waitForCondition(() -> this.consumerTask.isUserPartitionAssigned(tpId),
                "Timed out waiting for " + tpId + " to be assigned");
        addRecord(this.consumer, this.partitioner.metadataPartition(tpId), tpId, 0L);
        TestUtils.waitForCondition(
                () -> this.consumerTask.readOffsetForMetadataPartition(this.partitioner.metadataPartition(tpId)).isPresent(),
                "Couldn't read record");

        Set<TopicIdPartition> removedPartitions = Collections.singleton(tpId);
        this.consumerTask.removeAssignmentsForPartitions(removedPartitions);
        // A partition must be unassigned if and only if it is in the removed set.
        for (TopicIdPartition idPartition : allPartitions) {
            TestUtils.waitForCondition(
                    () -> removedPartitions.contains(idPartition) == !this.consumerTask.isUserPartitionAssigned(idPartition),
                    "Timed out waiting for " + idPartition + " to be removed");
        }
        for (TopicIdPartition idPartition : removedPartitions) {
            TestUtils.waitForCondition(() -> this.handler.isPartitionCleared.containsKey(idPartition),
                    "Timed out waiting for " + idPartition + " to be cleared");
        }
    }

    /**
     * Adds 100 partitions one at a time from one thread while another thread repeatedly calls
     * {@code maybeWaitForPartitionAssignments()}; neither side may throw.
     *
     * <p>Both sides run as {@code Callable}s on an executor so that any exception or assertion failure
     * reaches the test through {@link Future#get()}, and the executor is always shut down so no worker
     * thread outlives the test. The consumer-task thread itself is not started by this test.
     */
    @Test
    public void testConcurrentPartitionAssignments() throws InterruptedException, ExecutionException {
        List<TopicIdPartition> allPartitions = getIdPartitions("sample", 100);
        // Several user partitions can resolve to the same metadata partition, hence the merge function.
        this.consumer.updateEndOffsets(allPartitions.stream()
                .map(userPartition -> ConsumerTask.toRemoteLogPartition(this.partitioner.metadataPartition(userPartition)))
                .collect(Collectors.toMap(Function.identity(), metadataPartition -> 0L, (first, second) -> second)));

        AtomicBoolean assignerFinished = new AtomicBoolean(false);
        CountDownLatch waiterCaughtUp = new CountDownLatch(1);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<?> waiter = executor.submit(() -> {
                // The interrupt check lets shutdownNow() stop this loop if the assigner failed.
                while (!assignerFinished.get() && !Thread.currentThread().isInterrupted()) {
                    this.consumerTask.maybeWaitForPartitionAssignments();
                    waiterCaughtUp.countDown();
                }
                return null;
            });
            Future<?> assigner = executor.submit(() -> {
                try {
                    int partitionsAssigned = 0;
                    for (TopicIdPartition partition : allPartitions) {
                        if (partitionsAssigned == 50) {
                            // Half-way through, pause until the waiter has returned at least once so the
                            // remaining assignments overlap with an active waiter.
                            Assertions.assertTrue(waiterCaughtUp.await(1L, TimeUnit.MINUTES),
                                    "Waiter did not observe any assignment within one minute");
                        }
                        this.consumerTask.addAssignmentsForPartitions(Collections.singleton(partition));
                        partitionsAssigned++;
                    }
                } finally {
                    // Always release the waiter, even when assignment failed, so the test cannot hang.
                    assignerFinished.set(true);
                }
                return null;
            });
            assigner.get();
            waiter.get();
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Records reach the handler only for user partitions that are currently assigned.
     *
     * <p>Three user partitions share one metadata partition. Records for the two that get assigned are
     * counted (2, then 3); the record for the never-assigned third partition still advances the read
     * offset to 3 but leaves the counter at 3.
     */
    @Test
    public void testCanProcessRecord() throws InterruptedException {
        // Fixed topic id for which partitions 0, 1 and 2 resolve to the same metadata partition (asserted below).
        Uuid sharedTopicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg");
        TopicIdPartition tpId0 = new TopicIdPartition(sharedTopicId, new TopicPartition("sample", 0));
        TopicIdPartition tpId1 = new TopicIdPartition(sharedTopicId, new TopicPartition("sample", 1));
        TopicIdPartition tpId2 = new TopicIdPartition(sharedTopicId, new TopicPartition("sample", 2));
        Assertions.assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId1));
        Assertions.assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId2));

        int metadataPartition = partitioner.metadataPartition(tpId0);
        consumer.updateEndOffsets(Collections.singletonMap(ConsumerTask.toRemoteLogPartition(metadataPartition), 0L));
        consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId0));
        thread.start();
        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId0),
                "Timed out waiting for " + tpId0 + " to be assigned");

        // Two records for the assigned partition: both are handled.
        addRecord(consumer, metadataPartition, tpId0, 0L);
        addRecord(consumer, metadataPartition, tpId0, 1L);
        TestUtils.waitForCondition(
                () -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
                "Couldn't read record");
        Assertions.assertEquals(2, handler.metadataCounter);

        // Assign the second partition, then feed it one record: handled as well.
        consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1),
                "Timed out waiting for " + tpId1 + " to be assigned");
        addRecord(consumer, metadataPartition, tpId1, 2L);
        TestUtils.waitForCondition(
                () -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(2L)),
                "Couldn't read record");
        Assertions.assertEquals(3, handler.metadataCounter);

        // The third partition is never assigned: its record is read but not handled.
        addRecord(consumer, metadataPartition, tpId2, 3L);
        TestUtils.waitForCondition(
                () -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(3L)),
                "Couldn't read record");
        Assertions.assertEquals(3, handler.metadataCounter);
    }

    /**
     * A record that arrived while its user partition was unassigned must be picked up once that
     * partition is assigned later.
     *
     * <p>{@code poll} is stubbed to return nothing while the second partition is assigned but the third is
     * not, so no records are delivered in the window between those two assignments.
     */
    @Test
    public void testCanReprocessSkippedRecords() throws InterruptedException {
        // Fixed topic id: partitions 0 and 1 share a metadata partition, partition 3 does not (asserted below).
        Uuid fixedTopicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg");
        TopicIdPartition tpId0 = new TopicIdPartition(fixedTopicId, new TopicPartition("sample", 0));
        TopicIdPartition tpId1 = new TopicIdPartition(fixedTopicId, new TopicPartition("sample", 1));
        TopicIdPartition tpId3 = new TopicIdPartition(fixedTopicId, new TopicPartition("sample", 3));
        Assertions.assertEquals(this.partitioner.metadataPartition(tpId0), this.partitioner.metadataPartition(tpId1));
        Assertions.assertNotEquals(this.partitioner.metadataPartition(tpId3), this.partitioner.metadataPartition(tpId0));
        int metadataPartition = this.partitioner.metadataPartition(tpId0);
        int anotherMetadataPartition = this.partitioner.metadataPartition(tpId3);

        Mockito.doAnswer(invocation -> {
            if (this.consumerTask.isUserPartitionAssigned(tpId1) && !this.consumerTask.isUserPartitionAssigned(tpId3)) {
                return ConsumerRecords.empty();
            }
            return invocation.callRealMethod();
        }).when(this.consumer).poll((Duration) ArgumentMatchers.any());

        this.consumer.updateEndOffsets(Collections.singletonMap(ConsumerTask.toRemoteLogPartition(metadataPartition), 0L));
        this.consumer.updateEndOffsets(Collections.singletonMap(ConsumerTask.toRemoteLogPartition(anotherMetadataPartition), 0L));
        this.consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId0));
        this.thread.start();
        TestUtils.waitForCondition(() -> this.consumerTask.isUserPartitionAssigned(tpId0),
                "Timed out waiting for " + tpId0 + " to be assigned");

        // Only tpId0 is assigned, so of these two records only the second is counted.
        addRecord(this.consumer, metadataPartition, tpId1, 0L);
        addRecord(this.consumer, metadataPartition, tpId0, 1L);
        TestUtils.waitForCondition(
                () -> this.consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
                "Couldn't read record");
        Assertions.assertEquals(1, this.handler.metadataCounter);

        this.consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
        TestUtils.waitForCondition(() -> this.consumerTask.isUserPartitionAssigned(tpId1),
                "Timed out waiting for " + tpId1 + " to be assigned");

        // Assigning tpId3 lifts the poll stub installed above.
        Set<TopicIdPartition> moreAssignments = new HashSet<>();
        moreAssignments.add(tpId0);
        moreAssignments.add(tpId3);
        this.consumerTask.addAssignmentsForPartitions(moreAssignments);

        // The same two records are added again — presumably because MockConsumer does not keep records
        // that were already polled (TODO confirm).
        addRecord(this.consumer, metadataPartition, tpId1, 0L);
        addRecord(this.consumer, metadataPartition, tpId0, 1L);
        TestUtils.waitForCondition(
                () -> this.consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
                "Couldn't read record");
        TestUtils.waitForCondition(() -> this.handler.metadataCounter == 2, "Couldn't read record");
    }

    /**
     * With an end offset of 2 on the metadata partition, the user partition must not be marked
     * initialized right after assignment, but must be once five records (offsets 0 to 4) have been read.
     */
    @Test
    public void testMaybeMarkUserPartitionsAsReady() throws InterruptedException {
        TopicIdPartition tpId = getIdPartitions("hello", 1).get(0);
        int metadataPartition = partitioner.metadataPartition(tpId);
        consumer.updateEndOffsets(Collections.singletonMap(ConsumerTask.toRemoteLogPartition(metadataPartition), 2L));
        consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
        thread.start();

        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId),
                "Waiting for " + tpId + " to be assigned");
        Assertions.assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
        Assertions.assertFalse(handler.isPartitionInitialized.containsKey(tpId));

        for (int offset = 0; offset < 5; offset++) {
            addRecord(consumer, metadataPartition, tpId, offset);
        }
        TestUtils.waitForCondition(
                () -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(4L)),
                "Couldn't read record");
        Assertions.assertTrue(handler.isPartitionInitialized.get(tpId));
    }

    /**
     * When the metadata partition's beginning and end offsets are equal, the user partition must be
     * marked initialized although no offset has been read.
     *
     * @param j  beginning offset to report for the metadata partition
     * @param j2 end offset to report for the metadata partition
     */
    @ParameterizedTest
    @CsvSource({"0, 0", "500, 500"})
    public void testMaybeMarkUserPartitionAsReadyWhenTopicIsEmpty(long j, long j2) throws InterruptedException {
        TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
        int metadataPartition = partitioner.metadataPartition(tpId);
        consumer.updateBeginningOffsets(
                Collections.singletonMap(ConsumerTask.toRemoteLogPartition(metadataPartition), j));
        consumer.updateEndOffsets(
                Collections.singletonMap(ConsumerTask.toRemoteLogPartition(metadataPartition), j2));
        consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
        thread.start();

        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId),
                "Waiting for " + tpId + " to be assigned");
        Assertions.assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));
        TestUtils.waitForCondition(() -> handler.isPartitionInitialized.containsKey(tpId),
                "should have initialized the partition");
        // No record was consumed, so no read offset may be reported.
        Assertions.assertFalse(consumerTask.readOffsetForMetadataPartition(metadataPartition).isPresent());
    }

    /**
     * Releases one thread that adds an assignment and one that closes the task at the same moment;
     * neither call may throw.
     *
     * <p>JUnit does not observe exceptions or {@code Assertions.fail} raised on threads it did not start,
     * so anything thrown on the helper threads is collected and asserted on the test thread after joining.
     */
    @Test
    public void testConcurrentAccess() throws InterruptedException {
        this.thread.start();
        CountDownLatch startSignal = new CountDownLatch(1);
        TopicIdPartition tpId = getIdPartitions("concurrent", 1).get(0);
        this.consumer.updateEndOffsets(Collections.singletonMap(
                ConsumerTask.toRemoteLogPartition(this.partitioner.metadataPartition(tpId)), 0L));

        List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
        Thread assignmentThread = new Thread(() -> {
            try {
                startSignal.await();
                this.consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                failures.add(e);
            } catch (RuntimeException | Error e) {
                failures.add(e);
            }
        });
        Thread closeThread = new Thread(() -> {
            try {
                startSignal.await();
                this.consumerTask.close();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                failures.add(e);
            } catch (RuntimeException | Error e) {
                failures.add(e);
            }
        });
        assignmentThread.start();
        closeThread.start();

        // Release both threads together so the two calls race.
        startSignal.countDown();
        assignmentThread.join();
        closeThread.join();
        Assertions.assertTrue(failures.isEmpty(), "Shouldn't have thrown an exception: " + failures);
    }

    /**
     * After a {@link LeaderNotAvailableException} and a {@link TimeoutException} are injected into
     * {@code poll}, the task must still read both records up to offset 1 and hand both to the handler.
     */
    @Test
    public void testConsumerShouldNotCloseOnRetriableError() throws InterruptedException {
        TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
        int metadataPartition = partitioner.metadataPartition(tpId);
        consumer.updateEndOffsets(Collections.singletonMap(ConsumerTask.toRemoteLogPartition(metadataPartition), 1L));
        consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
        thread.start();

        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId),
                "Waiting for " + tpId + " to be assigned");
        Assertions.assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));

        // Each record is preceded by an injected poll exception.
        consumer.setPollException(new LeaderNotAvailableException("leader not available!"));
        addRecord(consumer, metadataPartition, tpId, 0L);
        consumer.setPollException(new TimeoutException("Not able to complete the operation within the timeout"));
        addRecord(consumer, metadataPartition, tpId, 1L);

        TestUtils.waitForCondition(
                () -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
                "Couldn't read record");
        Assertions.assertEquals(2, handler.metadataCounter);
    }

    /**
     * After an {@link AuthorizationException} is injected into {@code poll}, the consumer must end up
     * closed.
     */
    @Test
    public void testConsumerShouldCloseOnNonRetriableError() throws InterruptedException {
        TopicIdPartition tpId = getIdPartitions("world", 1).get(0);
        int metadataPartition = partitioner.metadataPartition(tpId);
        consumer.updateEndOffsets(Collections.singletonMap(ConsumerTask.toRemoteLogPartition(metadataPartition), 1L));
        consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId));
        thread.start();

        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId),
                "Waiting for " + tpId + " to be assigned");
        Assertions.assertTrue(consumerTask.isMetadataPartitionAssigned(metadataPartition));

        consumer.setPollException(new AuthorizationException("Unauthorized to read the topic!"));
        TestUtils.waitForCondition(() -> consumer.closed(), "Should close the consume on non-retriable error");
    }

    /**
     * Queues one serialized {@link RemoteLogSegmentMetadata} record on the mock consumer.
     *
     * @param mockConsumer     consumer to add the record to
     * @param i                partition of the {@code __remote_log_metadata} topic the record belongs to
     * @param topicIdPartition user partition the segment metadata refers to
     * @param j                offset of the record within that metadata partition
     */
    private void addRecord(MockConsumer<byte[], byte[]> mockConsumer, int i, TopicIdPartition topicIdPartition, long j) {
        // A fresh random segment id is used for every record.
        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
        RemoteLogSegmentMetadata metadata =
                new RemoteLogSegmentMetadata(segmentId, 0L, 1L, 0L, 0, 0L, 1, Collections.singletonMap(0, 0L));
        // The record carries no key; the value is the serialized metadata.
        mockConsumer.addRecord(
                new ConsumerRecord<>("__remote_log_metadata", i, j, null, this.serde.serialize(metadata)));
    }

    /**
     * Builds partitions {@code 0 .. i-1} of the named topic, all carrying this test's {@code topicId}.
     *
     * @param str topic name
     * @param i   number of partitions to create; zero or a negative value yields an empty list
     * @return a new mutable list of the created partitions, in ascending partition order
     */
    private List<TopicIdPartition> getIdPartitions(String str, int i) {
        List<TopicIdPartition> idPartitions = new ArrayList<>();
        for (int partition = 0; partition < i; partition++) {
            idPartitions.add(new TopicIdPartition(this.topicId, new TopicPartition(str, partition)));
        }
        return idPartitions;
    }
}
