/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.connector.kafka.source.assignor;

import io.mantisrx.connector.kafka.source.assignor.StaticPartitionAssignor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StaticPartitionAssignorImpl
implements StaticPartitionAssignor {
    private static final Logger LOGGER = LoggerFactory.getLogger(StaticPartitionAssignorImpl.class);

    @Override
    public List<TopicPartition> assignPartitionsToConsumer(int consumerIndex, Map<String, Integer> topicPartitionCounts, int totalNumConsumers) {
        Objects.requireNonNull(topicPartitionCounts, "TopicPartitionCount Map cannot be null");
        if (consumerIndex < 0) {
            throw new IllegalArgumentException("Consumer Index cannot be negative " + consumerIndex);
        }
        if (totalNumConsumers < 0) {
            throw new IllegalArgumentException("Total Number of consumers cannot be negative " + totalNumConsumers);
        }
        if (consumerIndex >= totalNumConsumers) {
            throw new IllegalArgumentException("Consumer Index " + consumerIndex + " cannot be greater than or equal to Total Number of consumers " + totalNumConsumers);
        }
        ArrayList<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        int currConsumer = 0;
        for (Map.Entry<String, Integer> topicPartitionCount : topicPartitionCounts.entrySet()) {
            String topic = topicPartitionCount.getKey();
            Integer numPartitions = topicPartitionCount.getValue();
            if (numPartitions <= 0) {
                LOGGER.warn("Number of partitions is " + numPartitions + " for Topic " + topic + " skipping");
                continue;
            }
            for (int i = 0; i < numPartitions; ++i) {
                if (currConsumer == totalNumConsumers) {
                    currConsumer = 0;
                }
                if (currConsumer == consumerIndex) {
                    topicPartitions.add(new TopicPartition(topic, i));
                }
                ++currConsumer;
            }
        }
        return topicPartitions;
    }
}

