/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.connector.kafka.source.checkpoint.strategy;

import io.mantisrx.connector.kafka.source.checkpoint.strategy.CheckpointStrategy;
import io.mantisrx.connector.kafka.source.metrics.ConsumerMetrics;
import io.mantisrx.runtime.Context;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link CheckpointStrategy} that persists checkpoints by synchronously committing
 * offsets through the supplied {@link KafkaConsumer}.
 *
 * <p>This strategy never supplies a stored checkpoint itself:
 * {@link #loadCheckpoint(TopicPartition)} always returns an empty {@link Optional}.
 */
public class KafkaOffsetCheckpointStrategy
implements CheckpointStrategy<OffsetAndMetadata> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaOffsetCheckpointStrategy.class);
    private final KafkaConsumer<?, ?> consumer;
    private final ConsumerMetrics consumerMetrics;

    /**
     * Creates a strategy bound to the given consumer and metrics recorder.
     *
     * @param consumer consumer used to commit offsets and, on an invalid-offset failure, to seek
     * @param metrics  recorder notified of every successfully committed checkpoint
     */
    public KafkaOffsetCheckpointStrategy(KafkaConsumer<?, ?> consumer, ConsumerMetrics metrics) {
        this.consumer = consumer;
        this.consumerMetrics = metrics;
    }

    /**
     * No-op: this strategy reads nothing from the supplied properties.
     *
     * @param properties ignored
     */
    @Override
    public void init(Map<String, String> properties) {
    }

    /**
     * Synchronously commits the given offsets and records them in the consumer metrics.
     *
     * <p>An empty checkpoint is not committed and is reported as success. When the commit
     * raises an {@link InvalidOffsetException}, the current position of every partition named
     * by the exception is logged, those partitions are rewound to the beginning, and the call
     * still reports {@code true}. Any other {@link KafkaException} is logged and reported as
     * {@code false}.
     *
     * @param checkpoint offsets to commit, keyed by partition
     * @return {@code false} only when the commit failed with a {@link KafkaException} other
     *         than {@link InvalidOffsetException}; {@code true} otherwise
     */
    @Override
    public boolean persistCheckpoint(Map<TopicPartition, OffsetAndMetadata> checkpoint) {
        if (checkpoint.isEmpty()) {
            return true;
        }
        try {
            // Placeholder form defers rendering the map until debug logging is actually enabled.
            logger.debug("committing offsets {}", checkpoint);
            this.consumer.commitSync(checkpoint);
            this.consumerMetrics.recordCommittedOffset(checkpoint);
        }
        catch (InvalidOffsetException ioe) {
            // A trailing Throwable with no matching placeholder is logged by SLF4J as the exception.
            logger.warn("failed to commit offsets {} will seek to beginning", checkpoint, ioe);
            Set<TopicPartition> invalidPartitions = ioe.partitions();
            for (TopicPartition tp : invalidPartitions) {
                logger.info("partition {} consumer position {}", tp, this.consumer.position(tp));
            }
            // Re-read the partitions from the exception for the seek, as the original code did.
            this.consumer.seekToBeginning(ioe.partitions());
        }
        catch (KafkaException cfe) {
            logger.warn("unrecoverable exception on commit offsets {}", checkpoint, cfe);
            return false;
        }
        return true;
    }

    /**
     * Always reports that no checkpoint is stored for the partition.
     *
     * @param tp partition being queried (unused)
     * @return an empty {@link Optional}
     */
    @Override
    public Optional<OffsetAndMetadata> loadCheckpoint(TopicPartition tp) {
        logger.trace("rely on default kafka protocol to seek to last committed offset");
        return Optional.empty();
    }

    /**
     * No-op: this strategy reads nothing from the supplied context.
     *
     * @param context ignored
     */
    @Override
    public void init(Context context) {
    }

    /**
     * Looks up the checkpoint of every listed partition via {@link #loadCheckpoint(TopicPartition)}.
     *
     * @param tpList partitions to look up
     * @return a new mutable map with one entry per distinct partition in {@code tpList}
     */
    @Override
    public Map<TopicPartition, Optional<OffsetAndMetadata>> loadCheckpoints(List<TopicPartition> tpList) {
        Map<TopicPartition, Optional<OffsetAndMetadata>> checkpoints = new HashMap<>();
        for (TopicPartition tp : tpList) {
            checkpoints.put(tp, this.loadCheckpoint(tp));
        }
        return checkpoints;
    }

    /**
     * Returns the identifier of this strategy.
     *
     * @return the constant {@code "offsetsOnlyDefaultKafka"}
     */
    @Override
    public String type() {
        return "offsetsOnlyDefaultKafka";
    }
}

