/*
 * Decompiled with CFR 0.152.
 */
package io.wizzie.bootstrapper.bootstrappers.impl;

import io.wizzie.bootstrapper.bootstrappers.base.ThreadBootstrapper;
import io.wizzie.bootstrapper.builder.Config;
import io.wizzie.bootstrapper.builder.SourceSystem;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaBootstrapper
extends ThreadBootstrapper {
    private static final Logger log = LoggerFactory.getLogger(KafkaBootstrapper.class);
    public static final String BOOTSTRAP_TOPICS_CONFIG = "bootstrap.kafka.topics";
    public static final String APPLICATION_ID_CONFIG = "application.id";
    AtomicBoolean closed = new AtomicBoolean(false);
    List<TopicPartition> storePartitions;
    String appId;
    KafkaConsumer<String, String> restoreConsumer;

    @Override
    public void init(Config config) throws IOException {
        Properties consumerConfig = new Properties();
        this.appId = (String)config.get(APPLICATION_ID_CONFIG);
        consumerConfig.put("bootstrap.servers", config.get("bootstrap.servers"));
        consumerConfig.put("auto.offset.reset", "earliest");
        consumerConfig.put("enable.auto.commit", "false");
        consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerConfig.put("group.id", String.format("kafka-bootstraper-%s-%s", this.appId, UUID.randomUUID().toString()));
        List bootstrapperTopics = (List)config.get(BOOTSTRAP_TOPICS_CONFIG);
        this.storePartitions = new ArrayList<TopicPartition>();
        this.restoreConsumer = new KafkaConsumer(consumerConfig);
        for (String bootstrapperTopic : bootstrapperTopics) {
            this.storePartitions.add(new TopicPartition(bootstrapperTopic, 0));
        }
        this.restoreConsumer.assign(this.storePartitions);
        this.restoreConsumer.seekToEnd(this.storePartitions);
        HashMap<TopicPartition, Long> endOffsets = new HashMap<TopicPartition, Long>();
        for (TopicPartition topicPartition : this.storePartitions) {
            Long endOffset = this.restoreConsumer.position(topicPartition);
            endOffsets.put(topicPartition, endOffset);
        }
        for (Map.Entry entry : endOffsets.entrySet()) {
            long offset = 0L;
            String jsonStreamConfig = null;
            this.restoreConsumer.assign(Collections.singletonList(entry.getKey()));
            this.restoreConsumer.seekToBeginning(Collections.singletonList(entry.getKey()));
            while (offset < (Long)entry.getValue()) {
                for (ConsumerRecord record : this.restoreConsumer.poll(100L).records((TopicPartition)entry.getKey())) {
                    if (record.key() == null || !((String)record.key()).equals(this.appId)) continue;
                    jsonStreamConfig = (String)record.value();
                }
                offset = this.restoreConsumer.position((TopicPartition)entry.getKey());
                log.info("Recover from kafka offset[{}], endOffset[{}]", (Object)offset, entry.getValue());
            }
            if (jsonStreamConfig != null) {
                log.info("Find stream configuration with app id [{}]", (Object)this.appId);
                this.update(new SourceSystem("kafka", ((TopicPartition)entry.getKey()).topic()), jsonStreamConfig);
                continue;
            }
            log.info("Don't find any stream configuration with app id [{}]", (Object)this.appId);
        }
    }

    @Override
    public void run() {
        KafkaBootstrapper.currentThread().setName("KafkaBootstrapper");
        this.restoreConsumer.assign(this.storePartitions);
        while (!this.closed.get()) {
            log.debug("Searching stream configuration with app id [{}]", (Object)this.appId);
            try {
                for (ConsumerRecord record : this.restoreConsumer.poll(5000L)) {
                    System.out.println(record);
                    if (record.key() == null || !((String)record.key()).equals(this.appId)) continue;
                    log.info("Find stream configuration with app id [{}]", (Object)this.appId);
                    this.update(new SourceSystem("kafka", record.topic()), (String)record.value());
                }
            }
            catch (WakeupException e) {
                if (!this.closed.get()) {
                    throw e;
                }
                log.info("Closing restore consumer ...");
                this.restoreConsumer.close(1L, TimeUnit.MINUTES);
            }
        }
    }

    @Override
    public synchronized void close() {
        this.closed.set(true);
        this.restoreConsumer.wakeup();
        log.info("Stop KafkaBootstrapper service");
    }
}

