/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.eventhubs;

import com.azure.core.amqp.exception.AmqpException;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import io.debezium.server.eventhubs.BatchManager;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.List;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named(value="eventhubs")
@Dependent
public class EventHubsChangeConsumer
extends BaseChangeConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubsChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.eventhubs.";
    private static final String PROP_CONNECTION_STRING_NAME = "debezium.sink.eventhubs.connectionstring";
    private static final String PROP_EVENTHUB_NAME = "debezium.sink.eventhubs.hubname";
    private static final String PROP_PARTITION_ID = "debezium.sink.eventhubs.partitionid";
    private static final String PROP_PARTITION_KEY = "debezium.sink.eventhubs.partitionkey";
    private static final String PROP_MAX_BATCH_SIZE = "debezium.sink.eventhubs.maxbatchsize";
    private String connectionString;
    private String eventHubName;
    private String configuredPartitionId;
    private String configuredPartitionKey;
    private Integer maxBatchSize;
    private Integer partitionCount;
    private static final String CONNECTION_STRING_FORMAT = "%s;EntityPath=%s";
    private EventHubProducerClient producer = null;
    private BatchManager batchManager = null;
    @Inject
    @CustomConsumerBuilder
    Instance<EventHubProducerClient> customProducer;

    @PostConstruct
    void connect() {
        if (this.customProducer.isResolvable()) {
            this.producer = (EventHubProducerClient)this.customProducer.get();
            LOGGER.info("Obtained custom configured Event Hubs client for namespace '{}'", (Object)((EventHubProducerClient)this.customProducer.get()).getFullyQualifiedNamespace());
            return;
        }
        Config config = ConfigProvider.getConfig();
        this.connectionString = (String)config.getValue(PROP_CONNECTION_STRING_NAME, String.class);
        this.eventHubName = (String)config.getValue(PROP_EVENTHUB_NAME, String.class);
        this.maxBatchSize = config.getOptionalValue(PROP_MAX_BATCH_SIZE, Integer.class).orElse(0);
        this.configuredPartitionId = config.getOptionalValue(PROP_PARTITION_ID, String.class).orElse("");
        this.configuredPartitionKey = config.getOptionalValue(PROP_PARTITION_KEY, String.class).orElse("");
        String finalConnectionString = String.format(CONNECTION_STRING_FORMAT, this.connectionString, this.eventHubName);
        try {
            this.producer = new EventHubClientBuilder().connectionString(finalConnectionString).buildProducerClient();
            this.batchManager = new BatchManager(this.producer, this.configuredPartitionId, this.configuredPartitionKey, this.maxBatchSize);
        }
        catch (Exception e) {
            throw new DebeziumException((Throwable)e);
        }
        LOGGER.info("Using default Event Hubs client for namespace '{}'", (Object)this.producer.getFullyQualifiedNamespace());
        this.partitionCount = (int)this.producer.getPartitionIds().stream().count();
        LOGGER.trace("Event Hub '{}' has {} partitions available", (Object)this.producer.getEventHubName(), (Object)this.partitionCount);
        if (!this.configuredPartitionId.isEmpty() && Integer.parseInt(this.configuredPartitionId) > this.partitionCount - 1) {
            throw new IndexOutOfBoundsException(String.format("Target partition id %s does not exist in target EventHub %s", this.configuredPartitionId, this.eventHubName));
        }
    }

    @PreDestroy
    void close() {
        try {
            this.producer.close();
            LOGGER.info("Closed Event Hubs producer client");
        }
        catch (Exception e) {
            LOGGER.warn("Exception while closing Event Hubs producer: {}", (Throwable)e);
        }
    }

    /*
     * Unable to fully structure code
     */
    public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer) throws InterruptedException {
        EventHubsChangeConsumer.LOGGER.trace("Event Hubs sink adapter processing change events");
        this.batchManager.initializeBatch(records, committer);
        recordIndex = 0;
        while (recordIndex < records.size()) {
            start = recordIndex;
            EventHubsChangeConsumer.LOGGER.trace("Emitting events starting from index {}", (Object)start);
            while (recordIndex < records.size()) {
                block15: {
                    block16: {
                        record = records.get(recordIndex);
                        if (null == record.value()) break block15;
                        if (!(record.value() instanceof String)) break block16;
                        eventData = new EventData((String)record.value());
                        ** GOTO lbl18
                    }
                    if (!(record.value() instanceof byte[])) {
                        EventHubsChangeConsumer.LOGGER.warn("Event data in record.value() is not of type String or byte[]");
                    } else {
                        eventData = new EventData(this.getBytes(record.value()));
lbl18:
                        // 2 sources

                        if (!this.configuredPartitionId.isEmpty()) {
                            targetPartitionId = Integer.parseInt(this.configuredPartitionId);
                        } else if (!this.configuredPartitionKey.isEmpty()) {
                            targetPartitionId = BatchManager.BATCH_INDEX_FOR_PARTITION_KEY;
                        } else {
                            targetPartitionId = record.partition();
                            if (targetPartitionId == null) {
                                targetPartitionId = BatchManager.BATCH_INDEX_FOR_NO_PARTITION_ID;
                            }
                        }
                        if (targetPartitionId < BatchManager.BATCH_INDEX_FOR_NO_PARTITION_ID || targetPartitionId > this.partitionCount - 1) {
                            throw new IndexOutOfBoundsException(String.format("Target partition id %d does not exist in target EventHub %s", new Object[]{targetPartitionId, this.eventHubName}));
                        }
                        try {
                            this.batchManager.sendEventToPartitionId(eventData, recordIndex, targetPartitionId);
                        }
                        catch (IllegalArgumentException e) {
                            throw new DebeziumException((Throwable)e);
                        }
                        catch (AmqpException e) {
                            throw new DebeziumException("Event data was larger than the maximum size of the batch", (Throwable)e);
                        }
                        catch (Exception e) {
                            throw new DebeziumException((Throwable)e);
                        }
                    }
                }
                ++recordIndex;
            }
        }
        this.batchManager.closeAndEmitBatches();
        EventHubsChangeConsumer.LOGGER.trace("Marking {} records as processed.", (Object)records.size());
        for (ChangeEvent<Object, Object> record : records) {
            committer.markProcessed(record);
        }
        committer.markBatchFinished();
        EventHubsChangeConsumer.LOGGER.trace("Batch marked finished");
    }
}

