package datahub.spark.consumer.impl;

import datahub.client.Emitter;
import datahub.client.MetadataWriteResponse;
import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.spark.model.LineageConsumer;
import datahub.spark.model.LineageEvent;
import datahub.spark2.shaded.typesafe.config.Config;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:datahub/spark/consumer/impl/McpEmitter.class */
/**
 * {@link LineageConsumer} that forwards the metadata change proposals of each
 * {@link LineageEvent} to DataHub through an {@link Emitter}.
 *
 * <p>The transport is chosen from the supplied {@link Config}. Only the {@code "rest"} transport
 * is recognized; for any other value no emitter is created and every event is silently dropped
 * (an error is logged once at construction time).
 */
public class McpEmitter implements LineageConsumer {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(McpEmitter.class);

    /** Config key selecting the transport; defaults to {@code "rest"} when absent. */
    private static final String TRANSPORT_KEY = "transport";
    /** Config key for the REST server URL; defaults to {@code http://localhost:8080} when absent. */
    private static final String GMS_URL_KEY = "rest.server";
    /** Config key for the optional REST auth token. */
    private static final String GMS_AUTH_TOKEN = "rest.token";

    /** Empty when the configured transport is not recognized. */
    private final Optional<Emitter> emitter;

    /**
     * Builds the emitter described by {@code config}.
     *
     * @param config configuration holding the {@code transport}, {@code rest.server} and
     *               {@code rest.token} entries; all of them are optional
     */
    public McpEmitter(Config config) {
        String transport = config.hasPath(TRANSPORT_KEY) ? config.getString(TRANSPORT_KEY) : "rest";
        if ("rest".equals(transport)) {
            boolean urlConfigured = config.hasPath(GMS_URL_KEY);
            String gmsUrl = urlConfigured ? config.getString(GMS_URL_KEY) : "http://localhost:8080";
            String token = config.hasPath(GMS_AUTH_TOKEN) ? config.getString(GMS_AUTH_TOKEN) : null;
            log.info("REST Emitter Configuration: GMS url {}{}", gmsUrl, urlConfigured ? "" : "(default)");
            if (token != null) {
                // Never log the token itself, only the fact that one was supplied.
                log.info("REST Emitter Configuration: Token {}", "XXXXX");
            }
            this.emitter = Optional.of(RestEmitter.create(builder -> {
                builder.server(gmsUrl).token(token);
            }));
        } else {
            this.emitter = Optional.empty();
            log.error("DataHub Transport {} not recognized. DataHub Lineage emission will not work", transport);
        }
    }

    /**
     * Emits every proposal in {@code list} and then waits for each result, logging the response
     * or the failure. Does nothing when no emitter is configured.
     *
     * <p>Failures are logged and never propagated, so a broken DataHub connection does not
     * affect the calling thread.
     *
     * @param list proposals to send
     */
    private void emit(List<MetadataChangeProposalWrapper> list) {
        if (!this.emitter.isPresent()) {
            return;
        }
        Emitter target = this.emitter.get();
        list.stream()
            .map(mcpw -> {
                try {
                    log.debug("emitting mcpw: {}", mcpw);
                    return target.emit(mcpw);
                } catch (IOException e) {
                    log.error("Failed to emit metadata to DataHub", e);
                    return null;
                }
            })
            .filter(Objects::nonNull)
            // Collect first so that all proposals are submitted before blocking on any result.
            .collect(Collectors.toList())
            .forEach(future -> {
                try {
                    log.info(future.get().toString());
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the owning thread can still observe it.
                    Thread.currentThread().interrupt();
                    log.error("Failed to emit metadata to DataHub", e);
                } catch (ExecutionException e) {
                    log.error("Failed to emit metadata to DataHub", e);
                }
            });
    }

    /**
     * Converts {@code lineageEvent} to its metadata events and emits them.
     *
     * @param lineageEvent the event to publish
     */
    @Override
    public void accept(LineageEvent lineageEvent) {
        emit(lineageEvent.asMetadataEvents());
    }

    /**
     * Closes the underlying emitter, if one was created.
     *
     * @throws IOException if the emitter fails to close
     */
    @Override
    public void close() throws IOException {
        if (this.emitter.isPresent()) {
            this.emitter.get().close();
        }
    }
}
