/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.connectors;

import edu.iu.dsc.tws.api.compute.IMessage;
import edu.iu.dsc.tws.api.compute.TaskContext;
import edu.iu.dsc.tws.api.compute.nodes.BaseSink;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.connectors.KafkaPartitionFinder;
import edu.iu.dsc.tws.connectors.KafkaTopicDescription;
import edu.iu.dsc.tws.connectors.config.KafkaConsumerConfig;
import edu.iu.dsc.tws.connectors.config.KafkaProducerConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TwsKafkaProducer<T>
extends BaseSink {
    private static final long serialVersionUID = -264264120110286749L;
    private static Logger log = LoggerFactory.getLogger(TwsKafkaProducer.class);
    private Properties kafkaConfigs;
    private int myIndex;
    private int worldSize;
    private List<String> listOfTopics = null;
    private Producer<String, String> producer;
    private KafkaPartitionFinder kafkaPartitionFinder;
    private KafkaTopicDescription topicDescription;
    private List<TopicPartition> topicPartitions;
    private Properties simpleKafkaConfig;

    public boolean execute(IMessage message) {
        log.info("Recieved message {}", message.getContent());
        if (this.topicPartitions.isEmpty()) {
            log.info("No partition found for given topic(s)");
        } else {
            for (TopicPartition topicPartition : this.topicPartitions) {
                log.info("Producing to kafka, Message : {} , Topic : {}, Partition : {}", new Object[]{message.getContent(), topicPartition.topic(), topicPartition.partition()});
                this.producer.send(new ProducerRecord(topicPartition.topic(), Integer.valueOf(topicPartition.partition()), (Object)message.getContent().toString(), (Object)message.getContent().toString()));
            }
        }
        return true;
    }

    public void prepare(Config cfg, TaskContext context) {
        super.prepare(cfg, context);
        this.myIndex = cfg.getIntegerValue("twister2.container.id", 0);
        this.worldSize = context.getParallelism();
        log.info("myID : {} , worldSize : {} ", (Object)this.myIndex, (Object)this.worldSize);
        this.topicDescription = new KafkaTopicDescription(this.listOfTopics);
        this.kafkaPartitionFinder = new KafkaPartitionFinder(this.simpleKafkaConfig, this.worldSize, this.myIndex, this.topicDescription);
        this.topicPartitions = this.kafkaPartitionFinder.getRelevantPartitions();
        this.producer = new KafkaProducer(this.kafkaConfigs);
    }

    public TwsKafkaProducer(List<String> topics, List<String> servers) {
        this.kafkaConfigs = this.createKafkaConfig(servers);
        this.listOfTopics = topics;
        this.simpleKafkaConfig = KafkaConsumerConfig.getSimpleKafkaConsumer(servers);
    }

    public TwsKafkaProducer(String singletopic, List<String> servers) {
        this.kafkaConfigs = this.createKafkaConfig(servers);
        this.listOfTopics = new ArrayList<String>();
        this.listOfTopics.add(singletopic);
        this.simpleKafkaConfig = KafkaConsumerConfig.getSimpleKafkaConsumer(servers);
    }

    private Properties createKafkaConfig(List<String> servers) {
        return KafkaProducerConfig.getConfig(servers);
    }

    public Properties setProperty(Properties newProps) {
        this.kafkaConfigs = KafkaProducerConfig.setProps(this.kafkaConfigs, newProps);
        return this.kafkaConfigs;
    }

    public Properties getKafkaConfigs() {
        return this.kafkaConfigs;
    }

    public void setKafkaConfigs(Properties kafkaConfigs) {
        this.kafkaConfigs = kafkaConfigs;
    }

    public TwsKafkaProducer() {
    }
}

