/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.pulsar;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Named;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named(value="pulsar")
@Dependent
public class PulsarChangeConsumer
extends BaseChangeConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.pulsar.";
    private static final String PROP_CLIENT_PREFIX = "debezium.sink.pulsar.client.";
    private static final String PROP_PRODUCER_PREFIX = "debezium.sink.pulsar.producer.";
    private final Map<String, Producer<?>> producers = new HashMap();
    private PulsarClient pulsarClient;
    private Map<String, Object> producerConfig;
    @ConfigProperty(name="debezium.sink.pulsar.null.key", defaultValue="default")
    String nullKey;
    @ConfigProperty(name="debezium.sink.pulsar.tenant", defaultValue="public")
    String pulsarTenant;
    @ConfigProperty(name="debezium.sink.pulsar.namespace", defaultValue="default")
    String pulsarNamespace;
    @ConfigProperty(name="debezium.sink.pulsar.timeout", defaultValue="0")
    Integer timeout;

    @PostConstruct
    void connect() {
        Config config = ConfigProvider.getConfig();
        try {
            this.pulsarClient = PulsarClient.builder().loadConf(this.getConfigSubset(config, PROP_CLIENT_PREFIX)).build();
        }
        catch (PulsarClientException e) {
            throw new DebeziumException((Throwable)e);
        }
        this.producerConfig = this.getConfigSubset(config, PROP_PRODUCER_PREFIX);
    }

    @PreDestroy
    void close() {
        this.producers.values().forEach(producer -> {
            try {
                producer.close();
            }
            catch (Exception e) {
                LOGGER.warn("Exception while closing producer", (Throwable)e);
            }
        });
        try {
            this.pulsarClient.close();
        }
        catch (Exception e) {
            LOGGER.warn("Exception while closing client", (Throwable)e);
        }
    }

    private Producer<?> createProducer(String topicName, Object value) {
        String topicFullName = this.pulsarTenant + "/" + this.pulsarNamespace + "/" + topicName;
        try {
            if (value instanceof String) {
                return this.pulsarClient.newProducer(Schema.STRING).loadConf(this.producerConfig).topic(topicFullName).create();
            }
            return this.pulsarClient.newProducer().loadConf(this.producerConfig).topic(topicFullName).create();
        }
        catch (PulsarClientException e) {
            throw new DebeziumException((Throwable)e);
        }
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer) throws InterruptedException {
        HashMap<String, Producer> batchProducers = new HashMap<String, Producer>();
        for (ChangeEvent<Object, Object> record : records) {
            LOGGER.trace("Received event '{}'", record);
            String topicName = this.streamNameMapper.map(record.destination());
            Producer producer = this.producers.computeIfAbsent(topicName, topic -> this.createProducer((String)topic, record.value()));
            batchProducers.put(topicName, producer);
            String key = record.key() == null ? this.nullKey : this.getString(record.key());
            TypedMessageBuilder message = record.value() instanceof String ? producer.newMessage(Schema.STRING) : producer.newMessage();
            message.properties(this.convertHeaders(record)).key(key).value(record.value());
            message.sendAsync().whenComplete((messageId, exception) -> {
                if (exception == null) {
                    LOGGER.trace("Sent message with id: {}", messageId);
                    try {
                        committer.markProcessed((Object)record);
                    }
                    catch (InterruptedException e) {
                        throw new DebeziumException((Throwable)e);
                    }
                } else {
                    LOGGER.error("Failed to send record to {} destination", (Object)record.destination(), exception);
                }
            });
        }
        CompletableFuture<Void> allProducersCompleted = CompletableFuture.allOf((CompletableFuture[])batchProducers.values().stream().map(Producer::flushAsync).toArray(CompletableFuture[]::new));
        try {
            if (this.timeout > 0) {
                allProducersCompleted.get(this.timeout.intValue(), TimeUnit.MILLISECONDS);
            } else {
                allProducersCompleted.join();
            }
        }
        catch (CompletionException | ExecutionException | TimeoutException exception2) {
            LOGGER.error("Failed to send batch", (Throwable)exception2);
            throw new DebeziumException((Throwable)exception2);
        }
        committer.markBatchFinished();
    }

    public static interface ProducerBuilder {
        public Producer<Object> get(String var1, Object var2);
    }
}

