/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.connectors;

import edu.iu.dsc.tws.api.compute.TaskContext;
import edu.iu.dsc.tws.connectors.KafkaTopicPartitionState;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Wraps a Kafka {@code Consumer<String, String>} and forwards every polled
 * record value to a {@link TaskContext} edge.
 *
 * <p>Despite its name this class does not extend {@link Thread}; each call to
 * {@link #run()} performs a single poll-and-emit cycle and the caller is
 * expected to invoke it repeatedly. The type parameter {@code T} is not used
 * by the class itself and is kept only for source compatibility.
 *
 * <p>NOTE(review): the underlying Kafka consumer is presumably not safe for
 * concurrent use, so all methods that touch it should be called from the same
 * thread -- confirm against the caller.
 */
public class KafkaConsumerThread<T> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerThread.class);

    /** Lazily created by {@link #initiateConnection()}. */
    private Consumer<String, String> consumer;
    private final Properties kafkaConsumerConfig;

    /** True when {@link #setOffsetsToCommit(Map)} supplied offsets that were not yet handed to a commit. */
    private volatile boolean isOffsetsToCommit = false;
    /** True while an asynchronous commit is in flight. */
    private volatile boolean isCommitCommenced = false;

    /** Offsets of emitted records, updated by {@link #emitRecord} and replaced by {@link #setOffsetsToCommit(Map)}. */
    private Map<TopicPartition, OffsetAndMetadata> offsetsToCommit;
    /** Offsets reported by the last successful commit; used by {@link #setSeek()}. */
    private Map<TopicPartition, OffsetAndMetadata> offsetsToSubscribe;

    private final List<TopicPartition> topicPartitions;
    private final List<KafkaTopicPartitionState> topicPartitionStates;
    private volatile boolean active = true;
    private volatile TaskContext taskContext;
    private final String edge;

    /** Set once the consumer has been created and assigned its partitions. */
    private boolean fetchLoopStarted = false;

    /**
     * @param kafkaConsumerConfig  properties passed to the {@link KafkaConsumer} constructor
     * @param offsetsToCommit      map that receives the offset of every emitted record
     * @param topicPartitions      partitions to assign to the consumer
     * @param topicPartitionStates per-partition state objects whose position is updated on emit
     * @param context              task context that emitted values are written to
     * @param edge                 edge name used when writing to {@code context}
     */
    public KafkaConsumerThread(Properties kafkaConsumerConfig, Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, List<TopicPartition> topicPartitions, List<KafkaTopicPartitionState> topicPartitionStates, TaskContext context, String edge) {
        this.kafkaConsumerConfig = kafkaConsumerConfig;
        this.offsetsToCommit = offsetsToCommit;
        this.topicPartitions = topicPartitions;
        this.topicPartitionStates = topicPartitionStates;
        this.taskContext = context;
        this.edge = edge;
    }

    /**
     * Performs one poll-and-emit cycle.
     *
     * <p>On the first call the consumer is created, any pending offsets are
     * committed and the configured partitions are assigned. Every call then
     * polls the consumer once and emits each record that belongs to one of
     * the tracked partition states exactly once.
     *
     * @return the number of records emitted by this call; {@code 0} if the
     *         instance was deactivated before the fetch loop started
     * @throws Error if no topic partitions were supplied
     */
    public int run() {
        if (!this.fetchLoopStarted) {
            LOG.info("starting");
            this.initiateConnection();
            this.commitOffsets();
            if (!this.active) {
                return 0;
            }
            if (this.topicPartitions == null) {
                // Kept as Error (not IllegalStateException) so the thrown type seen by callers is unchanged.
                throw new Error("Topic Partition is not defined");
            }
            this.consumer.assign(this.topicPartitions);
            this.fetchLoopStarted = true;
        }

        ConsumerRecords<String, String> records = this.consumer.poll(100L);
        if (records == null) {
            return 0;
        }

        int messageCount = 0;
        // Walk the batch once per tracked partition. Nesting this inside an
        // additional loop over all records would emit every record once per
        // record in the batch.
        for (KafkaTopicPartitionState topicPartitionState : this.topicPartitionStates) {
            List<ConsumerRecord<String, String>> partitionRecords =
                    records.records(topicPartitionState.getTopicPartition());
            for (ConsumerRecord<String, String> record : partitionRecords) {
                this.emitRecord(record.value(), topicPartitionState, record.offset());
                ++messageCount;
            }
        }
        return messageCount;
    }

    /** Creates the Kafka consumer from the stored configuration if it does not exist yet. */
    public void initiateConnection() {
        if (this.consumer == null) {
            this.consumer = new KafkaConsumer<>(this.kafkaConsumerConfig);
        }
    }

    /**
     * Starts an asynchronous commit of the pending offsets, if any.
     *
     * <p>Does nothing when no offsets were registered through
     * {@link #setOffsetsToCommit(Map)} since the last commit, or while a
     * previous asynchronous commit is still in flight. When the commit
     * completes successfully the committed offsets become available through
     * {@link #getOffsetsToSubscribe()}; a failed commit is logged and leaves
     * the previously recorded offsets untouched.
     */
    public void commitOffsets() {
        final Map<TopicPartition, OffsetAndMetadata> snapshot;
        // Check and clear the flags under the same lock that setOffsetsToCommit
        // takes, so a concurrent registration cannot be lost between the two.
        synchronized (this) {
            if (!this.isOffsetsToCommit || this.isCommitCommenced) {
                return;
            }
            if (this.offsetsToCommit == null) {
                this.isOffsetsToCommit = false;
                return;
            }
            // Commit a copy: emitRecord keeps mutating the live map.
            snapshot = new HashMap<>(this.offsetsToCommit);
            this.isCommitCommenced = true;
            this.isOffsetsToCommit = false;
        }

        this.initiateConnection();
        try {
            this.consumer.commitAsync(snapshot, new OffsetCommitCallback() {

                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    KafkaConsumerThread.this.isCommitCommenced = false;
                    if (exception != null) {
                        // The offsets were not committed, so do not record them as such.
                        LOG.warn("Asynchronous offset commit failed for {}", offsets, exception);
                        return;
                    }
                    KafkaConsumerThread.this.setOffsetsToSubscribe(offsets);
                }
            });
        } catch (RuntimeException e) {
            // The callback will never fire; restore the flags so a later call can retry.
            synchronized (this) {
                this.isCommitCommenced = false;
                this.isOffsetsToCommit = true;
            }
            throw e;
        }
    }

    /**
     * Replaces the offsets to commit and marks them as pending for the next
     * {@link #commitOffsets()} call.
     *
     * @param offsetsToCommit offsets to hand to the next commit
     */
    public synchronized void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
        this.offsetsToCommit = offsetsToCommit;
        this.isOffsetsToCommit = true;
    }

    /**
     * Seeks every partition in the recorded committed offsets to its offset.
     * Does nothing when no committed offsets have been recorded yet.
     */
    public void setSeek() {
        this.initiateConnection();
        Map<TopicPartition, OffsetAndMetadata> committed = this.offsetsToSubscribe;
        if (committed == null) {
            return;
        }
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committed.entrySet()) {
            this.consumer.seek(entry.getKey(), entry.getValue().offset());
        }
    }

    /**
     * Records the offsets that {@link #setSeek()} will seek to.
     *
     * @param committedOffset offsets to remember; may be {@code null} to clear them
     */
    public void setOffsetsToSubscribe(Map<TopicPartition, OffsetAndMetadata> committedOffset) {
        this.offsetsToSubscribe = committedOffset;
    }

    /** Seeks all configured partitions to their beginning. */
    public void setSeekToBeginning() {
        this.initiateConnection();
        this.consumer.seekToBeginning(this.topicPartitions);
    }

    /** Assigns the configured partitions to the consumer, creating it if necessary. */
    public void assignPartitions() {
        this.initiateConnection();
        this.consumer.assign(this.topicPartitions);
    }

    /** @return whether this instance is currently marked active */
    public boolean isActive() {
        return this.active;
    }

    /**
     * @param active new active flag; when {@code false} before the first
     *               {@link #run()}, that call returns without assigning partitions
     */
    public void setActive(boolean active) {
        this.active = active;
    }

    /**
     * Updates the bookkeeping for one record and writes its value to the task context.
     *
     * <p>NOTE(review): the record's own offset is stored as the offset to
     * commit. Kafka's convention is to commit the offset of the next record
     * to consume (last processed + 1); confirm whether re-reading the last
     * record after {@link #setSeek()} is intended.
     *
     * @param value  record value to emit
     * @param tps    state of the partition the record came from
     * @param offset offset of the record within its partition; must not be {@code null}
     */
    public void emitRecord(String value, KafkaTopicPartitionState tps, Long offset) {
        LOG.info("emitting record {} from the partition {} at offset {}", value, tps.getTopicPartition(), offset);
        tps.setPositionOffset(offset);
        this.offsetsToCommit.put(tps.getTopicPartition(), new OffsetAndMetadata(offset.longValue()));
        this.taskContext.write(this.edge, (Object)value);
    }

    /** @return the offsets recorded by the last successful commit, or {@code null} if none */
    public Map<TopicPartition, OffsetAndMetadata> getOffsetsToSubscribe() {
        return this.offsetsToSubscribe;
    }
}

