package org.apache.pulsar.functions.source;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.10.5.2-5ce22c.jar:org/apache/pulsar/functions/source/SingleConsumerPulsarSource.class */
public class SingleConsumerPulsarSource<T> extends PulsarSource<T> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SingleConsumerPulsarSource.class);
    private final PulsarClient pulsarClient;
    private final SingleConsumerPulsarSourceConfig pulsarSourceConfig;
    private final Map<String, String> properties;
    private final ClassLoader functionClassLoader;
    private final TopicSchema topicSchema;
    private Consumer<T> consumer;
    private final List<Consumer<T>> inputConsumers;

    public SingleConsumerPulsarSource(PulsarClient pulsarClient, SingleConsumerPulsarSourceConfig singleConsumerPulsarSourceConfig, Map<String, String> map, ClassLoader classLoader) {
        super(pulsarClient, singleConsumerPulsarSourceConfig, map, classLoader);
        this.inputConsumers = new LinkedList();
        this.pulsarClient = pulsarClient;
        this.pulsarSourceConfig = singleConsumerPulsarSourceConfig;
        this.topicSchema = new TopicSchema(pulsarClient);
        this.properties = map;
        this.functionClassLoader = classLoader;
    }

    @Override // org.apache.pulsar.io.core.Source
    public void open(Map<String, Object> map, SourceContext sourceContext) throws Exception {
        log.info("Opening pulsar source with config: {}", this.pulsarSourceConfig);
        Class<?> loadClass = Reflections.loadClass(this.pulsarSourceConfig.getTypeClassName(), this.functionClassLoader);
        Preconditions.checkArgument(!Void.class.equals(loadClass), "Input type of Pulsar Function cannot be Void");
        String topic = this.pulsarSourceConfig.getTopic();
        PulsarSourceConsumerConfig<T> buildPulsarSourceConsumerConfig = buildPulsarSourceConsumerConfig(topic, this.pulsarSourceConfig.getConsumerConfig(), loadClass);
        log.info("Creating consumer for topic : {}, schema : {}, schemaInfo: {}", topic, buildPulsarSourceConsumerConfig.getSchema(), buildPulsarSourceConsumerConfig.getSchema().getSchemaInfo());
        this.consumer = createConsumeBuilder(topic, buildPulsarSourceConsumerConfig).subscribeAsync().join();
        this.inputConsumers.add(this.consumer);
    }

    @Override // org.apache.pulsar.io.core.Source
    public Record<T> read() throws Exception {
        return buildRecord(this.consumer, this.consumer.receive());
    }

    @VisibleForTesting
    Consumer<T> getInputConsumer() {
        return this.consumer;
    }

    @Override // org.apache.pulsar.functions.source.PulsarSource
    public List<Consumer<T>> getInputConsumers() {
        return this.inputConsumers;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.consumer != null) {
            this.consumer.close();
        }
    }
}
