/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.connector.kafka.source;

import com.netflix.spectator.api.Registry;
import io.mantisrx.connector.kafka.KafkaAckable;
import io.mantisrx.connector.kafka.KafkaData;
import io.mantisrx.connector.kafka.KafkaDataNotification;
import io.mantisrx.connector.kafka.source.MantisKafkaConsumer;
import io.mantisrx.connector.kafka.source.MantisKafkaConsumerConfig;
import io.mantisrx.connector.kafka.source.MantisKafkaSourceConfig;
import io.mantisrx.connector.kafka.source.TopicPartitionStateManager;
import io.mantisrx.connector.kafka.source.checkpoint.strategy.CheckpointStrategy;
import io.mantisrx.connector.kafka.source.checkpoint.strategy.CheckpointStrategyOptions;
import io.mantisrx.connector.kafka.source.checkpoint.trigger.CheckpointTrigger;
import io.mantisrx.connector.kafka.source.metrics.ConsumerMetrics;
import io.mantisrx.connector.kafka.source.serde.ParseException;
import io.mantisrx.connector.kafka.source.serde.Parser;
import io.mantisrx.connector.kafka.source.serde.ParserType;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.BooleanParameter;
import io.mantisrx.runtime.parameter.type.IntParameter;
import io.mantisrx.runtime.parameter.type.StringParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import io.mantisrx.runtime.source.Index;
import io.mantisrx.runtime.source.Source;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.record.InvalidRecordException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.observables.SyncOnSubscribe;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
 * {@link Source} that reads Kafka records through one or more
 * {@link MantisKafkaConsumer} instances and emits them as {@link KafkaAckable}s.
 *
 * <p>{@link #call(Context, Index)} creates the consumers for this worker and emits one
 * inner observable per consumer. Every emitted {@link KafkaAckable} is handed the shared
 * {@link #ackSubject}; notifications arriving on that subject are routed back to the
 * originating consumer's {@link TopicPartitionStateManager} by
 * {@link #processAckNotification(KafkaDataNotification)}.
 */
public class KafkaSource
implements Source<KafkaAckable> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSource.class);

    /** Set to {@code true} when the observable returned by {@code call} is unsubscribed, {@code false} when it is subscribed. */
    private final AtomicBoolean done = new AtomicBoolean();

    /**
     * Consumers created by this source, keyed by {@link MantisKafkaConsumer#getConsumerId()}.
     * NOTE(review): this is a plain HashMap that is written while consumers are created and read
     * from the ack subscriber; confirm those never run concurrently on different threads.
     */
    private final Map<Integer, MantisKafkaConsumer<?>> idToConsumerMap = new HashMap<>();
    private final Registry registry;

    /** Subject handed to every {@link KafkaAckable}; its notifications are consumed by {@link #startAckProcessor()}. */
    private final SerializedSubject<KafkaDataNotification, KafkaDataNotification> ackSubject =
            new SerializedSubject<>(PublishSubject.<KafkaDataNotification>create());

    /**
     * @param registry metrics registry passed on to every consumer built by this source
     */
    public KafkaSource(Registry registry) {
        this.registry = registry;
    }

    /**
     * Builds {@code getNumConsumerInstances()} consumers for this worker and registers each
     * one in {@link #idToConsumerMap}.
     *
     * <p>The i-th consumer gets index {@code workerIndex + totalNumWorkers * i}, and every consumer
     * is told the job-wide total {@code totalNumWorkers * getNumConsumerInstances()}.
     *
     * @param context           job context; supplies the worker index and is passed to the builder
     * @param kafkaSourceConfig source configuration
     * @param totalNumWorkers   total number of workers reported by the {@link Index}
     * @return an observable emitting the created consumers in creation order
     */
    private Observable<MantisKafkaConsumer<?>> createConsumers(Context context, MantisKafkaSourceConfig kafkaSourceConfig, int totalNumWorkers) {
        List<MantisKafkaConsumer<?>> consumers = new ArrayList<>();
        for (int i = 0; i < kafkaSourceConfig.getNumConsumerInstances(); ++i) {
            int consumerIndex = context.getWorkerInfo().getWorkerIndex() + totalNumWorkers * i;
            MantisKafkaConsumer<?> mantisKafkaConsumer = new MantisKafkaConsumer.Builder()
                    .withKafkaSourceConfig(kafkaSourceConfig)
                    .withTotalNumConsumersForJob(totalNumWorkers * kafkaSourceConfig.getNumConsumerInstances())
                    .withContext(context)
                    .withConsumerIndex(consumerIndex)
                    .withRegistry(this.registry)
                    .build();
            this.idToConsumerMap.put(mantisKafkaConsumer.getConsumerId(), mantisKafkaConsumer);
            LOGGER.info("created consumer {}", mantisKafkaConsumer);
            consumers.add(mantisKafkaConsumer);
        }
        return Observable.from(consumers);
    }

    /**
     * Size fed to the checkpoint trigger for one record: the value length plus a fixed 100.
     * NOTE(review): the 100 presumably approximates per-record overhead; confirm.
     *
     * @param record record whose value must be non-null
     * @return {@code record.value().length + 100}
     */
    private int getPayloadSize(ConsumerRecord<String, byte[]> record) {
        return record.value().length + 100;
    }

    /**
     * Persists a checkpoint for {@code partitions} when the trigger asks for one, records the
     * elapsed time and commit count, and resets the trigger. Does nothing otherwise.
     */
    private void persistCheckpointIfDue(CheckpointTrigger trigger, CheckpointStrategy<?> checkpointStrategy, TopicPartitionStateManager partitionStateManager, ConsumerMetrics consumerMetrics, Set<TopicPartition> partitions) {
        if (!trigger.shouldCheckpoint()) {
            return;
        }
        long startTime = System.currentTimeMillis();
        Map<TopicPartition, OffsetAndMetadata> checkpoint = partitionStateManager.createCheckpoint(partitions);
        checkpointStrategy.persistCheckpoint(checkpoint);
        consumerMetrics.recordCheckpointDelay(System.currentTimeMillis() - startTime);
        consumerMetrics.incrementCommitCount();
        trigger.reset();
    }

    /**
     * Wraps a record with a non-null value into a {@link KafkaData}.
     *
     * <p>When {@code getParseMessageInSource()} is true the configured parser is looked up and
     * applied; if the parser reports it cannot parse the value, the parse-failure counter is
     * incremented and an empty result is returned. When parsing in the source is disabled the
     * record is wrapped without a parsed payload.
     *
     * @return the wrapped record, or empty when the parser cannot parse the value
     * @throws ParseException propagated from the parser
     */
    private Optional<KafkaData> buildKafkaData(ConsumerRecord<String, byte[]> record, MantisKafkaSourceConfig kafkaSourceConfig, int mantisKafkaConsumerId, ConsumerMetrics consumerMetrics) throws ParseException {
        if (!kafkaSourceConfig.getParseMessageInSource().booleanValue()) {
            return Optional.of(new KafkaData(record, Optional.empty(), Optional.ofNullable(record.key()), mantisKafkaConsumerId));
        }
        // NOTE(review): the parser is resolved for every record; kept as-is so lookup failures
        // continue to surface inside the per-record error handling.
        Parser parser = ParserType.parser(kafkaSourceConfig.getMessageParserType()).getParser();
        if (!parser.canParse(record.value())) {
            consumerMetrics.incrementParseFailureCount();
            return Optional.empty();
        }
        Map<String, Object> parsedKafkaValue = parser.parseMessage(record.value());
        return Optional.of(new KafkaData(record, Optional.ofNullable(parsedKafkaValue), Optional.ofNullable(record.key()), mantisKafkaConsumerId));
    }

    /**
     * Builds the observable that drives a single consumer.
     *
     * <p>The observable is a stateful {@link SyncOnSubscribe} whose state is the iterator over the
     * most recent poll result. Each invocation of the "next" function:
     * <ol>
     *   <li>reads the current assignment and persists a checkpoint if the trigger requests it;</li>
     *   <li>closes the consumer and emits nothing if {@link #done} is set;</li>
     *   <li>otherwise polls when the iterator is exhausted and emits at most one record, or
     *       sleeps 200 ms when the poll returned nothing.</li>
     * </ol>
     * Kafka and other exceptions raised while polling/emitting are counted and logged rather than
     * propagated; an out-of-range offset causes a seek to the beginning of the affected partitions.
     * The work is subscribed on {@code Schedulers.newThread()}.
     *
     * @param mantisKafkaConsumer consumer to drive; closed on unsubscribe
     * @param kafkaSourceConfig   source configuration (poll timeout, parser settings)
     * @return observable of ackable records read by {@code mantisKafkaConsumer}
     */
    private Observable<KafkaAckable> createBackPressuredConsumerObs(MantisKafkaConsumer<?> mantisKafkaConsumer, MantisKafkaSourceConfig kafkaSourceConfig) {
        CheckpointStrategy<?> checkpointStrategy = mantisKafkaConsumer.getStrategy();
        CheckpointTrigger trigger = mantisKafkaConsumer.getTrigger();
        ConsumerMetrics consumerMetrics = mantisKafkaConsumer.getConsumerMetrics();
        TopicPartitionStateManager partitionStateManager = mantisKafkaConsumer.getPartitionStateManager();
        int mantisKafkaConsumerId = mantisKafkaConsumer.getConsumerId();

        SyncOnSubscribe<Iterator<ConsumerRecord<String, byte[]>>, KafkaAckable> syncOnSubscribe = SyncOnSubscribe.createStateful(() -> {
            // Initial state: the iterator over the first poll.
            ConsumerRecords<String, byte[]> records = mantisKafkaConsumer.poll(kafkaSourceConfig.getConsumerPollTimeoutMs());
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("topic listing for consumer {}", mantisKafkaConsumer.listTopics());
            }
            LOGGER.info("consumer subscribed to topic-partitions {}", mantisKafkaConsumer.assignment());
            return records.iterator();
        }, (consumerRecordIterator, observer) -> {
            Iterator<ConsumerRecord<String, byte[]>> it = consumerRecordIterator;

            // The assignment is read on every invocation, before the checkpoint decision.
            Set<TopicPartition> partitions = mantisKafkaConsumer.assignment();
            this.persistCheckpointIfDue(trigger, checkpointStrategy, partitionStateManager, consumerMetrics, partitions);

            if (this.done.get()) {
                mantisKafkaConsumer.close();
                return it;
            }

            try {
                if (!it.hasNext()) {
                    ConsumerRecords<String, byte[]> consumerRecords = mantisKafkaConsumer.poll(kafkaSourceConfig.getConsumerPollTimeoutMs());
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("poll returned {} records", consumerRecords.count());
                    }
                    it = consumerRecords.iterator();
                }

                if (!it.hasNext()) {
                    // Nothing to emit this round: back off briefly before the next invocation.
                    consumerMetrics.incrementWaitForDataCount();
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Reached head of partition, waiting for more data");
                    }
                    TimeUnit.MILLISECONDS.sleep(200L);
                    return it;
                }

                ConsumerRecord<String, byte[]> m = it.next();
                TopicPartition topicPartition = new TopicPartition(m.topic(), m.partition());
                consumerMetrics.incrementInCount();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("updating read offset to " + m.offset() + " read " + m.value());
                }

                if (m.value() == null) {
                    consumerMetrics.incrementKafkaMessageValueNullCount();
                    return it;
                }

                try {
                    trigger.update(this.getPayloadSize(m));
                    Optional<KafkaData> kafkaData = this.buildKafkaData(m, kafkaSourceConfig, mantisKafkaConsumerId, consumerMetrics);
                    if (kafkaData.isPresent()) {
                        KafkaAckable ackable = new KafkaAckable(kafkaData.get(), this.ackSubject);
                        // The read is recorded before the record is handed downstream.
                        partitionStateManager.recordMessageRead(topicPartition, m.offset());
                        consumerMetrics.recordReadOffset(topicPartition, m.offset());
                        observer.onNext(ackable);
                    }
                }
                catch (ParseException pe) {
                    consumerMetrics.incrementErrorCount();
                    LOGGER.warn("failed to parse {}:{} message {}", m.topic(), m.partition(), m.value(), pe);
                }
            }
            catch (TimeoutException toe) {
                consumerMetrics.incrementWaitForDataCount();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Reached head of partition waiting for more data");
                }
            }
            catch (OffsetOutOfRangeException oore) {
                Set<TopicPartition> outOfRange = oore.partitions();
                LOGGER.warn("offsets out of range " + outOfRange + " will seek to beginning", oore);
                for (TopicPartition tp : outOfRange) {
                    LOGGER.info("partition {} consumer position {}", tp, mantisKafkaConsumer.position(tp));
                }
                mantisKafkaConsumer.seekToBeginning(outOfRange.toArray(new TopicPartition[outOfRange.size()]));
            }
            catch (InvalidRecordException ire) {
                consumerMetrics.incrementErrorCount();
                LOGGER.warn("iterator error with invalid message. message will be dropped " + ire.getMessage());
            }
            catch (KafkaException e) {
                consumerMetrics.incrementErrorCount();
                LOGGER.warn("Other Kafka exception, message will be dropped. " + e.getMessage());
            }
            catch (InterruptedException ie) {
                LOGGER.error("consumer interrupted", ie);
                // Restore the interrupt flag so code further up the stack can observe it.
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                consumerMetrics.incrementErrorCount();
                LOGGER.warn("caught exception", e);
            }
            return it;
        }, consumerRecordIterator -> {
            LOGGER.info("closing Kafka consumer on unsubscribe {}", mantisKafkaConsumer);
            mantisKafkaConsumer.close();
        });

        return Observable.create(syncOnSubscribe)
                .subscribeOn(Schedulers.newThread())
                .doOnUnsubscribe(() -> LOGGER.info("consumer {} stopped due to unsubscribe", mantisKafkaConsumerId))
                .doOnError(t -> {
                    LOGGER.error("consumer {} stopped due to error", mantisKafkaConsumerId, t);
                    consumerMetrics.incrementErrorCount();
                })
                .doOnTerminate(() -> LOGGER.info("consumer {} terminated", mantisKafkaConsumerId));
    }

    /**
     * Starts the ack processor and returns an observable that, on subscription, creates this
     * worker's consumers and emits one record observable per consumer.
     *
     * <p>Subscribing clears {@link #done}; unsubscribing sets it, which makes the per-consumer
     * loops close their consumers.
     *
     * @param context job context used to build the source configuration and the consumers
     * @param index   supplies the total number of workers
     * @return observable of per-consumer record observables
     */
    public Observable<Observable<KafkaAckable>> call(Context context, Index index) {
        int totalNumWorkers = index.getTotalNumWorkers();
        MantisKafkaSourceConfig mantisKafkaSourceConfig = new MantisKafkaSourceConfig(context);
        this.startAckProcessor();
        // The explicit OnSubscribe type pins the element type; without it the chained call
        // would be inferred as Observable<Object>.
        return Observable.create((Observable.OnSubscribe<Observable<KafkaAckable>>) child -> {
            Observable<MantisKafkaConsumer<?>> consumers = this.createConsumers(context, mantisKafkaSourceConfig, totalNumWorkers);
            consumers.subscribe(consumer -> {
                Observable<KafkaAckable> mantisKafkaAckableObs = this.createBackPressuredConsumerObs(consumer, mantisKafkaSourceConfig);
                child.onNext(mantisKafkaAckableObs);
            });
        }).doOnUnsubscribe(() -> {
            LOGGER.info("unsubscribed");
            this.done.set(true);
        }).doOnSubscribe(() -> {
            LOGGER.info("subscribed");
            this.done.set(false);
        });
    }

    /**
     * Routes one ack notification to the consumer that produced the record.
     *
     * <p>The ack is recorded against the record's topic-partition and offset regardless of
     * {@code notification.isSuccess()}; a negative ack is only logged at debug level. Notifications
     * for a consumer id that is not in {@link #idToConsumerMap} are logged at debug level and dropped.
     *
     * @param notification ack notification carrying the original {@link KafkaData}
     */
    private void processAckNotification(KafkaDataNotification notification) {
        KafkaData kafkaData = notification.getValue();
        TopicPartition topicPartition = new TopicPartition(kafkaData.getTopic(), kafkaData.getPartition());
        MantisKafkaConsumer<?> mantisKafkaConsumer = this.idToConsumerMap.get(kafkaData.getMantisKafkaConsumerId());
        if (mantisKafkaConsumer == null) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("got Ack for consumer id {} not in idToConsumerMap (topic {})", kafkaData.getMantisKafkaConsumerId(), kafkaData.getTopic());
            }
            return;
        }
        mantisKafkaConsumer.getPartitionStateManager().recordMessageAck(topicPartition, kafkaData.getOffset());
        if (!notification.isSuccess()) {
            LOGGER.debug("Got negative acknowledgement {}", notification);
        }
        mantisKafkaConsumer.getConsumerMetrics().incrementProcessedCount();
    }

    /**
     * Subscribes {@link #processAckNotification(KafkaDataNotification)} to {@link #ackSubject}.
     * NOTE(review): a new subscription is added on every invocation and is never unsubscribed;
     * confirm {@code call} is only invoked once per source instance.
     */
    private void startAckProcessor() {
        LOGGER.info("Acknowledgement processor started");
        this.ackSubject.subscribe(notification -> this.processAckNotification(notification));
    }

    /**
     * Declares the job parameters understood by this source, followed by those returned by
     * {@link MantisKafkaConsumerConfig#getJobParameterDefinitions()}.
     *
     * @return a new mutable list of parameter definitions
     */
    public List<ParameterDefinition<?>> getParameters() {
        List<ParameterDefinition<?>> params = new ArrayList<>();
        params.add(new StringParameter()
                .name("kafka.source.consumer.topic")
                .description("Kafka topic to connect to")
                .validator(Validators.notNullOrEmpty())
                .required()
                .build());
        params.add(new StringParameter()
                .name("kafkaVip")
                .description("vip address of source Kafka cluster")
                .validator(Validators.notNullOrEmpty())
                .required()
                .build());
        params.add(new StringParameter()
                .name("checkpointStrategy")
                .description("checkpoint strategy one of " + CheckpointStrategyOptions.values() + " (ensure enable.auto.commit param is set to false when enabling this)")
                .defaultValue("disableCheckpointing")
                .validator(Validators.alwaysPass())
                .build());
        params.add(new IntParameter()
                .name("numKafkaConsumerPerWorker")
                .description("No. of Kafka consumer instances per Mantis worker")
                .validator(Validators.range(1, 16))
                .defaultValue(1)
                .build());
        params.add(new IntParameter()
                .name("maxBytesInProcessing")
                .description("The maximum amount of data per-consumer awaiting acks to trigger an offsets commit. These commits are in addition to any commits triggered by commitIntervalMs timer")
                .defaultValue(128000000)
                .validator(Validators.range(1, Integer.MAX_VALUE))
                .build());
        params.add(new IntParameter()
                .name("consumerPollTimeoutMs")
                .validator(Validators.range(100, 10000))
                .defaultValue(250)
                .build());
        params.add(new StringParameter()
                .name("messageParserType")
                .validator(Validators.notNullOrEmpty())
                .defaultValue(ParserType.SIMPLE_JSON.getPropName())
                .build());
        params.add(new BooleanParameter()
                .name("parseMessageInKafkaConsumerThread")
                .validator(Validators.alwaysPass())
                .defaultValue(true)
                .build());
        params.add(new BooleanParameter()
                .name("enableStaticPartitionAssign")
                .validator(Validators.alwaysPass())
                .defaultValue(false)
                .description("Disable Kafka's default consumer group management and statically assign partitions to job workers. When enabling static partition assignments, disable auto-scaling and set the numPartitionsPerTopic job parameter")
                .build());
        params.add(new StringParameter()
                .name("numPartitionsPerTopic")
                .validator(Validators.alwaysPass())
                .defaultValue("")
                .description("Configures number of partitions on a kafka topic when static partition assignment is enabled. Format <topic1>:<numPartitions Topic1>,<topic2>:<numPartitions Topic2> Example: nf_errors_log:9,clevent:450")
                .build());
        params.addAll(MantisKafkaConsumerConfig.getJobParameterDefinitions());
        return params;
    }
}

