package org.apache.flink.batch.connectors.pulsar;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.class */
/**
 * Base Flink output format that serializes each record with a {@link SerializationSchema}
 * and publishes the resulting bytes to a Pulsar topic.
 *
 * <p>Records are sent asynchronously; a failed send is logged and does not abort the job.
 * {@link #close()} flushes the producer so that messages still pending when the task ends
 * are pushed to the broker before the output is considered complete.
 *
 * <p>NOTE: the producer is kept in a static field. It is created by the first instance that
 * is opened and then reused by every later instance in the same class loader, whatever
 * client/producer configuration those later instances carry. It is never closed by this class.
 *
 * <p>Subclasses are expected to assign {@link #serializationSchema} before records are written.
 *
 * @param <T> type of the records written by this format
 */
public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T> {
    private static final Logger LOG = LoggerFactory.getLogger(BasePulsarOutputFormat.class);
    private static final long serialVersionUID = 2304601727522060427L;

    /** Private monitor guarding lazy creation of the shared producer. */
    private static final Object PRODUCER_LOCK = new Object();

    /** Shared producer, lazily created on the first {@link #open(int, int)}. */
    private static volatile Producer<byte[]> producer;

    /** Handler attached to every asynchronous send; transient, so it is rebuilt in {@code open}. */
    private transient Function<Throwable, MessageId> failureCallback;

    /** Converts a record into the byte payload that is sent to Pulsar. */
    protected SerializationSchema<T> serializationSchema;

    private final ClientConfigurationData clientConf;
    private final ProducerConfigurationData producerConf;

    /**
     * Creates an output format from a service URL, a topic name and an authentication plugin.
     *
     * @param serviceUrl     Pulsar service URL; must not be blank
     * @param topicName      topic to write to; must not be blank
     * @param authentication authentication passed on to the client configuration as given
     * @throws IllegalArgumentException if {@code serviceUrl} or {@code topicName} is blank
     */
    public BasePulsarOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
        Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank.");
        Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicName cannot be blank.");
        this.clientConf = new ClientConfigurationData();
        this.producerConf = new ProducerConfigurationData();
        this.clientConf.setServiceUrl(serviceUrl);
        this.clientConf.setAuthentication(authentication);
        this.producerConf.setTopicName(topicName);
        LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.producerConf.getTopicName());
    }

    /**
     * Creates an output format from fully prepared client and producer configurations.
     *
     * @param clientConfigurationData   client configuration; must carry a non-blank service URL
     * @param producerConfigurationData producer configuration; must carry a non-blank topic name
     * @throws NullPointerException     if either configuration is {@code null}
     * @throws IllegalArgumentException if the service URL or the topic name is blank
     */
    public BasePulsarOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) {
        this.clientConf = Preconditions.checkNotNull(clientConfigurationData, "client config data should not be null");
        this.producerConf = Preconditions.checkNotNull(producerConfigurationData, "producer config data should not be null");
        Preconditions.checkArgument(StringUtils.isNotBlank(clientConfigurationData.getServiceUrl()), "serviceUrl cannot be blank.");
        Preconditions.checkArgument(StringUtils.isNotBlank(producerConfigurationData.getTopicName()), "topicName cannot be blank.");
        LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.producerConf.getTopicName());
    }

    /** No-op: everything this format needs is supplied through its constructors. */
    @Override
    public void configure(Configuration configuration) {
    }

    /**
     * Makes sure the shared producer exists and installs the send-failure handler.
     *
     * @param taskNumber index of this parallel instance (unused)
     * @param numTasks   number of parallel instances (unused)
     * @throws IOException if the Pulsar producer cannot be created
     */
    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // getProducerInstance() publishes the producer into the static field under the lock;
        // no further assignment is needed here.
        getProducerInstance();
        this.failureCallback = cause -> {
            LOG.error("Error while sending record to Pulsar: {}", cause.getMessage(), cause);
            return null;
        };
    }

    /**
     * Serializes the record and hands it to the producer without waiting for the broker.
     * A failed send is reported through the failure handler installed in {@code open}.
     *
     * @param element record to write
     * @throws IOException declared by the output format contract
     */
    @Override
    public void writeRecord(T element) throws IOException {
        byte[] payload = this.serializationSchema.serialize(element);
        producer.sendAsync(payload).exceptionally(this.failureCallback);
    }

    /**
     * Flushes the shared producer so that messages queued by {@link #writeRecord(Object)}
     * are pushed out before this instance is considered finished.
     *
     * <p>The producer itself stays open because it is shared with other instances.
     *
     * @throws IOException if the flush fails
     */
    @Override
    public void close() throws IOException {
        Producer<byte[]> current = producer;
        // The producer is null when close() runs without a successful open().
        if (current != null) {
            current.flush();
        }
    }

    /**
     * Returns the shared producer, creating it on first use with double-checked locking
     * on a private monitor.
     *
     * @return the shared, non-null producer
     * @throws PulsarClientException if the producer has to be created and creation fails
     */
    private Producer<byte[]> getProducerInstance() throws PulsarClientException {
        Producer<byte[]> current = producer;
        if (current == null) {
            synchronized (PRODUCER_LOCK) {
                current = producer;
                if (current == null) {
                    current = Preconditions.checkNotNull(createPulsarProducer(), "Pulsar producer cannot be null.");
                    producer = current;
                }
            }
        }
        return current;
    }

    /**
     * Builds a Pulsar client from this instance's client configuration and blocks until a
     * producer for the configured topic has been created.
     *
     * <p>If creation fails, the client is closed again so that its resources are not leaked,
     * and the interrupt flag is restored when the wait was interrupted.
     *
     * @return the newly created producer
     * @throws PulsarClientException wrapping whatever prevented the producer from being created
     */
    private Producer<byte[]> createPulsarProducer() throws PulsarClientException {
        PulsarClientImpl client = null;
        try {
            client = new PulsarClientImpl(this.clientConf);
            return client.createProducerAsync(this.producerConf).get();
        } catch (PulsarClientException | InterruptedException | ExecutionException e) {
            LOG.error("Pulsar producer cannot be created.", e);
            PulsarClientException failure = new PulsarClientException(e);
            if (client != null) {
                try {
                    client.close();
                } catch (PulsarClientException closeFailure) {
                    // Keep the original failure as the primary error.
                    failure.addSuppressed(closeFailure);
                }
            }
            if (e instanceof InterruptedException) {
                // Restore the flag last so the client shutdown above is not cut short by it.
                Thread.currentThread().interrupt();
            }
            throw failure;
        }
    }
}
