package org.apache.flink.streaming.connectors.pulsar;

import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializableObject;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.class */
/**
 * Flink sink function that serializes each incoming record with a {@link SerializationSchema}
 * and sends it asynchronously to a Pulsar topic.
 *
 * <p>Sends are asynchronous; their outcome is handled by callbacks installed in
 * {@link #open(Configuration)}:
 * <ul>
 *   <li>{@link PulsarProduceMode#AT_LEAST_ONCE} (the default): the first send failure is stored
 *       and rethrown by {@link #checkErroneous()} on the next {@code invoke},
 *       {@code snapshotState} or {@code close} call.</li>
 *   <li>{@link PulsarProduceMode#AT_MOST_ONCE}: send failures are only logged.</li>
 * </ul>
 *
 * <p>When flush-on-checkpoint is enabled, {@link #snapshotState(FunctionSnapshotContext)} blocks
 * until every record handed to the producer has completed, successfully or not.
 *
 * @param <T> type of the records consumed by this sink
 */
public class FlinkPulsarProducer<T> extends RichSinkFunction<T> implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkPulsarProducer.class);

    /** Pulsar client settings (service url, authentication, ...). */
    private final ClientConfigurationData clientConf;

    /** Pulsar producer settings (topic name, ...). */
    private final ProducerConfigurationData producerConf;

    /** Turns a record into the byte payload of the Pulsar message. */
    protected final SerializationSchema<T> schema;

    /** Derives the message key from a record; never null (falls back to {@code PulsarKeyExtractor.NULL}). */
    protected final PulsarKeyExtractor<T> flinkPulsarKeyExtractor;

    /** Whether {@code snapshotState} waits for in-flight records to complete. */
    protected boolean flushOnCheckpoint;

    /** Pulsar producer, created in {@code open}. */
    protected transient Producer<byte[]> producer;

    /** Invoked when an asynchronous send completes successfully. */
    protected transient Function<MessageId, MessageId> successCallback;

    /** Invoked when an asynchronous send completes exceptionally. */
    protected transient Function<Throwable, MessageId> failureCallback;

    /** First failure reported by an asynchronous send in AT_LEAST_ONCE mode, or null. */
    protected transient volatile Exception asyncException;

    /** Number of records sent but not yet completed; guarded by {@link #pendingRecordsLock}. */
    protected long pendingRecords;

    protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONCE;

    /** Monitor guarding {@link #pendingRecords}; also used to wake up a waiting checkpoint. */
    protected final SerializableObject pendingRecordsLock = new SerializableObject();

    /**
     * Creates a producer from a service url and a topic name.
     *
     * @param str                 Pulsar service url; must not be blank
     * @param str2                name of the topic to produce to; must not be blank
     * @param authentication      authentication to use; must not be null
     * @param serializationSchema schema used to serialize records; must not be null
     * @param pulsarKeyExtractor  key extractor, or null to use {@code PulsarKeyExtractor.NULL}
     */
    public FlinkPulsarProducer(String str, String str2, Authentication authentication, SerializationSchema<T> serializationSchema, PulsarKeyExtractor<T> pulsarKeyExtractor) {
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "Service url cannot be blank");
        Preconditions.checkArgument(StringUtils.isNotBlank(str2), "TopicName cannot be blank");
        Preconditions.checkNotNull(authentication, "auth cannot be null, set disabled for no auth");
        this.clientConf = new ClientConfigurationData();
        this.producerConf = new ProducerConfigurationData();
        this.clientConf.setServiceUrl(str);
        this.clientConf.setAuthentication(authentication);
        this.producerConf.setTopicName(str2);
        this.schema = Preconditions.checkNotNull(serializationSchema, "Serialization Schema not set");
        this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(pulsarKeyExtractor);
        ClosureCleaner.ensureSerializable(serializationSchema);
    }

    /**
     * Creates a producer from pre-built client and producer configurations.
     *
     * @param clientConfigurationData   Pulsar client configuration; must not be null
     * @param producerConfigurationData Pulsar producer configuration; must not be null
     * @param serializationSchema       schema used to serialize records; must not be null
     * @param pulsarKeyExtractor        key extractor, or null to use {@code PulsarKeyExtractor.NULL}
     */
    public FlinkPulsarProducer(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData, SerializationSchema<T> serializationSchema, PulsarKeyExtractor<T> pulsarKeyExtractor) {
        this.clientConf = Preconditions.checkNotNull(clientConfigurationData, "client conf can not be null");
        this.producerConf = Preconditions.checkNotNull(producerConfigurationData, "producer conf can not be null");
        this.schema = Preconditions.checkNotNull(serializationSchema, "Serialization Schema not set");
        this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(pulsarKeyExtractor);
        ClosureCleaner.ensureSerializable(serializationSchema);
    }

    /** @return the key extractor in use; never null */
    public PulsarKeyExtractor<T> getKeyExtractor() {
        return this.flinkPulsarKeyExtractor;
    }

    /** @return the configured produce mode */
    public PulsarProduceMode getProduceMode() {
        return this.produceMode;
    }

    /**
     * Sets the produce mode. The mode is read when {@link #open(Configuration)} builds the
     * failure callback.
     *
     * @param pulsarProduceMode the new mode; must not be null
     */
    public void setProduceMode(PulsarProduceMode pulsarProduceMode) {
        this.produceMode = Preconditions.checkNotNull(pulsarProduceMode);
    }

    /**
     * Enables or disables waiting for in-flight records during {@code snapshotState}.
     *
     * @param z true to flush on checkpoint
     */
    public void setFlushOnCheckpoint(boolean z) {
        this.flushOnCheckpoint = z;
    }

    /** Returns the given extractor, or {@code PulsarKeyExtractor.NULL} when it is null. */
    private static <T> PulsarKeyExtractor<T> getOrNullKeyExtractor(PulsarKeyExtractor<T> pulsarKeyExtractor) {
        return null == pulsarKeyExtractor ? PulsarKeyExtractor.NULL : pulsarKeyExtractor;
    }

    /**
     * Obtains a client through {@code CachedPulsarClient.getOrCreate} and blocks until the
     * producer for the configured topic has been created.
     */
    private Producer<byte[]> createProducer() throws Exception {
        // NOTE(review): the cast is kept from the original; it is presumably redundant if
        // createProducerAsync already returns a future of Producer<byte[]> - confirm.
        return (Producer) CachedPulsarClient.getOrCreate(this.clientConf).createProducerAsync(this.producerConf).get();
    }

    /**
     * Installs the completion callbacks and creates the Pulsar producer.
     *
     * @param configuration Flink configuration (not used here)
     * @throws UnsupportedOperationException if the produce mode is neither AT_MOST_ONCE nor
     *                                       AT_LEAST_ONCE
     * @throws Exception                     if the producer cannot be created
     */
    public void open(Configuration configuration) throws Exception {
        // Build the callbacks first: an unsupported produce mode is rejected before any
        // connection to the broker is attempted.
        this.failureCallback = createFailureCallback();
        this.successCallback = messageId -> {
            acknowledgeMessage();
            return messageId;
        };

        this.producer = createProducer();

        RuntimeContext runtimeContext = getRuntimeContext();
        LOG.info("Starting FlinkPulsarProducer ({}/{}) to produce into pulsar topic {}",
                runtimeContext.getIndexOfThisSubtask() + 1,
                runtimeContext.getNumberOfParallelSubtasks(),
                this.producerConf.getTopicName());

        if (this.flushOnCheckpoint && !runtimeContext.isCheckpointingEnabled()) {
            LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
            this.flushOnCheckpoint = false;
        }
    }

    /**
     * Builds the callback run when a send completes exceptionally, according to the produce mode.
     *
     * <p>Both variants acknowledge the record: a failed send is no longer in flight, and without
     * the acknowledgement {@code snapshotState} would wait forever for it.
     */
    private Function<Throwable, MessageId> createFailureCallback() {
        if (PulsarProduceMode.AT_MOST_ONCE == this.produceMode) {
            return cause -> {
                LOG.error("Error while sending record to Pulsar : " + cause.getMessage(), cause);
                acknowledgeMessage();
                return null;
            };
        }
        if (PulsarProduceMode.AT_LEAST_ONCE == this.produceMode) {
            return cause -> {
                // Only the first failure is kept; later ones are dropped.
                if (null == this.asyncException) {
                    this.asyncException = cause instanceof Exception ? (Exception) cause : new Exception(cause);
                }
                // Acknowledge after recording the failure so that a checkpoint woken up by
                // this call observes the exception in its final checkErroneous().
                acknowledgeMessage();
                return null;
            };
        }
        throw new UnsupportedOperationException("Unsupported produce mode " + this.produceMode);
    }

    /**
     * Serializes the record and sends it asynchronously.
     *
     * @param t       the record to send
     * @param context sink context; its timestamp, when present, becomes the message event time
     * @throws Exception if a previous asynchronous send failed (AT_LEAST_ONCE mode)
     */
    public void invoke(T t, SinkFunction.Context context) throws Exception {
        checkErroneous();

        byte[] payload = this.schema.serialize(t);
        TypedMessageBuilder<byte[]> message = this.producer.newMessage();

        Long timestamp = context.timestamp();
        if (null != timestamp) {
            message = message.eventTime(timestamp.longValue());
        }

        String key = this.flinkPulsarKeyExtractor.getKey(t);
        if (null != key) {
            message = message.key(key);
        }

        if (this.flushOnCheckpoint) {
            synchronized (this.pendingRecordsLock) {
                this.pendingRecords++;
            }
        }

        message.value(payload)
                .sendAsync()
                .thenApply(this.successCallback)
                .exceptionally(this.failureCallback);
    }

    /**
     * Closes the producer if it was created, then rethrows any pending asynchronous failure.
     *
     * @throws Exception if closing fails or an asynchronous send failed
     */
    public void close() throws Exception {
        if (this.producer != null) {
            this.producer.close();
        }
        checkErroneous();
    }

    /**
     * Marks one in-flight record as completed and wakes up a waiting checkpoint once no
     * record is pending. Does nothing when flush-on-checkpoint is disabled.
     */
    private void acknowledgeMessage() {
        if (!this.flushOnCheckpoint) {
            return;
        }
        synchronized (this.pendingRecordsLock) {
            this.pendingRecords--;
            if (this.pendingRecords == 0) {
                this.pendingRecordsLock.notifyAll();
            }
        }
    }

    /**
     * When flush-on-checkpoint is enabled, blocks until all in-flight records have completed,
     * then rethrows any asynchronous failure recorded in the meantime.
     *
     * @param functionSnapshotContext snapshot context (not used here)
     * @throws Exception if an asynchronous send failed or the wait is interrupted
     */
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        checkErroneous();
        if (!this.flushOnCheckpoint) {
            return;
        }
        synchronized (this.pendingRecordsLock) {
            // Timed wait: the counter is re-checked periodically even without a notification.
            while (this.pendingRecords > 0) {
                this.pendingRecordsLock.wait(100L);
            }
        }
        checkErroneous();
    }

    /** No state is restored; this sink keeps no checkpointed state. */
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    /**
     * Rethrows, wrapped, the failure recorded by an asynchronous send, if any. The recorded
     * failure is cleared first so that it is reported only once.
     *
     * @throws Exception wrapping the recorded asynchronous failure
     */
    protected void checkErroneous() throws Exception {
        Exception pending = this.asyncException;
        if (pending != null) {
            this.asyncException = null;
            throw new Exception("Failed to send data to Pulsar: " + pending.getMessage(), pending);
        }
    }
}
