/*
 * Decompiled with CFR 0.152.
 */
package io.aiven.kafka.connect.common.output;

import io.aiven.kafka.connect.common.config.OutputField;
import io.confluent.connect.avro.AvroData;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class SinkSchemaBuilder {
    private final Logger logger = LoggerFactory.getLogger(SinkSchemaBuilder.class);
    private final Collection<OutputField> fields;
    private final AvroData avroData;
    private final boolean envelopeEnabled;

    public SinkSchemaBuilder(Collection<OutputField> fields, AvroData avroData, boolean envelopeEnabled) {
        this.fields = fields;
        this.avroData = avroData;
        this.envelopeEnabled = envelopeEnabled;
    }

    public SinkSchemaBuilder(Collection<OutputField> fields, AvroData avroData) {
        this.fields = fields;
        this.avroData = avroData;
        this.envelopeEnabled = true;
    }

    protected abstract String getNamespace();

    public Schema buildSchema(SinkRecord record) {
        Objects.requireNonNull(record, "record");
        if (Objects.isNull(record.keySchema())) {
            throw new DataException("Record key without schema");
        }
        if (Objects.isNull(record.valueSchema())) {
            throw new DataException("Record value without schema");
        }
        this.logger.debug("Create schema for record");
        this.logger.debug("Record Key Schema {}", (Object)record.keySchema());
        this.logger.debug("Record Value Schema {}", (Object)record.valueSchema());
        return this.avroSchemaFor(record);
    }

    protected Schema avroSchemaFor(SinkRecord record) {
        if (this.envelopeEnabled) {
            SchemaBuilder.FieldAssembler schemaFields = SchemaBuilder.builder((String)this.getNamespace()).record("connector_records").fields();
            for (OutputField f : this.fields) {
                Schema schema = this.outputFieldSchema(f, record);
                schemaFields.name(f.getFieldType().name).type(schema).noDefault();
            }
            return (Schema)schemaFields.endRecord();
        }
        return this.tryUnwrapEnvelope(record);
    }

    private Schema tryUnwrapEnvelope(SinkRecord record) {
        OutputField field = this.getFields().iterator().next();
        Schema schema = this.outputFieldSchema(field, record);
        if (schema.getType() == Schema.Type.MAP) {
            Map value = (Map)record.value();
            SchemaBuilder.FieldAssembler schemaFields = SchemaBuilder.builder((String)this.getNamespace()).record("connector_records").fields();
            for (Map.Entry entry : value.entrySet()) {
                schemaFields.name((String)entry.getKey()).type(schema.getValueType()).noDefault();
            }
            return (Schema)schemaFields.endRecord();
        }
        if (schema.getType() == Schema.Type.RECORD) {
            return this.getAvroData().fromConnectSchema(record.valueSchema());
        }
        return (Schema)SchemaBuilder.builder((String)this.getNamespace()).record("connector_records").fields().name(field.getFieldType().name).type(schema).noDefault().endRecord();
    }

    private Schema headersSchema(SinkRecord record) {
        if (record.headers().isEmpty()) {
            return (Schema)SchemaBuilder.builder().nullType();
        }
        org.apache.kafka.connect.data.Schema headerSchema = null;
        for (Header h : record.headers()) {
            if (Objects.isNull(h.schema())) {
                throw new DataException("Header " + String.valueOf(h) + " without schema");
            }
            if (Objects.isNull(headerSchema)) {
                headerSchema = h.schema();
                continue;
            }
            if (headerSchema.type() == h.schema().type()) continue;
            throw new DataException("Header schema " + String.valueOf(h.schema()) + " is not the same as " + String.valueOf(headerSchema));
        }
        return (Schema)SchemaBuilder.map().values(this.avroData.fromConnectSchema(headerSchema));
    }

    protected Schema outputFieldSchema(OutputField field, SinkRecord record) {
        switch (field.getFieldType()) {
            case KEY: {
                return this.avroData.fromConnectSchema(record.keySchema());
            }
            case OFFSET: 
            case TIMESTAMP: {
                return (Schema)SchemaBuilder.builder().longType();
            }
            case VALUE: {
                return this.avroData.fromConnectSchema(record.valueSchema());
            }
            case HEADERS: {
                return this.headersSchema(record);
            }
        }
        throw new ConnectException("Unknown field type " + String.valueOf(field));
    }

    public Collection<OutputField> getFields() {
        return this.fields;
    }

    public AvroData getAvroData() {
        return this.avroData;
    }

    public boolean isEnvelopeEnabled() {
        return this.envelopeEnabled;
    }
}

