package org.apache.kafka.connect.transforms;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.util.NonEmptyListValidator;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1-rc-202105140121.jar:META-INF/bundled-dependencies/connect-transforms-2.3.0.jar:org/apache/kafka/connect/transforms/ValueToKey.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/connect-transforms-2.3.0.jar:org/apache/kafka/connect/transforms/ValueToKey.class */
/**
 * Transformation that replaces the record key with a new key formed from a configured subset of
 * the fields found in the record value.
 *
 * <p>Records without a value schema are handled as maps (the key becomes a map of the selected
 * entries); records with a value schema are handled as {@link Struct}s (the key becomes a
 * {@link Struct} whose schema contains only the selected fields).
 *
 * @param <R> the concrete record type being transformed
 */
public class ValueToKey<R extends ConnectRecord<R>> implements Transformation<R> {
    public static final String OVERVIEW_DOC = "Replace the record key with a new key formed from a subset of fields in the record value.";
    public static final String FIELDS_CONFIG = "fields";
    public static final ConfigDef CONFIG_DEF = new ConfigDef().define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), ConfigDef.Importance.HIGH, "Field names on the record value to extract as the record key.");
    private static final String PURPOSE = "copying fields from value to key";

    /** Names of the value fields copied into the key; assigned in {@link #configure(Map)}. */
    private List<String> fields;

    /** Maps a value schema to the key schema previously derived from it. */
    private Cache<Schema, Schema> valueToKeySchemaCache;

    /**
     * Reads the {@code fields} setting and creates a fresh schema cache.
     *
     * @param map raw configuration properties, parsed against {@link #CONFIG_DEF}
     */
    @Override
    public void configure(Map<String, ?> map) {
        this.fields = new SimpleConfig(CONFIG_DEF, map).getList(FIELDS_CONFIG);
        this.valueToKeySchemaCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
    }

    /**
     * Builds a new record whose key is derived from the value of {@code r}.
     *
     * @param r the record to transform
     * @return a copy of {@code r} with its key (and key schema) replaced
     */
    @Override
    public R apply(R r) {
        return r.valueSchema() == null ? applySchemaless(r) : applyWithSchema(r);
    }

    /**
     * Schemaless path: the value must be a map, and the new key is a map holding the configured
     * entries. A configured name that is absent from the value map is stored with a
     * {@code null} value, because the lookup result is copied as-is.
     */
    private R applySchemaless(R r) {
        final Map<String, Object> value = Requirements.requireMap(r.value(), PURPOSE);
        final Map<String, Object> key = new HashMap<>(this.fields.size());
        for (String field : this.fields) {
            key.put(field, value.get(field));
        }
        return r.newRecord(r.topic(), r.kafkaPartition(), null, key, r.valueSchema(), r.value(), r.timestamp());
    }

    /**
     * Schema-aware path: the value must be a {@link Struct}. The key schema is derived from the
     * value schema once and then reused through the cache.
     *
     * @throws DataException if a configured field is not present in the value schema
     */
    private R applyWithSchema(R r) {
        final Struct value = Requirements.requireStruct(r.value(), PURPOSE);
        final Schema valueSchema = value.schema();

        Schema keySchema = this.valueToKeySchemaCache.get(valueSchema);
        if (keySchema == null) {
            final SchemaBuilder keySchemaBuilder = SchemaBuilder.struct();
            for (String field : this.fields) {
                final Field valueField = valueSchema.field(field);
                // Fail with a descriptive error instead of a NullPointerException when the
                // configured field is missing from the value schema.
                if (valueField == null) {
                    throw new DataException("Field does not exist: " + field);
                }
                keySchemaBuilder.field(field, valueField.schema());
            }
            keySchema = keySchemaBuilder.build();
            this.valueToKeySchemaCache.put(valueSchema, keySchema);
        }

        final Struct key = new Struct(keySchema);
        for (String field : this.fields) {
            key.put(field, value.get(field));
        }
        return r.newRecord(r.topic(), r.kafkaPartition(), keySchema, key, valueSchema, value, r.timestamp());
    }

    /**
     * @return the configuration definition for this transformation
     */
    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    /** Drops the reference to the schema cache. */
    @Override
    public void close() {
        this.valueToKeySchemaCache = null;
    }
}
