/*
 * Decompiled with CFR 0.152.
 */
package io.americanexpress.synapse.subscriber.kafka.interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Optional;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.stereotype.Component;

/**
 * Kafka interceptor that logs record metadata before a record (or batch) is handed on and
 * logs the elapsed processing time afterwards.
 *
 * <p>The start time is carried on the record itself as a {@code startTime} header holding
 * {@link System#currentTimeMillis()} rendered as a UTF-8 decimal string. Subclasses can hook
 * into the flow by overriding {@link #preHandle}, {@link #postHandle},
 * {@link #preHandleBatch} and {@link #postHandleBatch}; the defaults here do nothing.
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
@Component
public class BaseKafkaSubscriberMetricInterceptor<K, V>
        implements RecordInterceptor<K, V>, BatchInterceptor<K, V> {

    /** Name of the header used to carry the processing start time on each record. */
    private static final String START_TIME_HEADER = "startTime";

    protected final XLogger log = XLoggerFactory.getXLogger(this.getClass());

    /**
     * Stamps the record with a start time, logs its metadata and delegates to
     * {@link #preHandle(ConsumerRecord)}.
     *
     * @param consumerRecord the record being intercepted
     * @return whatever {@link #preHandle(ConsumerRecord)} returns
     */
    @Override
    public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> consumerRecord) {
        this.startMetricLog(consumerRecord);
        return this.preHandle(consumerRecord);
    }

    /**
     * Logs the processing time of the record and delegates to
     * {@link #postHandle(ConsumerRecord, Consumer)}.
     *
     * @param consumerRecord the record that was processed
     * @param consumer the consumer passed through to {@code postHandle}
     */
    @Override
    public void afterRecord(ConsumerRecord<K, V> consumerRecord, Consumer<K, V> consumer) {
        this.endMetricLog(consumerRecord);
        this.postHandle(consumerRecord, consumer);
    }

    /**
     * Extension hook invoked from {@link #intercept(ConsumerRecord)} after the start-time
     * header has been added. The default implementation returns the record unchanged.
     *
     * @param consumerRecord the record being intercepted
     * @return the record to hand on
     */
    protected ConsumerRecord<K, V> preHandle(ConsumerRecord<K, V> consumerRecord) {
        return consumerRecord;
    }

    /**
     * Extension hook invoked from {@link #afterRecord(ConsumerRecord, Consumer)} after the
     * processing time has been logged. The default implementation does nothing.
     *
     * @param consumerRecord the record that was processed
     * @param consumer the consumer passed to {@code afterRecord}
     */
    protected void postHandle(ConsumerRecord<K, V> consumerRecord, Consumer<K, V> consumer) {
    }

    /**
     * Stamps every record in the batch with a start time, logs its metadata and delegates
     * to {@link #preHandleBatch(ConsumerRecords)}.
     *
     * @param consumerRecords the batch being intercepted
     * @param consumer the consumer supplied by the caller (not used here)
     * @return whatever {@link #preHandleBatch(ConsumerRecords)} returns
     */
    @Override
    public ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> consumerRecords, Consumer<K, V> consumer) {
        consumerRecords.forEach(this::startMetricLog);
        return this.preHandleBatch(consumerRecords);
    }

    /**
     * Logs the processing time of every record in the batch and delegates to
     * {@link #postHandleBatch(ConsumerRecords, Consumer)}.
     *
     * @param consumerRecords the batch that was processed
     * @param consumer the consumer passed through to {@code postHandleBatch}
     */
    @Override
    public void success(ConsumerRecords<K, V> consumerRecords, Consumer<K, V> consumer) {
        consumerRecords.forEach(this::endMetricLog);
        this.postHandleBatch(consumerRecords, consumer);
    }

    /**
     * Extension hook invoked from {@link #intercept(ConsumerRecords, Consumer)} after every
     * record has been stamped. The default implementation returns the batch unchanged.
     *
     * @param consumerRecords the batch being intercepted
     * @return the batch to hand on
     */
    protected ConsumerRecords<K, V> preHandleBatch(ConsumerRecords<K, V> consumerRecords) {
        return consumerRecords;
    }

    /**
     * Extension hook invoked from {@link #success(ConsumerRecords, Consumer)} after the
     * processing times have been logged. The default implementation does nothing.
     *
     * @param consumerRecords the batch that was processed
     * @param consumer the consumer passed to {@code success}
     */
    protected void postHandleBatch(ConsumerRecords<K, V> consumerRecords, Consumer<K, V> consumer) {
    }

    /**
     * Adds the current time to the record as the {@code startTime} header and logs the
     * record's topic, partition, offset and key.
     *
     * @param consumerRecord the record to stamp and log
     */
    private void startMetricLog(ConsumerRecord<K, V> consumerRecord) {
        // Encode explicitly as UTF-8 so the bytes match the charset endMetricLog decodes with,
        // regardless of the JVM's default charset.
        byte[] startTime = String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8);
        consumerRecord.headers().add(START_TIME_HEADER, startTime);
        this.log.info("TOPIC: {}, PARTITION: {}, OFFSET: {}, KEY: {}",
                consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.key());
    }

    /**
     * Reads the most recent {@code startTime} header from the record and logs the elapsed
     * time since then. If the header is absent, has no value, or does not hold a decimal
     * number, a warning is logged instead and no processing time is reported.
     *
     * @param consumerRecord the record whose processing time should be logged
     */
    private void endMetricLog(ConsumerRecord<K, V> consumerRecord) {
        // lastHeader yields null when no such header exists (e.g. the record was never
        // stamped by startMetricLog); Optional.map also collapses a null header value.
        byte[] rawStartTime = Optional.ofNullable(consumerRecord.headers().lastHeader(START_TIME_HEADER))
                .map(header -> header.value())
                .orElse(null);
        if (rawStartTime == null) {
            this.log.warn("TOPIC: {}, PARTITION: {}, OFFSET: {}, KEY: {}, PROCESSING_TIME unavailable: no {} header.",
                    consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(),
                    consumerRecord.key(), START_TIME_HEADER);
            return;
        }

        long startTime;
        try {
            startTime = Long.parseLong(new String(rawStartTime, StandardCharsets.UTF_8));
        } catch (NumberFormatException e) {
            // A header with this name that is not a decimal number must not break record
            // handling; report it and skip the timing. The exception is passed last so the
            // logger records its stack trace.
            this.log.warn("TOPIC: {}, PARTITION: {}, OFFSET: {}, KEY: {}, PROCESSING_TIME unavailable: malformed {} header.",
                    consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(),
                    consumerRecord.key(), START_TIME_HEADER, e);
            return;
        }

        long executionTime = System.currentTimeMillis() - startTime;
        this.log.info("TOPIC: {}, PARTITION: {}, OFFSET: {}, KEY: {}, PROCESSING_TIME: {} ms.",
                consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(),
                consumerRecord.key(), executionTime);
    }
}

