package org.apache.kafka.connect.transforms;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.apache.log4j.spi.LocationInfo;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.3.jar:META-INF/bundled-dependencies/connect-transforms-2.3.0.jar:org/apache/kafka/connect/transforms/InsertField.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/connect-transforms-2.3.0.jar:org/apache/kafka/connect/transforms/InsertField.class */
public abstract class InsertField<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final String OPTIONALITY_DOC = "Suffix with <code>!</code> to make this a required field, or <code>?</code> to keep it optional (the default).";
    private static final String PURPOSE = "field insertion";
    private InsertionSpec topicField;
    private InsertionSpec partitionField;
    private InsertionSpec offsetField;
    private InsertionSpec timestampField;
    private InsertionSpec staticField;
    private String staticValue;
    private Cache<Schema, Schema> schemaUpdateCache;
    public static final String OVERVIEW_DOC = "Insert field(s) using attributes from the record metadata or a configured static value.<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
    public static final ConfigDef CONFIG_DEF = new ConfigDef().define(ConfigName.TOPIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Field name for Kafka topic. Suffix with <code>!</code> to make this a required field, or <code>?</code> to keep it optional (the default).").define(ConfigName.PARTITION_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Field name for Kafka partition. Suffix with <code>!</code> to make this a required field, or <code>?</code> to keep it optional (the default).").define(ConfigName.OFFSET_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Field name for Kafka offset - only applicable to sink connectors.<br/>Suffix with <code>!</code> to make this a required field, or <code>?</code> to keep it optional (the default).").define(ConfigName.TIMESTAMP_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Field name for record timestamp. Suffix with <code>!</code> to make this a required field, or <code>?</code> to keep it optional (the default).").define(ConfigName.STATIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Field name for static data field. Suffix with <code>!</code> to make this a required field, or <code>?</code> to keep it optional (the default).").define(ConfigName.STATIC_VALUE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Static field value, if field name configured.");
    private static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().build();

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.3.jar:META-INF/bundled-dependencies/connect-transforms-2.3.0.jar:org/apache/kafka/connect/transforms/InsertField$ConfigName.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/connect-transforms-2.3.0.jar:org/apache/kafka/connect/transforms/InsertField$ConfigName.class */
    private interface ConfigName {
        public static final String TOPIC_FIELD = "topic.field";
        public static final String PARTITION_FIELD = "partition.field";
        public static final String OFFSET_FIELD = "offset.field";
        public static final String TIMESTAMP_FIELD = "timestamp.field";
        public static final String STATIC_FIELD = "static.field";
        public static final String STATIC_VALUE = "static.value";
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.3.jar:META-INF/bundled-dependencies/connect-transforms-2.3.0.jar:org/apache/kafka/connect/transforms/InsertField$InsertionSpec.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/connect-transforms-2.3.0.jar:org/apache/kafka/connect/transforms/InsertField$InsertionSpec.class */
    public static final class InsertionSpec {
        final String name;
        final boolean optional;

        private InsertionSpec(String str, boolean z) {
            this.name = str;
            this.optional = z;
        }

        public static InsertionSpec parse(String str) {
            if (str == null) {
                return null;
            }
            return str.endsWith(LocationInfo.NA) ? new InsertionSpec(str.substring(0, str.length() - 1), true) : str.endsWith("!") ? new InsertionSpec(str.substring(0, str.length() - 1), false) : new InsertionSpec(str, true);
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.3.jar:META-INF/bundled-dependencies/connect-transforms-2.3.0.jar:org/apache/kafka/connect/transforms/InsertField$Key.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/connect-transforms-2.3.0.jar:org/apache/kafka/connect/transforms/InsertField$Key.class */
    public static class Key<R extends ConnectRecord<R>> extends InsertField<R> {
        @Override // org.apache.kafka.connect.transforms.InsertField
        protected Schema operatingSchema(R r) {
            return r.keySchema();
        }

        @Override // org.apache.kafka.connect.transforms.InsertField
        protected Object operatingValue(R r) {
            return r.key();
        }

        @Override // org.apache.kafka.connect.transforms.InsertField
        protected R newRecord(R r, Schema schema, Object obj) {
            return (R) r.newRecord(r.topic(), r.kafkaPartition(), schema, obj, r.valueSchema(), r.value(), r.timestamp());
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.3.jar:META-INF/bundled-dependencies/connect-transforms-2.3.0.jar:org/apache/kafka/connect/transforms/InsertField$Value.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/connect-transforms-2.3.0.jar:org/apache/kafka/connect/transforms/InsertField$Value.class */
    public static class Value<R extends ConnectRecord<R>> extends InsertField<R> {
        @Override // org.apache.kafka.connect.transforms.InsertField
        protected Schema operatingSchema(R r) {
            return r.valueSchema();
        }

        @Override // org.apache.kafka.connect.transforms.InsertField
        protected Object operatingValue(R r) {
            return r.value();
        }

        @Override // org.apache.kafka.connect.transforms.InsertField
        protected R newRecord(R r, Schema schema, Object obj) {
            return (R) r.newRecord(r.topic(), r.kafkaPartition(), r.keySchema(), r.key(), schema, obj, r.timestamp());
        }
    }

    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        SimpleConfig simpleConfig = new SimpleConfig(CONFIG_DEF, map);
        this.topicField = InsertionSpec.parse(simpleConfig.getString(ConfigName.TOPIC_FIELD));
        this.partitionField = InsertionSpec.parse(simpleConfig.getString(ConfigName.PARTITION_FIELD));
        this.offsetField = InsertionSpec.parse(simpleConfig.getString(ConfigName.OFFSET_FIELD));
        this.timestampField = InsertionSpec.parse(simpleConfig.getString(ConfigName.TIMESTAMP_FIELD));
        this.staticField = InsertionSpec.parse(simpleConfig.getString(ConfigName.STATIC_FIELD));
        this.staticValue = simpleConfig.getString(ConfigName.STATIC_VALUE);
        if (this.topicField == null && this.partitionField == null && this.offsetField == null && this.timestampField == null && this.staticField == null) {
            throw new ConfigException("No field insertion configured");
        }
        if (this.staticField != null && this.staticValue == null) {
            throw new ConfigException(ConfigName.STATIC_VALUE, null, "No value specified for static field: " + this.staticField);
        }
        this.schemaUpdateCache = new SynchronizedCache(new LRUCache(16));
    }

    @Override // org.apache.kafka.connect.transforms.Transformation
    public R apply(R r) {
        return operatingSchema(r) == null ? applySchemaless(r) : applyWithSchema(r);
    }

    private R applySchemaless(R r) {
        HashMap hashMap = new HashMap(Requirements.requireMap(operatingValue(r), PURPOSE));
        if (this.topicField != null) {
            hashMap.put(this.topicField.name, r.topic());
        }
        if (this.partitionField != null && r.kafkaPartition() != null) {
            hashMap.put(this.partitionField.name, r.kafkaPartition());
        }
        if (this.offsetField != null) {
            hashMap.put(this.offsetField.name, Long.valueOf(Requirements.requireSinkRecord(r, PURPOSE).kafkaOffset()));
        }
        if (this.timestampField != null && r.timestamp() != null) {
            hashMap.put(this.timestampField.name, r.timestamp());
        }
        if (this.staticField != null && this.staticValue != null) {
            hashMap.put(this.staticField.name, this.staticValue);
        }
        return newRecord(r, null, hashMap);
    }

    private R applyWithSchema(R r) {
        Struct requireStruct = Requirements.requireStruct(operatingValue(r), PURPOSE);
        Schema schema = this.schemaUpdateCache.get(requireStruct.schema());
        if (schema == null) {
            schema = makeUpdatedSchema(requireStruct.schema());
            this.schemaUpdateCache.put(requireStruct.schema(), schema);
        }
        Struct struct = new Struct(schema);
        for (Field field : requireStruct.schema().fields()) {
            struct.put(field.name(), requireStruct.get(field));
        }
        if (this.topicField != null) {
            struct.put(this.topicField.name, r.topic());
        }
        if (this.partitionField != null && r.kafkaPartition() != null) {
            struct.put(this.partitionField.name, r.kafkaPartition());
        }
        if (this.offsetField != null) {
            struct.put(this.offsetField.name, Long.valueOf(Requirements.requireSinkRecord(r, PURPOSE).kafkaOffset()));
        }
        if (this.timestampField != null && r.timestamp() != null) {
            struct.put(this.timestampField.name, new Date(r.timestamp().longValue()));
        }
        if (this.staticField != null && this.staticValue != null) {
            struct.put(this.staticField.name, this.staticValue);
        }
        return newRecord(r, schema, struct);
    }

    private Schema makeUpdatedSchema(Schema schema) {
        SchemaBuilder copySchemaBasics = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
        for (Field field : schema.fields()) {
            copySchemaBasics.field(field.name(), field.schema());
        }
        if (this.topicField != null) {
            copySchemaBasics.field(this.topicField.name, this.topicField.optional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA);
        }
        if (this.partitionField != null) {
            copySchemaBasics.field(this.partitionField.name, this.partitionField.optional ? Schema.OPTIONAL_INT32_SCHEMA : Schema.INT32_SCHEMA);
        }
        if (this.offsetField != null) {
            copySchemaBasics.field(this.offsetField.name, this.offsetField.optional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA);
        }
        if (this.timestampField != null) {
            copySchemaBasics.field(this.timestampField.name, this.timestampField.optional ? OPTIONAL_TIMESTAMP_SCHEMA : Timestamp.SCHEMA);
        }
        if (this.staticField != null) {
            copySchemaBasics.field(this.staticField.name, this.staticField.optional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA);
        }
        return copySchemaBasics.build();
    }

    @Override // org.apache.kafka.connect.transforms.Transformation, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.schemaUpdateCache = null;
    }

    @Override // org.apache.kafka.connect.transforms.Transformation
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    protected abstract Schema operatingSchema(R r);

    protected abstract Object operatingValue(R r);

    protected abstract R newRecord(R r, Schema schema, Object obj);
}
