package io.debezium.transforms;

import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.data.Envelope;
import io.debezium.transforms.ExtractNewRecordStateConfigDefinition;
import io.debezium.util.BoundedConcurrentHashMap;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.InsertField;
import org.apache.kafka.connect.transforms.TimestampConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/debezium-core-1.0.0.Final.jar:io/debezium/transforms/ExtractNewRecordState.class */
/**
 * Single message transformation that unwraps a Debezium change-event envelope.
 *
 * <p>For a record whose value is a valid envelope (as decided by {@link SmtManager#isValidEnvelope}),
 * the record value is replaced by the content of the envelope's {@code after} field. Records whose
 * {@code after} field is empty are treated as deletes and are handled according to the configured
 * {@link ExtractNewRecordStateConfigDefinition.DeleteHandling}. Records with a {@code null} value
 * (tombstones) are either dropped or passed through, depending on configuration.
 *
 * <p>Optionally the transformation adds a header carrying the operation and copies selected fields
 * of the envelope's {@code source} struct into the unwrapped value.
 *
 * @param <R> the concrete record type handled by the Connect runtime
 */
public class ExtractNewRecordState<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final String PURPOSE = "source field insertion";
    private static final int SCHEMA_CACHE_SIZE = 64;

    /** Configuration key of Kafka's {@code ExtractField} naming the field to extract. */
    private static final String EXTRACT_FIELD_CONFIG = "field";
    /** Configuration keys of Kafka's {@code InsertField} for inserting a constant-valued field. */
    private static final String INSERT_STATIC_FIELD_CONFIG = "static.field";
    private static final String INSERT_STATIC_VALUE_CONFIG = "static.value";

    private static final Logger LOGGER = LoggerFactory.getLogger(ExtractNewRecordState.class);

    private boolean dropTombstones;
    private ExtractNewRecordStateConfigDefinition.DeleteHandling handleDeletes;
    private boolean addOperationHeader;
    /** Names of the {@code source} fields to copy into the value, or {@code null} when none are requested. */
    private String[] addSourceFields;

    private final ExtractField<R> afterDelegate = new ExtractField.Value<>();
    private final ExtractField<R> beforeDelegate = new ExtractField.Value<>();
    private final InsertField<R> removedDelegate = new InsertField.Value<>();
    private final InsertField<R> updatedDelegate = new InsertField.Value<>();
    /** Maps an unwrapped value schema to the same schema extended with the requested source fields. */
    private BoundedConcurrentHashMap<Schema, Schema> schemaUpdateCache;
    private SmtManager<R> smtManager;

    /**
     * Reads the transformation options and configures the delegate transformations.
     *
     * @param map the raw configuration properties supplied by the Connect runtime
     * @throws ConnectException if the tombstone or delete-handling options fail validation
     */
    @Override
    public void configure(Map<String, ?> map) {
        Configuration config = Configuration.from(map);
        this.smtManager = new SmtManager<>(config);

        Field.Set validatedFields = Field.setOf(
                ExtractNewRecordStateConfigDefinition.DROP_TOMBSTONES,
                ExtractNewRecordStateConfigDefinition.HANDLE_DELETES);
        if (!config.validateAndRecord(validatedFields, LOGGER::error)) {
            throw new ConnectException("Unable to validate config.");
        }

        this.dropTombstones = config.getBoolean(ExtractNewRecordStateConfigDefinition.DROP_TOMBSTONES);
        this.handleDeletes = ExtractNewRecordStateConfigDefinition.DeleteHandling.parse(
                config.getString(ExtractNewRecordStateConfigDefinition.HANDLE_DELETES));
        this.addOperationHeader = config.getBoolean(ExtractNewRecordStateConfigDefinition.OPERATION_HEADER);
        this.addSourceFields = parseSourceFields(
                config.getString(ExtractNewRecordStateConfigDefinition.ADD_SOURCE_FIELDS));

        this.beforeDelegate.configure(extractFieldConfig(Envelope.FieldName.BEFORE));
        this.afterDelegate.configure(extractFieldConfig(Envelope.FieldName.AFTER));
        // Both delegates insert the same marker field; only the constant value differs.
        this.removedDelegate.configure(deletedMarkerConfig("true"));
        this.updatedDelegate.configure(deletedMarkerConfig("false"));

        this.schemaUpdateCache = new BoundedConcurrentHashMap<>(SCHEMA_CACHE_SIZE);
    }

    /**
     * Splits the comma-separated list of source field names.
     *
     * @param raw the configured value; may be {@code null} or blank
     * @return the trimmed field names, or {@code null} when no field was requested
     */
    private static String[] parseSourceFields(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return null;
        }
        String[] names = raw.split(",");
        // Tolerate whitespace around the separators, e.g. "version, connector".
        for (int i = 0; i < names.length; i++) {
            names[i] = names[i].trim();
        }
        return names;
    }

    /** Builds the configuration for an {@code ExtractField} delegate extracting {@code fieldName}. */
    private static Map<String, String> extractFieldConfig(String fieldName) {
        Map<String, String> delegateConfig = new HashMap<>();
        delegateConfig.put(EXTRACT_FIELD_CONFIG, fieldName);
        return delegateConfig;
    }

    /** Builds the configuration for an {@code InsertField} delegate writing the deleted-marker field. */
    private static Map<String, String> deletedMarkerConfig(String markerValue) {
        Map<String, String> delegateConfig = new HashMap<>();
        delegateConfig.put(INSERT_STATIC_FIELD_CONFIG, ExtractNewRecordStateConfigDefinition.DELETED_FIELD);
        delegateConfig.put(INSERT_STATIC_VALUE_CONFIG, markerValue);
        return delegateConfig;
    }

    /**
     * Unwraps a single record.
     *
     * @param r the incoming record
     * @return the transformed record; the unchanged record when it is not a valid envelope or is a
     *         tombstone that is kept; or {@code null} when the record is to be dropped
     */
    @Override
    public R apply(R r) {
        if (r.value() == null) {
            return applyToTombstone(r);
        }
        if (!this.smtManager.isValidEnvelope(r)) {
            return r;
        }
        if (this.addOperationHeader) {
            recordOperationHeader(r);
        }

        R unwrapped = this.afterDelegate.apply(r);
        if (unwrapped.value() != null) {
            return applyToUpsert(r, unwrapped);
        }
        return applyToDelete(r, unwrapped);
    }

    /** Handles a record with a {@code null} value: drops it or passes it on, optionally with a header. */
    private R applyToTombstone(R r) {
        if (this.dropTombstones) {
            LOGGER.trace("Tombstone {} arrived and requested to be dropped", r.key());
            return null;
        }
        if (this.addOperationHeader) {
            // NOTE(review): this writes Operation.toString() while envelope records write
            // Operation.code(); the two may differ. Kept as-is because consumers may depend on it.
            r.headers().addString(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY,
                    Envelope.Operation.DELETE.toString());
        }
        return r;
    }

    /** Adds the operation header to an envelope record, or logs a warning when the operation is unknown. */
    private void recordOperationHeader(R r) {
        String code = ((Struct) r.value()).getString(Envelope.FieldName.OPERATION);
        Envelope.Operation operation = Envelope.Operation.forCode(code);
        // A missing (null) operation is treated like an unknown one instead of failing with an NPE.
        if (code == null || code.isEmpty() || operation == null) {
            LOGGER.warn("Unknown operation thus unable to add the operation header into the message");
            return;
        }
        r.headers().addString(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY, operation.code());
    }

    /** Finishes a record whose {@code after} state is present (insert or update). */
    private R applyToUpsert(R envelope, R unwrapped) {
        R enriched = addSourceFields(this.addSourceFields, envelope, unwrapped);
        if (this.handleDeletes == ExtractNewRecordStateConfigDefinition.DeleteHandling.REWRITE) {
            LOGGER.trace("Insert/update message {} requested to be rewritten", envelope.key());
            return this.updatedDelegate.apply(enriched);
        }
        return enriched;
    }

    /** Finishes a record whose {@code after} state is absent (delete), per the delete-handling mode. */
    private R applyToDelete(R envelope, R unwrapped) {
        switch (this.handleDeletes) {
            case DROP:
                LOGGER.trace("Delete message {} requested to be dropped", envelope.key());
                return null;
            case REWRITE:
                LOGGER.trace("Delete message {} requested to be rewritten", envelope.key());
                // The rewritten delete carries the "before" state plus the deleted marker.
                R previousState = this.beforeDelegate.apply(envelope);
                return this.removedDelegate.apply(addSourceFields(this.addSourceFields, envelope, previousState));
            default:
                return unwrapped;
        }
    }

    /**
     * Copies the requested fields of the envelope's {@code source} struct into the unwrapped value.
     *
     * @param strArr names of the source fields to copy, or {@code null} to copy nothing
     * @param r      the original envelope record, from which the {@code source} struct is read
     * @param r2     the unwrapped record to enrich
     * @return {@code r2} itself when {@code strArr} is {@code null}, otherwise a new record whose
     *         value holds all fields of {@code r2} plus the prefixed source fields
     * @throws ConfigException if a requested field does not exist in the source schema
     */
    private R addSourceFields(String[] strArr, R r, R r2) {
        if (strArr == null) {
            return r2;
        }
        Struct unwrappedValue = Requirements.requireStruct(r2.value(), PURPOSE);
        Struct source = ((Struct) r.value()).getStruct("source");
        Schema enrichedSchema = this.schemaUpdateCache.computeIfAbsent(unwrappedValue.schema(),
                valueSchema -> makeUpdatedSchema(valueSchema, source.schema(), strArr));

        Struct enrichedValue = new Struct(enrichedSchema);
        for (org.apache.kafka.connect.data.Field field : unwrappedValue.schema().fields()) {
            enrichedValue.put(field.name(), unwrappedValue.get(field));
        }
        for (String sourceField : strArr) {
            enrichedValue.put(ExtractNewRecordStateConfigDefinition.METADATA_FIELD_PREFIX + sourceField,
                    source.get(sourceField));
        }
        return r2.newRecord(r2.topic(), r2.kafkaPartition(), r2.keySchema(), r2.key(),
                enrichedSchema, enrichedValue, r2.timestamp());
    }

    /**
     * Builds a struct schema containing every field of {@code schema} followed by the requested
     * source fields, each named with the metadata prefix.
     *
     * @param schema  the schema of the unwrapped value
     * @param schema2 the schema of the envelope's {@code source} struct
     * @param strArr  names of the source fields to append
     * @return the extended schema
     * @throws ConfigException if a requested field does not exist in {@code schema2}
     */
    private Schema makeUpdatedSchema(Schema schema, Schema schema2, String[] strArr) {
        SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
        for (org.apache.kafka.connect.data.Field field : schema.fields()) {
            builder.field(field.name(), field.schema());
        }
        for (String sourceField : strArr) {
            org.apache.kafka.connect.data.Field declared = schema2.field(sourceField);
            if (declared == null) {
                throw new ConfigException("Source field specified in 'add.source.fields' does not exist: " + sourceField);
            }
            builder.field(ExtractNewRecordStateConfigDefinition.METADATA_FIELD_PREFIX + sourceField, declared.schema());
        }
        return builder.build();
    }

    /**
     * Describes the options understood by this transformation.
     *
     * @return a definition containing every option read by {@link #configure(Map)}
     */
    @Override
    public ConfigDef config() {
        ConfigDef configDef = new ConfigDef();
        Field.group(configDef, null,
                ExtractNewRecordStateConfigDefinition.DROP_TOMBSTONES,
                ExtractNewRecordStateConfigDefinition.HANDLE_DELETES,
                ExtractNewRecordStateConfigDefinition.OPERATION_HEADER,
                ExtractNewRecordStateConfigDefinition.ADD_SOURCE_FIELDS);
        return configDef;
    }

    /** Closes the four delegate transformations. */
    @Override
    public void close() {
        this.beforeDelegate.close();
        this.afterDelegate.close();
        this.removedDelegate.close();
        this.updatedDelegate.close();
    }
}
