/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.connector.kafka.source;

import io.mantisrx.connector.kafka.source.TopicPartitionStateManager;
import io.mantisrx.connector.kafka.source.checkpoint.strategy.CheckpointStrategy;
import java.util.Collection;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerRebalanceListener<S>
implements ConsumerRebalanceListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerRebalanceListener.class);
    private final KafkaConsumer<?, ?> consumer;
    private final TopicPartitionStateManager partitionStateManager;
    private final CheckpointStrategy<S> checkpointStrategy;

    public KafkaConsumerRebalanceListener(KafkaConsumer<?, ?> consumer, TopicPartitionStateManager partitionStateManager, CheckpointStrategy<S> checkpointStrategy) {
        this.consumer = consumer;
        this.partitionStateManager = partitionStateManager;
        this.checkpointStrategy = checkpointStrategy;
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        LOGGER.info("partitions revoked, resetting partition state: {}", (Object)partitions.toString());
        partitions.stream().forEach(tp -> this.partitionStateManager.resetCounters((TopicPartition)tp));
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        LOGGER.info("new partitions assigned: {}", (Object)partitions.toString());
        try {
            for (TopicPartition tp : partitions) {
                Optional<S> checkpointState = this.checkpointStrategy.loadCheckpoint(tp);
                checkpointState.filter(x -> x instanceof OffsetAndMetadata).map(OffsetAndMetadata.class::cast).ifPresent(oam -> {
                    long offset = oam.offset();
                    LOGGER.info("seeking consumer to checkpoint'ed offset {} for partition {} on assignment", (Object)offset, (Object)tp);
                    try {
                        this.consumer.seek(tp, offset);
                    }
                    catch (Exception e) {
                        LOGGER.error("caught exception seeking consumer to offset {} on topic partition {}", new Object[]{offset, tp, e});
                    }
                });
            }
        }
        catch (Exception e) {
            LOGGER.error("caught exception on partition assignment {}", partitions, (Object)e);
        }
    }
}

