/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka.spout.trident;

import java.io.Serializable;
import java.util.Collection;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.RecordTranslator;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutTopicPartitionRegistry;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the {@link KafkaSpoutConfig} for a Trident Kafka spout and creates the
 * {@link KafkaConsumer} built from that configuration.
 *
 * <p>The consumer field is {@code transient}, so it is not part of the serialized form;
 * it is only assigned by {@link #createAndSubscribeKafkaConsumer(TopologyContext)} and is
 * {@code null} until that method has been called on this instance.
 *
 * @param <K> key type of the consumed Kafka records
 * @param <V> value type of the consumed Kafka records
 */
public class KafkaTridentSpoutManager<K, V> implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutManager.class);

    /** Consumer created by {@link #createAndSubscribeKafkaConsumer(TopologyContext)}; {@code null} before that. */
    private transient KafkaConsumer<K, V> kafkaConsumer;

    /** Spout configuration supplied at construction time. */
    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;

    /** Cached result of {@link #getFields()}. */
    private Fields fields;

    /**
     * Creates a manager for the given configuration and resolves the output fields up front,
     * so an inconsistent translator configuration is rejected at construction time.
     *
     * @param kafkaSpoutConfig the spout configuration this manager works from
     * @throws IllegalArgumentException if the translator's streams declare differing output fields
     */
    public KafkaTridentSpoutManager(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
        this.kafkaSpoutConfig = kafkaSpoutConfig;
        this.fields = getFields();
        // Pass the object itself so toString() is only evaluated when debug logging is enabled.
        LOG.debug("Created {}", this);
    }

    /**
     * Creates a new consumer from the configured Kafka properties, subscribes it through the
     * configured subscription with a rebalance listener that keeps the topic-partition registry
     * up to date, and stores it in this manager (replacing any previously stored reference).
     *
     * @param context topology context handed to the subscription
     * @return the newly created consumer
     */
    KafkaConsumer<K, V> createAndSubscribeKafkaConsumer(TopologyContext context) {
        kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps());
        kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), context);
        return kafkaConsumer;
    }

    /**
     * Returns the consumer stored by the last call to
     * {@link #createAndSubscribeKafkaConsumer(TopologyContext)}.
     *
     * @return the stored consumer, or {@code null} if none has been created on this instance
     */
    KafkaConsumer<K, V> getKafkaConsumer() {
        return kafkaConsumer;
    }

    /**
     * Returns the topic partitions currently held by the shared
     * {@link KafkaTridentSpoutTopicPartitionRegistry} instance.
     *
     * @return the registry's topic partitions
     */
    Set<TopicPartition> getTopicPartitions() {
        return KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.getTopicPartitions();
    }

    /**
     * Returns the output fields declared by the configured record translator, resolving and
     * caching them on first use. Every stream of the translator must declare the same fields.
     *
     * @return the common output fields, or {@code null} if the translator declares no streams
     * @throws IllegalArgumentException if two streams declare different output fields
     */
    final Fields getFields() {
        if (fields == null) {
            RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
            Fields common = null;
            for (String stream : translator.streams()) {
                Fields streamFields = translator.getFieldsFor(stream);
                if (common == null) {
                    // First stream seen: it defines the fields all other streams must match.
                    common = streamFields;
                } else if (!common.equals(streamFields)) {
                    throw new IllegalArgumentException("Trident Spouts do not support multiple output Fields");
                }
            }
            fields = common;
        }
        LOG.debug("OutputFields = {}", fields);
        return fields;
    }

    /**
     * Returns the configuration this manager was constructed with.
     *
     * @return the spout configuration
     */
    KafkaSpoutConfig<K, V> getKafkaSpoutConfig() {
        return kafkaSpoutConfig;
    }

    /**
     * Returns the default {@link Object#toString()} text followed by the current consumer and
     * configuration.
     */
    @Override
    public final String toString() {
        return super.toString()
                + "{kafkaConsumer=" + kafkaConsumer
                + ", kafkaSpoutConfig=" + kafkaSpoutConfig
                + '}';
    }

    /**
     * Rebalance listener that mirrors partition revocations and assignments into the shared
     * {@link KafkaTridentSpoutTopicPartitionRegistry} instance and logs each change.
     */
    private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {

        /** Logs the revoked partitions, then removes them from the registry. */
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
            KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.removeAll(partitions);
        }

        /** Adds the assigned partitions to the registry, then logs them. */
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.addAll(partitions);
            LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
        }
    }
}

