/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka.spout;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang.Validate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.kafka.spout.KafkaTuple;
import org.apache.storm.kafka.spout.KafkaTupleListener;
import org.apache.storm.kafka.spout.RecordTranslator;
import org.apache.storm.kafka.spout.TopicPartitionComparator;
import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
import org.apache.storm.kafka.spout.internal.ConsumerFactory;
import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.kafka.spout.internal.Timer;
import org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric;
import org.apache.storm.kafka.spout.subscription.TopicAssigner;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSpout<K, V>
extends BaseRichSpout {
    private static final long serialVersionUID = 4151921085047987154L;
    public static final long TIMER_DELAY_MS = 500L;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
    protected SpoutOutputCollector collector;
    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
    private final ConsumerFactory<K, V> kafkaConsumerFactory;
    private final TopicAssigner topicAssigner;
    private transient Consumer<K, V> consumer;
    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
    private transient KafkaSpoutRetryService retryService;
    private transient KafkaTupleListener tupleListener;
    private transient Timer commitTimer;
    private transient Map<TopicPartition, OffsetManager> offsetManagers;
    private transient Set<KafkaSpoutMessageId> emitted;
    private transient Map<TopicPartition, List<ConsumerRecord<K, V>>> waitingToEmit;
    private transient Timer refreshAssignmentTimer;
    private transient TopologyContext context;
    private transient CommitMetadataManager commitMetadataManager;
    private transient KafkaOffsetMetric<K, V> kafkaOffsetMetric;
    private transient KafkaSpoutConsumerRebalanceListener rebalanceListener;

    public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
        this(kafkaSpoutConfig, new ConsumerFactoryDefault(), new TopicAssigner());
    }

    @VisibleForTesting
    KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, ConsumerFactory<K, V> kafkaConsumerFactory, TopicAssigner topicAssigner) {
        this.kafkaConsumerFactory = kafkaConsumerFactory;
        this.topicAssigner = topicAssigner;
        this.kafkaSpoutConfig = kafkaSpoutConfig;
    }

    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.context = context;
        this.collector = collector;
        this.firstPollOffsetStrategy = this.kafkaSpoutConfig.getFirstPollOffsetStrategy();
        this.retryService = this.kafkaSpoutConfig.getRetryService();
        this.tupleListener = this.kafkaSpoutConfig.getTupleListener();
        if (this.kafkaSpoutConfig.getProcessingGuarantee() != KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
            this.commitTimer = new Timer(500L, this.kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
        }
        this.refreshAssignmentTimer = new Timer(500L, this.kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
        this.offsetManagers = new HashMap<TopicPartition, OffsetManager>();
        this.emitted = new HashSet<KafkaSpoutMessageId>();
        this.waitingToEmit = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
        this.commitMetadataManager = new CommitMetadataManager(context, this.kafkaSpoutConfig.getProcessingGuarantee());
        this.rebalanceListener = new KafkaSpoutConsumerRebalanceListener();
        this.consumer = this.kafkaConsumerFactory.createConsumer(this.kafkaSpoutConfig.getKafkaProps());
        this.tupleListener.open(conf, context);
        if (this.canRegisterMetrics()) {
            this.registerMetric();
        }
        LOG.info("Kafka Spout opened with the following configuration: {}", (Object)this.kafkaSpoutConfig);
    }

    private void registerMetric() {
        LOG.info("Registering Spout Metrics");
        this.kafkaOffsetMetric = new KafkaOffsetMetric(() -> Collections.unmodifiableMap(this.offsetManagers), () -> this.consumer);
        this.context.registerMetric("kafkaOffset", this.kafkaOffsetMetric, this.kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs());
    }

    private boolean canRegisterMetrics() {
        try {
            KafkaConsumer.class.getDeclaredMethod("beginningOffsets", Collection.class);
        }
        catch (NoSuchMethodException e) {
            LOG.warn("Minimum required kafka-clients library version to enable metrics is 0.10.1.0. Disabling spout metrics.");
            return false;
        }
        return true;
    }

    private boolean isAtLeastOnceProcessing() {
        return this.kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE;
    }

    public void nextTuple() {
        try {
            PollablePartitionsInfo pollablePartitionsInfo;
            if (this.refreshAssignmentTimer.isExpiredResetOnTrue()) {
                this.refreshAssignment();
            }
            if (this.commitTimer != null && this.commitTimer.isExpiredResetOnTrue()) {
                if (this.isAtLeastOnceProcessing()) {
                    this.commitOffsetsForAckedTuples();
                } else if (this.kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE) {
                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = this.createFetchedOffsetsMetadata(this.consumer.assignment());
                    this.consumer.commitAsync(offsetsToCommit, null);
                    LOG.debug("Committed offsets {} to Kafka", (Object)offsetsToCommit);
                }
            }
            if ((pollablePartitionsInfo = this.getPollablePartitionsInfo()).shouldPoll()) {
                try {
                    this.setWaitingToEmit(this.pollKafkaBroker(pollablePartitionsInfo));
                }
                catch (RetriableException e) {
                    LOG.error("Failed to poll from kafka.", e);
                }
            }
            this.emitIfWaitingNotEmitted();
        }
        catch (InterruptException e) {
            this.throwKafkaConsumerInterruptedException();
        }
    }

    private void throwKafkaConsumerInterruptedException() {
        throw new RuntimeException(new InterruptedException("Kafka consumer was interrupted"));
    }

    private PollablePartitionsInfo getPollablePartitionsInfo() {
        if (this.isWaitingToEmit()) {
            LOG.debug("Not polling. Tuples waiting to be emitted.");
            return new PollablePartitionsInfo(Collections.emptySet(), Collections.emptyMap());
        }
        Set<TopicPartition> assignment = this.consumer.assignment();
        if (!this.isAtLeastOnceProcessing()) {
            return new PollablePartitionsInfo(assignment, Collections.emptyMap());
        }
        Map<TopicPartition, Long> earliestRetriableOffsets = this.retryService.earliestRetriableOffsets();
        HashSet<TopicPartition> pollablePartitions = new HashSet<TopicPartition>();
        int maxUncommittedOffsets = this.kafkaSpoutConfig.getMaxUncommittedOffsets();
        for (TopicPartition tp : assignment) {
            OffsetManager offsetManager = this.offsetManagers.get(tp);
            int numUncommittedOffsets = offsetManager.getNumUncommittedOffsets();
            if (numUncommittedOffsets < maxUncommittedOffsets) {
                pollablePartitions.add(tp);
                continue;
            }
            long offsetAtLimit = offsetManager.getNthUncommittedOffsetAfterCommittedOffset(maxUncommittedOffsets);
            Long earliestRetriableOffset = earliestRetriableOffsets.get(tp);
            if (earliestRetriableOffset != null && earliestRetriableOffset <= offsetAtLimit) {
                pollablePartitions.add(tp);
                continue;
            }
            LOG.debug("Not polling on partition [{}]. It has [{}] uncommitted offsets, which exceeds the limit of [{}]. ", tp, numUncommittedOffsets, maxUncommittedOffsets);
        }
        return new PollablePartitionsInfo(pollablePartitions, earliestRetriableOffsets);
    }

    private boolean isWaitingToEmit() {
        return this.waitingToEmit.values().stream().anyMatch(list -> !list.isEmpty());
    }

    private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
        for (TopicPartition tp : consumerRecords.partitions()) {
            this.waitingToEmit.put(tp, new LinkedList<ConsumerRecord<K, V>>(consumerRecords.records(tp)));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
        this.doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);
        HashSet<TopicPartition> pausedPartitions = new HashSet<TopicPartition>(this.consumer.assignment());
        pausedPartitions.removeIf(pollablePartitionsInfo.pollablePartitions::contains);
        try {
            this.consumer.pause(pausedPartitions);
            ConsumerRecords<K, V> consumerRecords = this.consumer.poll(this.kafkaSpoutConfig.getPollTimeoutMs());
            this.ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
            int numPolledRecords = consumerRecords.count();
            LOG.debug("Polled [{}] records from Kafka", (Object)numPolledRecords);
            if (this.kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = this.createFetchedOffsetsMetadata(this.consumer.assignment());
                this.consumer.commitSync(offsetsToCommit);
                LOG.debug("Committed offsets {} to Kafka", (Object)offsetsToCommit);
            }
            ConsumerRecords<K, V> consumerRecords2 = consumerRecords;
            return consumerRecords2;
        }
        finally {
            this.consumer.resume(pausedPartitions);
        }
    }

    private void doSeekRetriableTopicPartitions(Map<TopicPartition, Long> pollableEarliestRetriableOffsets) {
        for (Map.Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : pollableEarliestRetriableOffsets.entrySet()) {
            this.consumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
        }
    }

    private void ackRetriableOffsetsIfCompactedAway(Map<TopicPartition, Long> earliestRetriableOffsets, ConsumerRecords<K, V> consumerRecords) {
        for (Map.Entry<TopicPartition, Long> entry : earliestRetriableOffsets.entrySet()) {
            long earliestReceivedOffset;
            TopicPartition tp = entry.getKey();
            List<ConsumerRecord<K, V>> records = consumerRecords.records(tp);
            if (records.isEmpty()) continue;
            ConsumerRecord<K, V> record = records.get(0);
            long seekOffset = entry.getValue();
            if (seekOffset >= (earliestReceivedOffset = record.offset())) continue;
            for (long i = seekOffset; i < earliestReceivedOffset; ++i) {
                KafkaSpoutMessageId msgId = this.retryService.getMessageId(tp, i);
                if (this.offsetManagers.get(tp).contains(msgId) || this.emitted.contains(msgId)) continue;
                LOG.debug("Record at offset [{}] appears to have been compacted away from topic [{}], marking as acked", (Object)i, (Object)tp);
                this.retryService.remove(msgId);
                this.emitted.add(msgId);
                this.ack(msgId);
            }
        }
    }

    private void emitIfWaitingNotEmitted() {
        Iterator<List<ConsumerRecord<K, V>>> waitingToEmitIter = this.waitingToEmit.values().iterator();
        block0: while (waitingToEmitIter.hasNext()) {
            List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmitIter.next();
            while (!waitingToEmitForTp.isEmpty()) {
                boolean emittedTuple = this.emitOrRetryTuple(waitingToEmitForTp.remove(0));
                if (!emittedTuple) continue;
                break block0;
            }
            waitingToEmitIter.remove();
        }
    }

    private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        KafkaSpoutMessageId msgId = this.retryService.getMessageId(tp, record.offset());
        if (this.offsetManagers.containsKey(tp) && this.offsetManagers.get(tp).contains(msgId)) {
            LOG.trace("Tuple for record [{}] has already been acked. Skipping", (Object)record);
        } else if (this.emitted.contains(msgId)) {
            LOG.trace("Tuple for record [{}] has already been emitted. Skipping", (Object)record);
        } else {
            List<Object> tuple = this.kafkaSpoutConfig.getTranslator().apply(record);
            if (this.isEmitTuple(tuple)) {
                boolean isScheduled = this.retryService.isScheduled(msgId);
                if (!isScheduled || this.retryService.isReady(msgId)) {
                    String stream;
                    String string = stream = tuple instanceof KafkaTuple ? ((KafkaTuple)((Object)tuple)).getStream() : "default";
                    if (!this.isAtLeastOnceProcessing()) {
                        if (this.kafkaSpoutConfig.isTupleTrackingEnforced()) {
                            this.collector.emit(stream, tuple, (Object)msgId);
                            LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                        } else {
                            this.collector.emit(stream, tuple);
                            LOG.trace("Emitted tuple [{}] for record [{}]", (Object)tuple, (Object)record);
                        }
                    } else {
                        this.emitted.add(msgId);
                        this.offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
                        if (isScheduled) {
                            this.retryService.remove(msgId);
                        }
                        this.collector.emit(stream, tuple, (Object)msgId);
                        this.tupleListener.onEmit(tuple, msgId);
                        LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                    }
                    return true;
                }
            } else {
                LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", (Object)record);
                if (this.isAtLeastOnceProcessing()) {
                    msgId.setNullTuple(true);
                    this.offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
                    this.ack(msgId);
                }
            }
        }
        return false;
    }

    private boolean isEmitTuple(List<Object> tuple) {
        return tuple != null || this.kafkaSpoutConfig.isEmitNullTuples();
    }

    private Map<TopicPartition, OffsetAndMetadata> createFetchedOffsetsMetadata(Set<TopicPartition> assignedPartitions) {
        HashMap<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (TopicPartition tp : assignedPartitions) {
            offsetsToCommit.put(tp, new OffsetAndMetadata(this.consumer.position(tp), this.commitMetadataManager.getCommitMetadata()));
        }
        return offsetsToCommit;
    }

    private void commitOffsetsForAckedTuples() {
        HashMap<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (Map.Entry<TopicPartition, OffsetManager> entry : this.offsetManagers.entrySet()) {
            OffsetAndMetadata nextCommitOffset = entry.getValue().findNextCommitOffset(this.commitMetadataManager.getCommitMetadata());
            if (nextCommitOffset == null) continue;
            nextCommitOffsets.put(entry.getKey(), nextCommitOffset);
        }
        if (!nextCommitOffsets.isEmpty()) {
            this.consumer.commitSync(nextCommitOffsets);
            LOG.debug("Offsets successfully committed to Kafka [{}]", (Object)nextCommitOffsets);
            for (Map.Entry<TopicPartition, OffsetManager> entry : nextCommitOffsets.entrySet()) {
                List<ConsumerRecord<K, V>> waitingToEmitForTp;
                long committedOffset;
                TopicPartition tp = entry.getKey();
                long position = this.consumer.position(tp);
                if (position < (committedOffset = ((OffsetAndMetadata)((Object)entry.getValue())).offset())) {
                    LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]", (Object)position, (Object)committedOffset);
                    this.consumer.seek(tp, committedOffset);
                }
                if ((waitingToEmitForTp = this.waitingToEmit.get(tp)) != null) {
                    this.waitingToEmit.put(tp, waitingToEmitForTp.stream().filter(record -> record.offset() >= committedOffset).collect(Collectors.toCollection(LinkedList::new)));
                }
                OffsetManager offsetManager = this.offsetManagers.get(tp);
                offsetManager.commit((OffsetAndMetadata)((Object)entry.getValue()));
                LOG.debug("[{}] uncommitted offsets for partition [{}] after commit", (Object)offsetManager.getNumUncommittedOffsets(), (Object)tp);
            }
        } else {
            LOG.trace("No offsets to commit. {}", (Object)this);
        }
    }

    public void ack(Object messageId) {
        if (!this.isAtLeastOnceProcessing()) {
            return;
        }
        KafkaSpoutMessageId msgId = (KafkaSpoutMessageId)messageId;
        if (msgId.isNullTuple()) {
            this.offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
            LOG.debug("Received direct ack for message [{}], associated with null tuple", (Object)msgId);
            this.tupleListener.onAck(msgId);
            return;
        }
        if (!this.emitted.contains(msgId)) {
            LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that came from a topic-partition that this consumer group instance is no longer tracking due to rebalance/partition reassignment. No action taken.", (Object)msgId);
        } else {
            Validate.isTrue(!this.retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked. This should never occur barring errors in the RetryService implementation or the spout code.");
            this.offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
            this.emitted.remove(msgId);
        }
        this.tupleListener.onAck(msgId);
    }

    public void fail(Object messageId) {
        if (!this.isAtLeastOnceProcessing()) {
            return;
        }
        KafkaSpoutMessageId msgId = (KafkaSpoutMessageId)messageId;
        if (!this.emitted.contains(msgId)) {
            LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", (Object)msgId);
            return;
        }
        Validate.isTrue(!this.retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being failed. This should never occur barring errors in the RetryService implementation or the spout code.");
        msgId.incrementNumFails();
        if (!this.retryService.schedule(msgId)) {
            LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", (Object)msgId);
            this.tupleListener.onMaxRetryReached(msgId);
            this.ack(msgId);
        } else {
            this.tupleListener.onRetry(msgId);
            this.emitted.remove(msgId);
        }
    }

    public void activate() {
        try {
            this.refreshAssignment();
        }
        catch (InterruptException e) {
            this.throwKafkaConsumerInterruptedException();
        }
    }

    private void refreshAssignment() {
        Set<TopicPartition> allPartitions = this.kafkaSpoutConfig.getTopicFilter().getAllSubscribedPartitions(this.consumer);
        ArrayList<TopicPartition> allPartitionsSorted = new ArrayList<TopicPartition>(allPartitions);
        Collections.sort(allPartitionsSorted, TopicPartitionComparator.INSTANCE);
        Set<TopicPartition> assignedPartitions = this.kafkaSpoutConfig.getTopicPartitioner().getPartitionsForThisTask(allPartitionsSorted, this.context);
        this.topicAssigner.assignPartitions(this.consumer, assignedPartitions, this.rebalanceListener);
    }

    public void deactivate() {
        try {
            this.commitIfNecessary();
        }
        catch (InterruptException e) {
            this.throwKafkaConsumerInterruptedException();
        }
    }

    public void close() {
        try {
            this.shutdown();
        }
        catch (InterruptException e) {
            this.throwKafkaConsumerInterruptedException();
        }
    }

    private void commitIfNecessary() {
        if (this.isAtLeastOnceProcessing()) {
            this.commitOffsetsForAckedTuples();
        }
    }

    private void shutdown() {
        try {
            this.commitIfNecessary();
        }
        finally {
            this.consumer.close();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        RecordTranslator translator = this.kafkaSpoutConfig.getTranslator();
        for (String stream : translator.streams()) {
            declarer.declareStream(stream, translator.getFieldsFor(stream));
        }
    }

    public String toString() {
        return "KafkaSpout{offsetManagers =" + this.offsetManagers + ", emitted=" + this.emitted + "}";
    }

    public Map<String, Object> getComponentConfiguration() {
        HashMap<String, Object> configuration = super.getComponentConfiguration();
        if (configuration == null) {
            configuration = new HashMap<String, Object>();
        }
        String configKeyPrefix = "config.";
        configuration.put(configKeyPrefix + "topics", this.getTopicsString());
        configuration.put(configKeyPrefix + "groupid", this.kafkaSpoutConfig.getConsumerGroupId());
        for (Map.Entry<String, Object> conf : this.kafkaSpoutConfig.getKafkaProps().entrySet()) {
            if (conf.getValue() != null && this.isPrimitiveOrWrapper(conf.getValue().getClass())) {
                configuration.put(configKeyPrefix + conf.getKey(), conf.getValue());
                continue;
            }
            LOG.debug("Dropping Kafka prop '{}' from component configuration", (Object)conf.getKey());
        }
        return configuration;
    }

    private boolean isPrimitiveOrWrapper(Class<?> type) {
        if (type == null) {
            return false;
        }
        return type.isPrimitive() || this.isWrapper(type);
    }

    private boolean isWrapper(Class<?> type) {
        return type == Double.class || type == Float.class || type == Long.class || type == Integer.class || type == Short.class || type == Character.class || type == Byte.class || type == Boolean.class || type == String.class;
    }

    private String getTopicsString() {
        return this.kafkaSpoutConfig.getTopicFilter().getTopicsString();
    }

    @VisibleForTesting
    KafkaOffsetMetric<K, V> getKafkaOffsetMetric() {
        return this.kafkaOffsetMetric;
    }

    private static class PollablePartitionsInfo {
        private final Set<TopicPartition> pollablePartitions;
        private final Map<TopicPartition, Long> pollableEarliestRetriableOffsets;

        PollablePartitionsInfo(Set<TopicPartition> pollablePartitions, Map<TopicPartition, Long> earliestRetriableOffsets) {
            this.pollablePartitions = pollablePartitions;
            this.pollableEarliestRetriableOffsets = earliestRetriableOffsets.entrySet().stream().filter(entry -> pollablePartitions.contains(entry.getKey())).collect(Collectors.toMap(entry -> (TopicPartition)entry.getKey(), entry -> (Long)entry.getValue()));
        }

        public boolean shouldPoll() {
            return !this.pollablePartitions.isEmpty();
        }
    }

    private class KafkaSpoutConsumerRebalanceListener
    implements ConsumerRebalanceListener {
        private Collection<TopicPartition> previousAssignment = new HashSet<TopicPartition>();

        private KafkaSpoutConsumerRebalanceListener() {
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            this.previousAssignment = partitions;
            LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", KafkaSpout.this.kafkaSpoutConfig.getConsumerGroupId(), KafkaSpout.this.consumer, partitions);
            if (KafkaSpout.this.isAtLeastOnceProcessing()) {
                KafkaSpout.this.commitOffsetsForAckedTuples();
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]", KafkaSpout.this.context.getThisTaskId(), KafkaSpout.this.kafkaSpoutConfig.getConsumerGroupId(), KafkaSpout.this.consumer, partitions);
            this.initialize(partitions);
            KafkaSpout.this.tupleListener.onPartitionsReassigned(partitions);
        }

        private void initialize(Collection<TopicPartition> partitions) {
            if (KafkaSpout.this.isAtLeastOnceProcessing()) {
                KafkaSpout.this.offsetManagers.keySet().retainAll(partitions);
                KafkaSpout.this.retryService.retainAll(partitions);
                KafkaSpout.this.emitted.removeIf(msgId -> !partitions.contains(msgId.getTopicPartition()));
            }
            KafkaSpout.this.waitingToEmit.keySet().retainAll(partitions);
            HashSet<TopicPartition> newPartitions = new HashSet<TopicPartition>(partitions);
            newPartitions.removeAll(this.previousAssignment);
            for (TopicPartition newTp : newPartitions) {
                OffsetAndMetadata committedOffset = KafkaSpout.this.consumer.committed(newTp);
                long fetchOffset = this.doSeek(newTp, committedOffset);
                LOG.debug("Set consumer position to [{}] for topic-partition [{}] with [{}] and committed offset [{}]", new Object[]{fetchOffset, newTp, KafkaSpout.this.firstPollOffsetStrategy, committedOffset});
                if (!KafkaSpout.this.isAtLeastOnceProcessing() || KafkaSpout.this.offsetManagers.containsKey(newTp)) continue;
                KafkaSpout.this.offsetManagers.put(newTp, new OffsetManager(newTp, fetchOffset));
            }
            LOG.info("Initialization complete");
        }

        private long doSeek(TopicPartition newTp, OffsetAndMetadata committedOffset) {
            LOG.trace("Seeking offset for topic-partition [{}] with [{}] and committed offset [{}]", new Object[]{newTp, KafkaSpout.this.firstPollOffsetStrategy, committedOffset});
            if (committedOffset != null) {
                if (KafkaSpout.this.commitMetadataManager.isOffsetCommittedByThisTopology(newTp, committedOffset, Collections.unmodifiableMap(KafkaSpout.this.offsetManagers))) {
                    KafkaSpout.this.consumer.seek(newTp, committedOffset.offset());
                } else if (KafkaSpout.this.firstPollOffsetStrategy.equals((Object)FirstPollOffsetStrategy.EARLIEST)) {
                    KafkaSpout.this.consumer.seekToBeginning(Collections.singleton(newTp));
                } else if (KafkaSpout.this.firstPollOffsetStrategy.equals((Object)FirstPollOffsetStrategy.LATEST)) {
                    KafkaSpout.this.consumer.seekToEnd(Collections.singleton(newTp));
                } else {
                    KafkaSpout.this.consumer.seek(newTp, committedOffset.offset());
                }
            } else if (KafkaSpout.this.firstPollOffsetStrategy.equals((Object)FirstPollOffsetStrategy.EARLIEST) || KafkaSpout.this.firstPollOffsetStrategy.equals((Object)FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)) {
                KafkaSpout.this.consumer.seekToBeginning(Collections.singleton(newTp));
            } else if (KafkaSpout.this.firstPollOffsetStrategy.equals((Object)FirstPollOffsetStrategy.LATEST) || KafkaSpout.this.firstPollOffsetStrategy.equals((Object)FirstPollOffsetStrategy.UNCOMMITTED_LATEST)) {
                KafkaSpout.this.consumer.seekToEnd(Collections.singleton(newTp));
            }
            return KafkaSpout.this.consumer.position(newTp);
        }
    }
}

