/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.eventhubs;

import com.azure.core.amqp.exception.AmqpException;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import javax.inject.Named;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named(value="eventhubs")
@Dependent
public class EventHubsChangeConsumer
extends BaseChangeConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubsChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.eventhubs.";
    private static final String PROP_CONNECTION_STRING_NAME = "debezium.sink.eventhubs.connectionstring";
    private static final String PROP_EVENTHUB_NAME = "debezium.sink.eventhubs.hubname";
    private static final String PROP_PARTITION_ID = "debezium.sink.eventhubs.partitionid";
    private static final String PROP_PARTITION_KEY = "debezium.sink.eventhubs.partitionkey";
    private static final String PROP_MAX_BATCH_SIZE = "debezium.sink.eventhubs.maxbatchsize";
    private String connectionString;
    private String eventHubName;
    private String partitionID;
    private String partitionKey;
    private Integer maxBatchSize;
    private static final String CONNECTION_STRING_FORMAT = "%s;EntityPath=%s";
    private EventHubProducerClient producer = null;
    @Inject
    @CustomConsumerBuilder
    Instance<EventHubProducerClient> customProducer;

    @PostConstruct
    void connect() {
        if (this.customProducer.isResolvable()) {
            this.producer = (EventHubProducerClient)this.customProducer.get();
            LOGGER.info("Obtained custom configured Event Hubs client for namespace '{}'", (Object)((EventHubProducerClient)this.customProducer.get()).getFullyQualifiedNamespace());
            return;
        }
        Config config = ConfigProvider.getConfig();
        this.connectionString = (String)config.getValue(PROP_CONNECTION_STRING_NAME, String.class);
        this.eventHubName = (String)config.getValue(PROP_EVENTHUB_NAME, String.class);
        this.partitionID = config.getOptionalValue(PROP_PARTITION_ID, String.class).orElse("");
        this.partitionKey = config.getOptionalValue(PROP_PARTITION_KEY, String.class).orElse("");
        this.maxBatchSize = config.getOptionalValue(PROP_MAX_BATCH_SIZE, Integer.class).orElse(0);
        String finalConnectionString = String.format(CONNECTION_STRING_FORMAT, this.connectionString, this.eventHubName);
        try {
            this.producer = new EventHubClientBuilder().connectionString(finalConnectionString).buildProducerClient();
        }
        catch (Exception e) {
            throw new DebeziumException((Throwable)e);
        }
        LOGGER.info("Using default Event Hubs client for namespace '{}'", (Object)this.producer.getFullyQualifiedNamespace());
    }

    @PreDestroy
    void close() {
        try {
            this.producer.close();
            LOGGER.info("Closed Event Hubs producer client");
        }
        catch (Exception e) {
            LOGGER.warn("Exception while closing Event Hubs producer: {}", (Throwable)e);
        }
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer) throws InterruptedException {
        LOGGER.trace("Event Hubs sink adapter processing change events");
        CreateBatchOptions op = new CreateBatchOptions().setPartitionId(this.partitionID);
        if (this.partitionKey != "") {
            op.setPartitionKey(this.partitionKey);
        }
        if (this.maxBatchSize != 0) {
            op.setMaximumSizeInBytes(this.maxBatchSize.intValue());
        }
        EventDataBatch batch = this.producer.createBatch(op);
        for (ChangeEvent<Object, Object> record : records) {
            LOGGER.trace("Received record '{}'", record.value());
            if (null == record.value()) continue;
            EventData eventData = null;
            if (record.value() instanceof String) {
                eventData = new EventData((String)record.value());
            } else if (record.value() instanceof byte[]) {
                eventData = new EventData(this.getBytes(record.value()));
            }
            try {
                if (batch.tryAdd(eventData)) continue;
                throw new DebeziumException("Event data was too large to fit in the batch");
            }
            catch (IllegalArgumentException e) {
                throw new DebeziumException((Throwable)e);
            }
            catch (AmqpException e) {
                throw new DebeziumException("Event data was larger than the maximum size of the batch", (Throwable)e);
            }
            catch (Exception e) {
                throw new DebeziumException((Throwable)e);
            }
        }
        try {
            this.producer.send(batch);
            LOGGER.trace("Sent record batch to Event Hubs");
        }
        catch (Exception e) {
            throw new DebeziumException((Throwable)e);
        }
        for (ChangeEvent<Object, Object> record : records) {
            try {
                committer.markProcessed(record);
                LOGGER.trace("Record marked processed");
            }
            catch (Exception e) {
                throw new DebeziumException((Throwable)e);
            }
        }
        committer.markBatchFinished();
        LOGGER.trace("Batch marked finished");
    }
}

