/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.pubsub;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.ProjectId;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import javax.inject.Named;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named(value="pubsublite")
@Dependent
public class PubSubLiteChangeConsumer
extends BaseChangeConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PubSubLiteChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.pubsublite.";
    private static final String PROP_PROJECT_ID = "debezium.sink.pubsublite.project.id";
    private static final String PROP_REGION = "debezium.sink.pubsublite.region";
    private PublisherBuilder publisherBuilder;
    private final Map<String, Publisher> publishers = new HashMap<String, Publisher>();
    @ConfigProperty(name="debezium.sink.pubsublite.ordering.enabled", defaultValue="true")
    boolean orderingEnabled;
    @ConfigProperty(name="debezium.sink.pubsublite.null.key", defaultValue="default")
    String nullKey;
    @Inject
    @CustomConsumerBuilder
    Instance<PublisherBuilder> customPublisherBuilder;

    @PostConstruct
    void connect() {
        Config config = ConfigProvider.getConfig();
        String projectId = config.getOptionalValue(PROP_PROJECT_ID, String.class).orElse(ServiceOptions.getDefaultProjectId());
        String region = (String)config.getValue(PROP_REGION, String.class);
        if (this.customPublisherBuilder.isResolvable()) {
            this.publisherBuilder = (PublisherBuilder)this.customPublisherBuilder.get();
            LOGGER.info("Obtained custom configured PublisherBuilder '{}'", this.customPublisherBuilder);
            return;
        }
        this.publisherBuilder = t -> {
            TopicPath topicPath = ((TopicPath.Builder)((TopicPath.Builder)TopicPath.newBuilder().setName(TopicName.of((String)t)).setProject(ProjectId.of((String)projectId))).setLocation(CloudRegionOrZone.parse((String)region))).build();
            PublisherSettings publisherSettings = PublisherSettings.newBuilder().setTopicPath(topicPath).build();
            Publisher publisher = Publisher.create((PublisherSettings)publisherSettings);
            publisher.startAsync().awaitRunning();
            return publisher;
        };
        LOGGER.info("Using default PublisherBuilder '{}'", (Object)this.publisherBuilder);
    }

    @PreDestroy
    void close() {
        this.publishers.values().forEach(publisher -> {
            try {
                publisher.stopAsync().awaitTerminated();
            }
            catch (Exception e) {
                LOGGER.warn("Exception while closing publisher: " + e);
            }
        });
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer) throws InterruptedException {
        List messageIds;
        ArrayList<ApiFuture> deliveries = new ArrayList<ApiFuture>();
        for (ChangeEvent<Object, Object> record : records) {
            LOGGER.trace("Received event '{}'", record);
            String topicName = this.streamNameMapper.map(record.destination());
            Publisher publisher = this.publishers.computeIfAbsent(topicName, topic -> this.publisherBuilder.get((String)topic));
            PubsubMessage.Builder pubsubMessage = PubsubMessage.newBuilder();
            if (this.orderingEnabled) {
                if (record.key() == null) {
                    pubsubMessage.setOrderingKey(this.nullKey);
                } else if (record.key() instanceof String) {
                    pubsubMessage.setOrderingKey((String)record.key());
                } else if (record.key() instanceof byte[]) {
                    pubsubMessage.setOrderingKeyBytes(ByteString.copyFrom((byte[])((byte[])record.key())));
                }
            }
            if (record.value() instanceof String) {
                pubsubMessage.setData(ByteString.copyFromUtf8((String)((String)record.value())));
            } else if (record.value() instanceof byte[]) {
                pubsubMessage.setData(ByteString.copyFrom((byte[])((byte[])record.value())));
            }
            pubsubMessage.putAllAttributes(this.convertHeaders(record));
            deliveries.add(publisher.publish(pubsubMessage.build()));
            committer.markProcessed(record);
        }
        try {
            messageIds = (List)ApiFutures.allAsList(deliveries).get();
        }
        catch (ExecutionException e) {
            throw new DebeziumException((Throwable)e);
        }
        LOGGER.trace("Sent messages with ids: {}", (Object)messageIds);
        committer.markBatchFinished();
    }

    public boolean supportsTombstoneEvents() {
        return false;
    }

    public static interface PublisherBuilder {
        public Publisher get(String var1);
    }
}

