package org.eclipse.hono.client.kafka.consumer;

import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.util.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/hono/client/kafka/consumer/HonoKafkaConsumer.class */
public class HonoKafkaConsumer implements Lifecycle {
    private static final long WAIT_FOR_REBALANCE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
    private static final String DEFAULT_MAX_POLL_INTERNAL_MS = Long.toString(TimeUnit.SECONDS.toMillis(20));
    protected final Logger log;
    protected final Vertx vertx;
    protected final AtomicBoolean stopCalled;
    private final Map<String, String> consumerConfig;
    private final AtomicReference<Promise<Void>> subscribeDonePromiseRef;
    private final Handler<KafkaConsumerRecord<String, Buffer>> recordHandler;
    private final Set<String> topics;
    private final Pattern topicPattern;
    private KafkaConsumer<String, Buffer> kafkaConsumer;
    private Context context;
    private ExecutorService kafkaConsumerWorker;
    private Set<String> subscribedTopicPatternTopics;
    private Handler<Set<TopicPartition>> onPartitionsAssignedHandler;
    private Handler<Set<TopicPartition>> onRebalanceDoneHandler;
    private Handler<Set<TopicPartition>> onPartitionsRevokedHandler;
    private boolean respectTtl;
    private Supplier<Consumer<String, Buffer>> kafkaConsumerSupplier;

    /**
     * Creates a consumer that subscribes to a fixed set of topics.
     *
     * @param vertx The vert.x instance to use.
     * @param set The names of the topics to subscribe to.
     * @param handler The handler to invoke for each consumed record.
     * @param map The Kafka consumer configuration; entries may be added to this map.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public HonoKafkaConsumer(Vertx vertx, Set<String> set, Handler<KafkaConsumerRecord<String, Buffer>> handler, Map<String, String> map) {
        this(vertx, Objects.requireNonNull(set), (Pattern) null, handler, map);
    }

    /**
     * Creates a consumer that subscribes to all topics matching a pattern.
     *
     * @param vertx The vert.x instance to use.
     * @param pattern The pattern of the topic names to subscribe to.
     * @param handler The handler to invoke for each consumed record.
     * @param map The Kafka consumer configuration; entries may be added to this map.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public HonoKafkaConsumer(Vertx vertx, Pattern pattern, Handler<KafkaConsumerRecord<String, Buffer>> handler, Map<String, String> map) {
        this(vertx, (Set<String>) null, Objects.requireNonNull(pattern), handler, map);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public HonoKafkaConsumer(Vertx vertx, Set<String> set, Pattern pattern, Handler<KafkaConsumerRecord<String, Buffer>> handler, Map<String, String> map) {
        this.log = LoggerFactory.getLogger(getClass());
        this.stopCalled = new AtomicBoolean();
        this.subscribeDonePromiseRef = new AtomicReference<>();
        this.subscribedTopicPatternTopics = new HashSet();
        this.respectTtl = true;
        this.vertx = (Vertx) Objects.requireNonNull(vertx);
        this.topics = set;
        this.topicPattern = pattern;
        this.recordHandler = (Handler) Objects.requireNonNull(handler);
        this.consumerConfig = (Map) Objects.requireNonNull(map);
        if ((set == null) == (pattern == null)) {
            throw new NullPointerException("either topics or topicPattern has to be set");
        }
        if (!map.containsKey("max.poll.interval.ms")) {
            map.put("max.poll.interval.ms", DEFAULT_MAX_POLL_INTERNAL_MS);
        }
        if (map.containsKey("group.id")) {
            return;
        }
        if ("true".equals(map.get("enable.auto.commit"))) {
            throw new IllegalArgumentException("group.id config entry has to be set if auto-commit is enabled");
        }
        this.log.trace("no group.id set, using a random UUID as default and disabling auto-commit");
        map.put("group.id", UUID.randomUUID().toString());
        map.put("enable.auto.commit", "false");
    }

    /**
     * Sets the handler to be notified with the newly assigned partitions after a rebalance.
     *
     * @param handler The handler to set.
     * @throws NullPointerException if handler is {@code null}.
     */
    public final void setOnPartitionsAssignedHandler(Handler<Set<TopicPartition>> handler) {
        Objects.requireNonNull(handler);
        this.onPartitionsAssignedHandler = handler;
    }

    /**
     * Sets the handler to be notified with the complete set of currently assigned partitions
     * once a partition assignment has been processed.
     *
     * @param handler The handler to set.
     * @throws NullPointerException if handler is {@code null}.
     */
    public final void setOnRebalanceDoneHandler(Handler<Set<TopicPartition>> handler) {
        Objects.requireNonNull(handler);
        this.onRebalanceDoneHandler = handler;
    }

    /**
     * Sets the handler to be notified with the partitions that have been revoked or lost.
     *
     * @param handler The handler to set.
     * @throws NullPointerException if handler is {@code null}.
     */
    public final void setOnPartitionsRevokedHandler(Handler<Set<TopicPartition>> handler) {
        Objects.requireNonNull(handler);
        this.onPartitionsRevokedHandler = handler;
    }

    /**
     * Defines whether records whose TTL has elapsed are skipped instead of being passed
     * to the record handler. The flag is {@code true} by default.
     *
     * @param z {@code true} if expired records shall be skipped.
     */
    public final void setRespectTtl(boolean z) {
        this.respectTtl = z;
    }

    /**
     * Sets a supplier for the underlying Kafka consumer.
     * <p>
     * If a non-null supplier is set, {@link #start()} wraps the supplied consumer instead of
     * creating one from the consumer configuration.
     *
     * @param supplier The supplier to use (may be {@code null}).
     */
    public void setKafkaConsumerSupplier(Supplier<Consumer<String, Buffer>> supplier) {
        this.kafkaConsumerSupplier = supplier;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Gets the vert.x Kafka consumer used by this instance.
     *
     * @return The consumer.
     * @throws IllegalStateException if the consumer has not been created yet, i.e. {@link #start()}
     *                               has not been invoked or has not created the consumer yet.
     */
    public final KafkaConsumer<String, Buffer> getKafkaConsumer() {
        return Optional.ofNullable(this.kafkaConsumer)
                .orElseThrow(() -> new IllegalStateException("consumer not initialized/started"));
    }

    /**
     * Creates the Kafka consumer, installs the record, exception and rebalance handlers and
     * subscribes to the configured topics or topic pattern.
     * <p>
     * The consumer is created on the vert.x context obtained when this method is invoked.
     *
     * @return A future that is completed once the subscription and the following rebalance are done.
     *         The future is failed if the consumer cannot be created or the subscription fails or
     *         times out.
     */
    public Future<Void> start() {
        this.context = this.vertx.getOrCreateContext();
        final Promise<Void> startPromise = Promise.promise();
        runOnContext(v -> {
            try {
                this.kafkaConsumer = Optional.ofNullable(this.kafkaConsumerSupplier)
                        .map(supplier -> KafkaConsumer.create(this.vertx, supplier.get()))
                        .orElseGet(() -> KafkaConsumer.create(this.vertx, this.consumerConfig, String.class, Buffer.class));
                this.kafkaConsumer.handler(this::dispatchRecord);
                this.kafkaConsumer.exceptionHandler(cause -> this.log.error("consumer error occurred", cause));
                installRebalanceListeners();
                subscribeAndWaitForRebalance()
                        .map(subscribed -> {
                            logSubscribedTopics();
                            return (Void) null;
                        })
                        .onComplete(startPromise);
            } catch (final RuntimeException e) {
                // make sure the returned future does not stay uncompleted if setting up the consumer fails
                this.log.error("error creating consumer", e);
                startPromise.tryFail(e);
            }
        });
        return startPromise.future();
    }

    /**
     * Passes a consumed record on to the record handler unless its TTL has elapsed and
     * TTLs are to be respected. Exceptions thrown by the record handler are logged only.
     */
    private void dispatchRecord(KafkaConsumerRecord<String, Buffer> kafkaConsumerRecord) {
        if (this.respectTtl && KafkaRecordHelper.isTtlElapsed(kafkaConsumerRecord.headers())) {
            onRecordHandlerSkippedForExpiredRecord(kafkaConsumerRecord);
            return;
        }
        try {
            this.recordHandler.handle(kafkaConsumerRecord);
        } catch (Exception e) {
            this.log.warn("error handling record [topic: {}, partition: {}, offset: {}, headers: {}]",
                    kafkaConsumerRecord.topic(), kafkaConsumerRecord.partition(), kafkaConsumerRecord.offset(),
                    kafkaConsumerRecord.headers(), e);
        }
    }

    /**
     * Logs the topics that the consumer got subscribed to; only the number of topics is logged
     * if a topic pattern matches more than 5 topics.
     */
    private void logSubscribedTopics() {
        if (this.topicPattern == null) {
            this.log.debug("subscribed to topics {}", this.topics);
        } else if (this.subscribedTopicPatternTopics.size() <= 5) {
            this.log.debug("subscribed to topic pattern [{}], matching topics: {}", this.topicPattern, this.subscribedTopicPatternTopics);
        } else {
            this.log.debug("subscribed to topic pattern [{}], matching {} topics", this.topicPattern, this.subscribedTopicPatternTopics.size());
        }
    }

    /**
     * Invoked by the record handler installed in {@link #start()} when a record is not passed
     * to the record handler because its TTL has elapsed.
     * <p>
     * This default implementation does nothing; subclasses may override it.
     *
     * @param kafkaConsumerRecord The expired record.
     */
    protected void onRecordHandlerSkippedForExpiredRecord(KafkaConsumerRecord<String, Buffer> kafkaConsumerRecord) {
    }

    /**
     * Replaces the rebalance listener of the underlying Kafka read stream with one that first
     * invokes the blocking hook methods in the thread of the listener callback and then
     * schedules the corresponding handlers on the vert.x context of this consumer.
     */
    private void installRebalanceListeners() {
        final ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> collection) {
                final Set<TopicPartition> assigned = Helper.from(collection);
                if (HonoKafkaConsumer.this.log.isDebugEnabled()) {
                    HonoKafkaConsumer.this.log.debug("partitions assigned: [{}]", HonoKafkaConsumerHelper.getPartitionsDebugString(collection));
                }
                HonoKafkaConsumer.this.onPartitionsAssignedBlocking(assigned);
                // the overall assignment is fetched here, before switching to the vert.x context,
                // and only if somebody is interested in it
                final Set<TopicPartition> allAssigned = HonoKafkaConsumer.this.onRebalanceDoneHandler == null
                        ? null
                        : Helper.from(HonoKafkaConsumer.this.getKafkaConsumer().asStream().unwrap().assignment());
                HonoKafkaConsumer.this.context.runOnContext(v -> {
                    HonoKafkaConsumer.this.onPartitionsAssigned(assigned);
                    if (HonoKafkaConsumer.this.onRebalanceDoneHandler != null) {
                        HonoKafkaConsumer.this.onRebalanceDoneHandler.handle(allAssigned);
                    }
                });
            }

            @Override
            public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> collection) {
                if (HonoKafkaConsumer.this.log.isDebugEnabled()) {
                    HonoKafkaConsumer.this.log.debug("partitions revoked: [{}]", HonoKafkaConsumerHelper.getPartitionsDebugString(collection));
                }
                handleRemovedPartitions(Helper.from(collection));
            }

            @Override
            public void onPartitionsLost(Collection<org.apache.kafka.common.TopicPartition> collection) {
                if (HonoKafkaConsumer.this.log.isInfoEnabled()) {
                    HonoKafkaConsumer.this.log.info("partitions lost: [{}]", HonoKafkaConsumerHelper.getPartitionsDebugString(collection));
                }
                handleRemovedPartitions(Helper.from(collection));
            }

            // lost partitions are treated the same way as revoked ones
            private void handleRemovedPartitions(Set<TopicPartition> removed) {
                HonoKafkaConsumer.this.onPartitionsRevokedBlocking(removed);
                HonoKafkaConsumer.this.context.runOnContext(v -> HonoKafkaConsumer.this.onPartitionsRevoked(removed));
            }
        };
        replaceRebalanceListener(this.kafkaConsumer, listener);
    }

    /**
     * Subscribes to the configured topics or topic pattern and waits for the following
     * partition assignment to be processed.
     * <p>
     * Concurrent invocations share the same pending "subscribe done" promise, which gets
     * completed (and reset) when partitions get assigned.
     *
     * @return A future that is succeeded once the subscribe operation has succeeded and the
     *         partition assignment has been processed. The future is failed with a
     *         {@link ServerErrorException} (503) if this consumer is already stopped or if the
     *         assignment has not been processed within the rebalance timeout; it is also failed
     *         if the subscribe operation fails.
     */
    private Future<Void> subscribeAndWaitForRebalance() {
        if (this.stopCalled.get()) {
            return Future.failedFuture(new ServerErrorException(503, "already stopped"));
        }
        // reuse a pending promise if there is one, otherwise register a new one
        final Promise<Void> subscribeDonePromise = this.subscribeDonePromiseRef
                .updateAndGet(existing -> existing == null ? Promise.<Void>promise() : existing);
        final Promise<Void> subscriptionPromise = Promise.promise();
        if (this.topicPattern != null) {
            this.kafkaConsumer.subscribe(this.topicPattern, subscriptionPromise);
        } else {
            this.kafkaConsumer.subscribe(this.topics, subscriptionPromise);
        }
        if (this.kafkaConsumerWorker == null) {
            this.kafkaConsumerWorker = getKafkaConsumerWorker(this.kafkaConsumer);
        }
        this.vertx.setTimer(WAIT_FOR_REBALANCE_TIMEOUT, timerId -> {
            if (subscribeDonePromise.future().isComplete()) {
                return;
            }
            // remove the timed-out promise so that a later invocation starts with a fresh one
            // instead of failing immediately with the result of this one
            this.subscribeDonePromiseRef.compareAndSet(subscribeDonePromise, null);
            this.log.warn("timed out waiting for rebalance and update of subscribed topics");
            subscribeDonePromise.tryFail(new ServerErrorException(503, "timed out waiting for rebalance and update of subscribed topics"));
        });
        return CompositeFuture.all(subscriptionPromise.future(), subscribeDonePromise.future()).mapEmpty();
    }

    /**
     * Invoked synchronously from the rebalance listener's {@code onPartitionsAssigned} callback,
     * before the assignment handling is scheduled on the vert.x context.
     * <p>
     * NOTE(review): presumably this runs on the Kafka polling thread - confirm.
     * <p>
     * This default implementation does nothing; subclasses may override it.
     *
     * @param set The newly assigned partitions.
     */
    protected void onPartitionsAssignedBlocking(Set<TopicPartition> set) {
    }

    /**
     * Handles newly assigned partitions.
     * <p>
     * Completes (and resets) the pending "subscribe done" promise, if there is one. If a topic
     * pattern is used, the set of subscribed topics is refreshed first and the promise is
     * completed with the outcome of that refresh. Finally, the partitions-assigned handler is
     * notified, if one is set.
     *
     * @param set The newly assigned partitions.
     */
    private void onPartitionsAssigned(Set<TopicPartition> set) {
        if (this.topicPattern == null) {
            final Promise<Void> pending = this.subscribeDonePromiseRef.getAndSet(null);
            if (pending != null) {
                pending.tryComplete();
            }
        } else {
            final Promise<Set<String>> subscriptionResult = Promise.promise();
            this.kafkaConsumer.subscription(subscriptionResult);
            subscriptionResult.future()
                    .onSuccess(currentTopics -> this.subscribedTopicPatternTopics = new HashSet<>(currentTopics))
                    .onFailure(cause -> this.log.info("failed to get subscription", cause))
                    .map((Void) null)
                    .onComplete(outcome -> {
                        final Promise<Void> pending = this.subscribeDonePromiseRef.getAndSet(null);
                        if (pending != null) {
                            tryCompletePromise(pending, outcome);
                        }
                    });
        }
        final Handler<Set<TopicPartition>> assignedHandler = this.onPartitionsAssignedHandler;
        if (assignedHandler != null) {
            assignedHandler.handle(set);
        }
    }

    /**
     * Invoked synchronously from the rebalance listener's {@code onPartitionsRevoked} and
     * {@code onPartitionsLost} callbacks, before the revocation handling is scheduled on the
     * vert.x context.
     * <p>
     * NOTE(review): presumably this runs on the Kafka polling thread - confirm.
     * <p>
     * This default implementation does nothing; subclasses may override it.
     *
     * @param set The revoked or lost partitions.
     */
    protected void onPartitionsRevokedBlocking(Set<TopicPartition> set) {
    }

    /**
     * Notifies the partitions-revoked handler, if one is set.
     *
     * @param set The revoked or lost partitions.
     */
    private void onPartitionsRevoked(Set<TopicPartition> set) {
        Optional.ofNullable(this.onPartitionsRevokedHandler)
                .ifPresent(revokedHandler -> revokedHandler.handle(set));
    }

    /**
     * Closes the underlying Kafka consumer.
     *
     * @return A future indicating the outcome of closing the consumer. The future is failed if
     *         the consumer has not been created yet, and it is succeeded right away if this
     *         method has been invoked before.
     */
    public Future<Void> stop() {
        if (this.kafkaConsumer == null) {
            return Future.failedFuture("not started");
        }
        if (this.stopCalled.compareAndSet(false, true)) {
            final Promise<Void> closePromise = Promise.promise();
            this.kafkaConsumer.close(closePromise);
            return closePromise.future();
        }
        this.log.trace("stop already called");
        return Future.succeededFuture();
    }

    /**
     * Runs the given handler on the vert.x context of this consumer.
     * <p>
     * The handler is invoked directly if the current thread is already running on that
     * context, otherwise its execution is scheduled on the context.
     *
     * @param handler The handler to run; it is invoked with a {@code null} argument.
     * @throws NullPointerException if handler is {@code null}.
     */
    protected void runOnContext(Handler<Void> handler) {
        Objects.requireNonNull(handler);
        if (this.context == Vertx.currentContext()) {
            handler.handle(null);
        } else {
            this.context.runOnContext(v -> handler.handle(null));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Submits the given handler for execution on the worker executor of the Kafka consumer.
     * <p>
     * Nothing is done if this consumer has already been stopped, either when this method is
     * invoked or when the submitted task gets executed.
     *
     * @param handler The handler to run; it is invoked with a {@code null} argument.
     * @throws NullPointerException if handler is {@code null}.
     * @throws IllegalStateException if the worker executor has not been obtained yet.
     */
    public void runOnKafkaWorkerThread(Handler<Void> handler) {
        Objects.requireNonNull(handler);
        if (this.kafkaConsumerWorker == null) {
            throw new IllegalStateException("consumer not initialized/started");
        }
        if (this.stopCalled.get()) {
            return;
        }
        this.kafkaConsumerWorker.submit(() -> {
            // re-check, the consumer may have been stopped while the task was queued
            if (!this.stopCalled.get()) {
                handler.handle(null);
            }
        });
    }

    /**
     * Gets the topics that matched the topic pattern when the subscription was last refreshed.
     *
     * @return A copy of the set of matching topics, or an empty set if this consumer does not
     *         use a topic pattern.
     */
    public final Set<String> getSubscribedTopicPatternTopics() {
        if (this.topicPattern == null) {
            return Set.of();
        }
        return new HashSet<>(this.subscribedTopicPatternTopics);
    }

    /**
     * Checks whether the given topic is one of the explicitly configured topics or, if a topic
     * pattern is used, one of the topics known to match the pattern.
     *
     * @param str The name of the topic to check.
     * @return {@code true} if the topic is among the known subscribed topics.
     * @throws NullPointerException if the topic is {@code null}.
     */
    public final boolean isAmongKnownSubscribedTopics(String str) {
        Objects.requireNonNull(str);
        final Set<String> knownTopics = this.topics == null ? this.subscribedTopicPatternTopics : this.topics;
        return knownTopics.contains(str);
    }

    /**
     * Makes sure that the given topic is part of the topics this consumer is subscribed to via
     * its topic pattern.
     * <p>
     * If the topic is not yet among the known subscribed topics, the partitions of the topic are
     * requested first. If partitions are returned, the subscription is renewed and the following
     * rebalance is awaited; this is done at most twice.
     *
     * @param str The name of the topic.
     * @return A future that is succeeded if the topic is among the subscribed topics. Otherwise
     *         the future is failed with a {@link ServerErrorException}: with status 500 if this
     *         consumer has not been started, with status 503 if it is already stopped, if the
     *         partitions could not be retrieved, if no partitions were returned for the topic
     *         or if the topic is still not subscribed after the subscription updates.
     * @throws NullPointerException if the topic is {@code null}.
     * @throws IllegalStateException if this consumer uses a fixed set of topics instead of a pattern.
     * @throws IllegalArgumentException if the topic pattern is not found in the topic name.
     */
    public final Future<Void> ensureTopicIsAmongSubscribedTopicPatternTopics(String str) {
        Objects.requireNonNull(str);
        if (this.topics != null) {
            throw new IllegalStateException("consumer doesn't use topic pattern");
        }
        if (!this.topicPattern.matcher(str).find()) {
            throw new IllegalArgumentException("topic doesn't match pattern");
        }
        if (this.kafkaConsumer == null) {
            return Future.failedFuture(new ServerErrorException(500, "not started"));
        }
        if (this.stopCalled.get()) {
            return Future.failedFuture(new ServerErrorException(503, "already stopped"));
        }
        if (this.subscribedTopicPatternTopics.contains(str)) {
            this.log.debug("ensureTopicIsAmongSubscribedTopics: topic is already subscribed [{}]", str);
            return Future.succeededFuture();
        }
        // the topic is not known yet: request its partitions before updating the subscription
        // NOTE(review): the log message below suggests that this request may trigger topic
        // auto-creation on the broker - confirm
        Promise promise = Promise.promise();
        HonoKafkaConsumerHelper.partitionsFor(this.kafkaConsumer, str, promise);
        return promise.future().recover(th -> {
            this.log.warn("ensureTopicIsAmongSubscribedTopics: error getting partitions for topic [{}]", str, th);
            return Future.failedFuture(new ServerErrorException(503, "error getting topic partitions", th));
        }).compose(list -> {
            if (list.isEmpty()) {
                this.log.warn("ensureTopicIsAmongSubscribedTopics: topic doesn't exist and didn't get auto-created: {}", str);
                return Future.failedFuture(new ServerErrorException(503, "topic doesn't exist and didn't get auto-created"));
            }
            // the subscription may have been updated while the partitions were being fetched
            if (this.subscribedTopicPatternTopics.contains(str)) {
                return Future.succeededFuture();
            }
            this.log.debug("ensureTopicIsAmongSubscribedTopics: verified topic existence, wait for subscription update and rebalance [{}]", str);
            return subscribeAndWaitForRebalance().compose(r6 -> {
                if (this.subscribedTopicPatternTopics.contains(str)) {
                    return Future.succeededFuture(r6);
                }
                // first subscription update did not include the topic: retry once
                this.log.debug("ensureTopicIsAmongSubscribedTopics: subscription not updated with topic after rebalance; try again [topic: {}]", str);
                return subscribeAndWaitForRebalance();
            }).compose(r7 -> {
                if (this.subscribedTopicPatternTopics.contains(str)) {
                    this.log.debug("ensureTopicIsAmongSubscribedTopics: done updating topic subscription");
                    return Future.succeededFuture(r7);
                }
                this.log.warn("ensureTopicIsAmongSubscribedTopics: subscription not updated with topic after rebalance [topic: {}]", str);
                return Future.failedFuture(new ServerErrorException(503, "subscription not updated with topic after rebalance"));
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tries to complete the given promise with the outcome of the given result.
     * <p>
     * Nothing happens if the promise is already completed.
     *
     * @param promise The promise to complete.
     * @param asyncResult The result providing the value or the failure cause.
     * @param <T> The type of the result value.
     */
    public static <T> void tryCompletePromise(Promise<T> promise, AsyncResult<T> asyncResult) {
        if (!asyncResult.succeeded()) {
            promise.tryFail(asyncResult.cause());
            return;
        }
        promise.tryComplete(asyncResult.result());
    }

    /**
     * Sets the given listener as the rebalance listener of the read stream underlying the
     * given consumer, using reflection to access the {@code rebalanceListener} field of
     * {@link KafkaReadStreamImpl}.
     *
     * @param kafkaConsumer The consumer whose read stream is to be adapted.
     * @param consumerRebalanceListener The listener to install.
     * @throws IllegalArgumentException if the field could not be set.
     */
    private static void replaceRebalanceListener(KafkaConsumer<String, Buffer> kafkaConsumer, ConsumerRebalanceListener consumerRebalanceListener) {
        try {
            final Field listenerField = KafkaReadStreamImpl.class.getDeclaredField("rebalanceListener");
            listenerField.setAccessible(true);
            final Object readStream = kafkaConsumer.asStream();
            listenerField.set(readStream, consumerRebalanceListener);
        } catch (Exception cause) {
            throw new IllegalArgumentException("Failed to adapt rebalance listener", cause);
        }
    }

    /**
     * Gets the worker executor of the read stream underlying the given consumer, using
     * reflection to access the {@code worker} field of {@link KafkaReadStreamImpl}.
     *
     * @param kafkaConsumer The consumer whose worker executor is to be returned.
     * @return The worker executor.
     * @throws IllegalArgumentException if the field could not be read or if no worker is set
     *                                  (in the latter case the cause is an {@link IllegalStateException}).
     */
    private static ExecutorService getKafkaConsumerWorker(KafkaConsumer<String, Buffer> kafkaConsumer) {
        try {
            final Field workerField = KafkaReadStreamImpl.class.getDeclaredField("worker");
            workerField.setAccessible(true);
            final Object readStream = kafkaConsumer.asStream();
            final ExecutorService worker = (ExecutorService) workerField.get(readStream);
            if (worker != null) {
                return worker;
            }
            // gets wrapped by the catch block below, like any other failure
            throw new IllegalStateException("worker not set");
        } catch (Exception cause) {
            throw new IllegalArgumentException("Failed to get worker", cause);
        }
    }
}
