package org.apache.flink.streaming.connectors.pulsar;

import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.avro.AvroRowSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;

/* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.class */
public class PulsarAvroTableSink implements AppendStreamTableSink<Row> {
    protected ClientConfigurationData clientConfigurationData;
    protected ProducerConfigurationData producerConfigurationData;
    protected final String routingKeyFieldName;
    protected SerializationSchema<Row> serializationSchema;
    protected String[] fieldNames;
    protected TypeInformation[] fieldTypes;
    protected PulsarKeyExtractor<Row> keyExtractor;
    private Class<? extends SpecificRecord> recordClazz;

    /* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink$AvroKeyExtractor.class */
    private static class AvroKeyExtractor implements PulsarKeyExtractor<Row> {
        private final int keyIndex;

        public AvroKeyExtractor(String str, String[] strArr, TypeInformation<?>[] typeInformationArr, Class<? extends SpecificRecord> cls) {
            Preconditions.checkArgument(strArr.length == typeInformationArr.length, "Number of provided field names and types does not match.");
            Schema.Type type = SpecificData.get().getSchema(cls).getField(str).schema().getType();
            int indexOf = Arrays.asList(strArr).indexOf(str);
            Preconditions.checkArgument(indexOf >= 0, "Key field '" + str + "' not found");
            Preconditions.checkArgument(Schema.Type.STRING.equals(type), "Key field must be of type 'STRING'");
            this.keyIndex = indexOf;
        }

        @Override // org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor
        public String getKey(Row row) {
            return row.getField(this.keyIndex).toString();
        }
    }

    public PulsarAvroTableSink(String str, String str2, Authentication authentication, String str3, Class<? extends SpecificRecord> cls) {
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "Service url not set");
        Preconditions.checkArgument(StringUtils.isNotBlank(str2), "Topic is null");
        Preconditions.checkNotNull(authentication, "authentication is null, set new AuthenticationDisabled() instead");
        this.clientConfigurationData = new ClientConfigurationData();
        this.producerConfigurationData = new ProducerConfigurationData();
        this.clientConfigurationData.setServiceUrl(str);
        this.clientConfigurationData.setAuthentication(authentication);
        this.producerConfigurationData.setTopicName(str2);
        this.routingKeyFieldName = str3;
        this.recordClazz = cls;
    }

    public PulsarAvroTableSink(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData, String str, Class<? extends SpecificRecord> cls) {
        this.clientConfigurationData = (ClientConfigurationData) Preconditions.checkNotNull(clientConfigurationData, "client config can not be null");
        this.producerConfigurationData = (ProducerConfigurationData) Preconditions.checkNotNull(producerConfigurationData, "producer config can not be null");
        Preconditions.checkArgument(StringUtils.isNotBlank(clientConfigurationData.getServiceUrl()), "Service url not set");
        Preconditions.checkArgument(StringUtils.isNotBlank(producerConfigurationData.getTopicName()), "Topic is null");
        this.routingKeyFieldName = str;
        this.recordClazz = cls;
    }

    protected FlinkPulsarProducer<Row> createFlinkPulsarProducer() {
        this.serializationSchema = new AvroRowSerializationSchema(this.recordClazz);
        return new FlinkPulsarProducer<>(this.clientConfigurationData, this.producerConfigurationData, this.serializationSchema, this.keyExtractor);
    }

    public void emitDataStream(DataStream<Row> dataStream) {
        Preconditions.checkState(this.fieldNames != null, "Table sink is not configured");
        Preconditions.checkState(this.fieldTypes != null, "Table sink is not configured");
        Preconditions.checkState(this.serializationSchema != null, "Table sink is not configured");
        Preconditions.checkState(this.keyExtractor != null, "Table sink is not configured");
        dataStream.addSink(createFlinkPulsarProducer());
    }

    public TypeInformation<Row> getOutputType() {
        return new RowTypeInfo(this.fieldTypes, this.fieldNames);
    }

    public String[] getFieldNames() {
        return this.fieldNames;
    }

    public TypeInformation<?>[] getFieldTypes() {
        return this.fieldTypes;
    }

    public TableSink<Row> configure(String[] strArr, TypeInformation<?>[] typeInformationArr) {
        PulsarAvroTableSink pulsarAvroTableSink = new PulsarAvroTableSink(this.clientConfigurationData, this.producerConfigurationData, this.routingKeyFieldName, this.recordClazz);
        pulsarAvroTableSink.fieldNames = (String[]) Preconditions.checkNotNull(strArr, "Field names are null");
        pulsarAvroTableSink.fieldTypes = (TypeInformation[]) Preconditions.checkNotNull(typeInformationArr, "Field types are null");
        Preconditions.checkArgument(strArr.length == typeInformationArr.length, "Number of provided field names and types do not match");
        pulsarAvroTableSink.serializationSchema = new AvroRowSerializationSchema(this.recordClazz);
        pulsarAvroTableSink.keyExtractor = new AvroKeyExtractor(this.routingKeyFieldName, strArr, typeInformationArr, this.recordClazz);
        return pulsarAvroTableSink;
    }
}
