package datahub.spark.consumer.impl;

import datahub.client.Emitter;
import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.spark.model.LineageConsumer;
import datahub.spark.model.LineageEvent;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.spark.SparkEnv;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:datahub/spark/consumer/impl/McpEmitter.class */
public class McpEmitter implements LineageConsumer, Closeable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(McpEmitter.class);
    private final Optional<Emitter> emitter;
    private static final String CONF_PREFIX = "spark.datahub.";
    private static final String TRANSPORT_KEY = "transport";
    private static final String GMS_URL_KEY = "rest.server";
    private static final String GMS_AUTH_TOKEN = "rest.token";

    private void emit(List<MetadataChangeProposalWrapper> list) {
        if (this.emitter.isPresent()) {
            ((List) list.stream().map(metadataChangeProposalWrapper -> {
                try {
                    return this.emitter.get().emit(metadataChangeProposalWrapper);
                } catch (IOException e) {
                    log.error("Failed to emit metadata to DataHub", e);
                    return null;
                }
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toList())).forEach(future -> {
                try {
                    future.get();
                } catch (InterruptedException | ExecutionException e) {
                    log.error("Failed to emit metadata to DataHub", e);
                }
            });
        }
    }

    public McpEmitter() {
        Map map = (Map) Arrays.stream(SparkEnv.get().conf().getAllWithPrefix(CONF_PREFIX)).collect(Collectors.toMap(tuple2 -> {
            return (String) tuple2._1;
        }, tuple22 -> {
            return (String) tuple22._2;
        }));
        String str = (String) map.getOrDefault(TRANSPORT_KEY, "rest");
        if (!str.toLowerCase(Locale.ROOT).equals("rest")) {
            this.emitter = Optional.empty();
            log.error("DataHub Transport {} not recognized. DataHub Lineage emission will not work", str);
            return;
        }
        String str2 = (String) map.getOrDefault(GMS_URL_KEY, "http://localhost:8080");
        String str3 = (String) map.getOrDefault(GMS_AUTH_TOKEN, null);
        log.info("REST Emitter Configuration: GMS url {}{}", str2, map.containsKey(GMS_URL_KEY) ? "" : "(default)");
        if (str3 != null) {
            log.info("REST Emitter Configuration: Token {}", str3 != null ? "XXXXX" : "(empty)");
        }
        this.emitter = Optional.of(RestEmitter.create(restEmitterConfigBuilder -> {
            restEmitterConfigBuilder.server(str2).token(str3);
        }));
    }

    @Override // java.util.function.Consumer
    public void accept(LineageEvent lineageEvent) {
        emit(lineageEvent.asMetadataEvents());
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.emitter.isPresent()) {
            this.emitter.get().close();
        }
    }
}
