/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.embedded;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.embedded.EmbeddedEngineChangeEvent;
import io.debezium.embedded.EmbeddedEngineHeader;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Avro;
import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.engine.format.CloudEvents;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.JsonByteArray;
import io.debezium.engine.format.KeyValueChangeEventFormat;
import io.debezium.engine.format.KeyValueHeaderChangeEventFormat;
import io.debezium.engine.format.Protobuf;
import io.debezium.engine.format.SerializationFormat;
import io.debezium.engine.spi.OffsetCommitPolicy;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;

public class ConvertingEngineBuilder<R>
implements DebeziumEngine.Builder<R> {
    private static final String CONVERTER_PREFIX = "converter";
    private static final String HEADER_CONVERTER_PREFIX = "header.converter";
    private static final String KEY_CONVERTER_PREFIX = "key.converter";
    private static final String VALUE_CONVERTER_PREFIX = "value.converter";
    private static final String FIELD_CLASS = "class";
    private static final String TOPIC_NAME = "debezium";
    private static final String APICURIO_SCHEMA_REGISTRY_URL_CONFIG = "apicurio.registry.url";
    private final DebeziumEngine.Builder<SourceRecord> delegate = new EmbeddedEngine.EngineBuilder();
    private final Class<? extends SerializationFormat<?>> formatHeader;
    private final Class<? extends SerializationFormat<?>> formatKey;
    private final Class<? extends SerializationFormat<?>> formatValue;
    private Configuration config;
    private Function<SourceRecord, R> toFormat;
    private Function<R, SourceRecord> fromFormat;

    ConvertingEngineBuilder(ChangeEventFormat<?> format) {
        this(KeyValueHeaderChangeEventFormat.of(null, (Class)format.getValueFormat(), null));
    }

    ConvertingEngineBuilder(KeyValueChangeEventFormat<?, ?> format) {
        this(format instanceof KeyValueHeaderChangeEventFormat ? (KeyValueHeaderChangeEventFormat)format : KeyValueHeaderChangeEventFormat.of((Class)format.getKeyFormat(), (Class)format.getValueFormat(), null));
    }

    ConvertingEngineBuilder(KeyValueHeaderChangeEventFormat<?, ?, ?> format) {
        this.formatKey = format.getKeyFormat();
        this.formatValue = format.getValueFormat();
        this.formatHeader = format.getHeaderFormat();
    }

    public DebeziumEngine.Builder<R> notifying(Consumer<R> consumer) {
        this.delegate.notifying(record -> consumer.accept(this.toFormat.apply((SourceRecord)record)));
        return this;
    }

    private static boolean isFormat(Class<? extends SerializationFormat<?>> format1, Class<? extends SerializationFormat<?>> format2) {
        return format1 == format2;
    }

    public DebeziumEngine.Builder<R> notifying(DebeziumEngine.ChangeConsumer<R> handler) {
        this.delegate.notifying((records, committer) -> handler.handleBatch(records.stream().map(x -> this.toFormat.apply((SourceRecord)x)).collect(Collectors.toList()), new DebeziumEngine.RecordCommitter<R>(){

            public void markProcessed(R record) throws InterruptedException {
                committer.markProcessed((Object)ConvertingEngineBuilder.this.fromFormat.apply(record));
            }

            public void markBatchFinished() throws InterruptedException {
                committer.markBatchFinished();
            }

            public void markProcessed(R record, DebeziumEngine.Offsets sourceOffsets) throws InterruptedException {
                committer.markProcessed((Object)ConvertingEngineBuilder.this.fromFormat.apply(record), sourceOffsets);
            }

            public DebeziumEngine.Offsets buildOffsets() {
                return committer.buildOffsets();
            }
        }));
        return this;
    }

    public DebeziumEngine.Builder<R> using(Properties config) {
        this.config = Configuration.from((Properties)config);
        this.delegate.using(config);
        return this;
    }

    public DebeziumEngine.Builder<R> using(ClassLoader classLoader) {
        this.delegate.using(classLoader);
        return this;
    }

    public DebeziumEngine.Builder<R> using(Clock clock) {
        this.delegate.using(clock);
        return this;
    }

    public DebeziumEngine.Builder<R> using(DebeziumEngine.CompletionCallback completionCallback) {
        this.delegate.using(completionCallback);
        return this;
    }

    public DebeziumEngine.Builder<R> using(DebeziumEngine.ConnectorCallback connectorCallback) {
        this.delegate.using(connectorCallback);
        return this;
    }

    public DebeziumEngine.Builder<R> using(OffsetCommitPolicy policy) {
        this.delegate.using(policy);
        return this;
    }

    public DebeziumEngine<R> build() {
        HeaderConverter headerConverter;
        final DebeziumEngine engine = this.delegate.build();
        if (this.formatValue == Connect.class) {
            headerConverter = null;
            this.toFormat = record -> new EmbeddedEngineChangeEvent(null, (SourceRecord)record, StreamSupport.stream(record.headers().spliterator(), false).map(EmbeddedEngineHeader::new).collect(Collectors.toList()), (SourceRecord)record);
        } else {
            Converter keyConverter = this.createConverter(this.formatKey, true);
            Converter valueConverter = this.createConverter(this.formatValue, false);
            headerConverter = this.formatHeader == null ? null : this.createHeaderConverter(this.formatHeader);
            this.toFormat = record -> {
                String topicName = record.topic();
                if (topicName == null) {
                    topicName = TOPIC_NAME;
                }
                byte[] key = keyConverter.fromConnectData(topicName, record.keySchema(), record.key());
                byte[] value = valueConverter.fromConnectData(topicName, record.valueSchema(), record.value());
                List<Object> headers = Collections.emptyList();
                if (headerConverter != null) {
                    List<io.debezium.engine.Header<byte[]>> byteArrayHeaders = this.convertHeaders((SourceRecord)record, topicName, headerConverter);
                    headers = byteArrayHeaders;
                    if (this.shouldConvertHeadersToString()) {
                        headers = byteArrayHeaders.stream().map(h -> new EmbeddedEngineHeader<String>(h.getKey(), new String((byte[])h.getValue(), StandardCharsets.UTF_8))).collect(Collectors.toList());
                    }
                }
                return this.shouldConvertKeyAndValueToString() ? new EmbeddedEngineChangeEvent(key != null ? new String(key, StandardCharsets.UTF_8) : null, value != null ? new String(value, StandardCharsets.UTF_8) : null, headers, (SourceRecord)record) : new EmbeddedEngineChangeEvent(key, value, headers, (SourceRecord)record);
            };
        }
        this.fromFormat = record -> ((EmbeddedEngineChangeEvent)record).sourceRecord();
        final HeaderConverter finalHeaderConverter = headerConverter;
        return new DebeziumEngine<R>(){

            public void run() {
                engine.run();
            }

            public void close() throws IOException {
                if (finalHeaderConverter != null) {
                    finalHeaderConverter.close();
                }
                engine.close();
            }
        };
    }

    private boolean shouldConvertKeyAndValueToString() {
        return ConvertingEngineBuilder.isFormat(this.formatKey, Json.class) && ConvertingEngineBuilder.isFormat(this.formatValue, Json.class) || ConvertingEngineBuilder.isFormat(this.formatValue, CloudEvents.class);
    }

    private boolean shouldConvertHeadersToString() {
        return ConvertingEngineBuilder.isFormat(this.formatHeader, Json.class);
    }

    private List<io.debezium.engine.Header<byte[]>> convertHeaders(SourceRecord record, String topicName, HeaderConverter headerConverter) {
        ArrayList<io.debezium.engine.Header<byte[]>> headers = new ArrayList<io.debezium.engine.Header<byte[]>>();
        for (Header header : record.headers()) {
            String headerKey = header.key();
            byte[] rawHeader = headerConverter.fromConnectHeader(topicName, headerKey, header.schema(), header.value());
            headers.add(new EmbeddedEngineHeader<byte[]>(headerKey, rawHeader));
        }
        return headers;
    }

    private HeaderConverter createHeaderConverter(Class<? extends SerializationFormat<?>> format) {
        Configuration converterConfig = this.config.subset(HEADER_CONVERTER_PREFIX, true);
        Configuration commonConverterConfig = this.config.subset(CONVERTER_PREFIX, true);
        converterConfig = ((Configuration.Builder)commonConverterConfig.edit().with(converterConfig)).with("converter.type", "header").build();
        if (!ConvertingEngineBuilder.isFormat(format, Json.class) && !ConvertingEngineBuilder.isFormat(format, JsonByteArray.class)) {
            throw new DebeziumException("Header Converter '" + format.getSimpleName() + "' is not supported");
        }
        converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "org.apache.kafka.connect.json.JsonConverter").build();
        HeaderConverter converter = (HeaderConverter)converterConfig.getInstance(FIELD_CLASS, HeaderConverter.class);
        converter.configure(converterConfig.asMap());
        return converter;
    }

    private Converter createConverter(Class<? extends SerializationFormat<?>> format, boolean key) {
        Configuration converterConfig = this.config.subset(key ? KEY_CONVERTER_PREFIX : VALUE_CONVERTER_PREFIX, true);
        Configuration commonConverterConfig = this.config.subset(CONVERTER_PREFIX, true);
        converterConfig = ((Configuration.Builder)commonConverterConfig.edit().with(converterConfig)).build();
        if (ConvertingEngineBuilder.isFormat(format, Json.class) || ConvertingEngineBuilder.isFormat(format, JsonByteArray.class)) {
            converterConfig = converterConfig.hasKey(APICURIO_SCHEMA_REGISTRY_URL_CONFIG) ? converterConfig.edit().withDefault(FIELD_CLASS, "io.apicurio.registry.utils.converter.ExtJsonConverter").build() : converterConfig.edit().withDefault(FIELD_CLASS, "org.apache.kafka.connect.json.JsonConverter").build();
        } else if (ConvertingEngineBuilder.isFormat(format, CloudEvents.class)) {
            converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "io.debezium.converters.CloudEventsConverter").build();
        } else if (ConvertingEngineBuilder.isFormat(format, Avro.class)) {
            converterConfig = converterConfig.hasKey(APICURIO_SCHEMA_REGISTRY_URL_CONFIG) ? converterConfig.edit().withDefault(FIELD_CLASS, "io.apicurio.registry.utils.converter.AvroConverter").build() : converterConfig.edit().withDefault(FIELD_CLASS, "io.confluent.connect.avro.AvroConverter").build();
            converterConfig = ((Configuration.Builder)converterConfig.edit().withDefault(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, (Object)CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO)).build();
        } else if (ConvertingEngineBuilder.isFormat(format, Protobuf.class)) {
            converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "io.confluent.connect.protobuf.ProtobufConverter").build();
        } else {
            throw new DebeziumException("Converter '" + format.getSimpleName() + "' is not supported");
        }
        Converter converter = (Converter)converterConfig.getInstance(FIELD_CLASS, Converter.class);
        converter.configure(converterConfig.asMap(), key);
        return converter;
    }
}

