package io.openlineage.spark3.agent.lifecycle.plan.catalog;

import datahub.client.patch.dataset.DatasetPropertiesPatchBuilder;
import datahub.spark2.shaded.org.slf4j.Logger;
import datahub.spark2.shaded.org.slf4j.LoggerFactory;
import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.util.DatasetIdentifier;
import io.openlineage.spark.agent.util.PathUtils;
import io.openlineage.spark.agent.util.ScalaConversionUtils;
import io.openlineage.spark.agent.util.SparkConfUtils;
import io.openlineage.spark.api.OpenLineageContext;
import java.io.File;
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;

/* loaded from: input_file:io/openlineage/spark3/agent/lifecycle/plan/catalog/IcebergHandler.class */
public class IcebergHandler implements CatalogHandler {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) IcebergHandler.class);
    private final OpenLineageContext context;
    private static final String TYPE = "type";

    public IcebergHandler(OpenLineageContext openLineageContext) {
        this.context = openLineageContext;
    }

    @Override // io.openlineage.spark3.agent.lifecycle.plan.catalog.CatalogHandler
    public boolean hasClasses() {
        try {
            IcebergHandler.class.getClassLoader().loadClass("org.apache.iceberg.catalog.Catalog");
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    @Override // io.openlineage.spark3.agent.lifecycle.plan.catalog.CatalogHandler
    public boolean isClass(TableCatalog tableCatalog) {
        return (tableCatalog instanceof SparkCatalog) || (tableCatalog instanceof SparkSessionCatalog);
    }

    @Override // io.openlineage.spark3.agent.lifecycle.plan.catalog.CatalogHandler
    public DatasetIdentifier getDatasetIdentifier(SparkSession sparkSession, TableCatalog tableCatalog, Identifier identifier, Map<String, String> map) {
        String name = tableCatalog.name();
        String format = String.format("spark.sql.catalog.%s", name);
        Map fromMap = ScalaConversionUtils.fromMap(sparkSession.conf().getAll());
        log.info(fromMap.toString());
        Map map2 = (Map) fromMap.entrySet().stream().filter(entry -> {
            return ((String) entry.getKey()).startsWith(format);
        }).filter(entry2 -> {
            return ((String) entry2.getKey()).length() > format.length();
        }).collect(Collectors.toMap(entry3 -> {
            return ((String) entry3.getKey()).substring(format.length() + 1);
        }, (v0) -> {
            return v0.getValue();
        }));
        log.info(map2.toString());
        if (map2.isEmpty() || !map2.containsKey("type")) {
            throw new UnsupportedCatalogException(name);
        }
        log.info((String) map2.get("type"));
        DatasetIdentifier fromPath = PathUtils.fromPath(new Path((String) map2.get("warehouse"), identifier.toString()));
        if (((String) map2.get("type")).equals("hive")) {
            fromPath.withSymlink(getHiveIdentifier(sparkSession, (String) map2.get(DatasetPropertiesPatchBuilder.URI_KEY), identifier.toString()));
        } else if (((String) map2.get("type")).equals("hadoop")) {
            fromPath.withSymlink(identifier.toString(), StringUtils.substringBeforeLast(fromPath.getName(), File.separator), DatasetIdentifier.SymlinkType.TABLE);
        } else if (((String) map2.get("type")).equals("rest")) {
            fromPath.withSymlink(getRestIdentifier(sparkSession, (String) map2.get(DatasetPropertiesPatchBuilder.URI_KEY), identifier.toString()));
        }
        return fromPath;
    }

    private DatasetIdentifier.Symlink getHiveIdentifier(SparkSession sparkSession, @Nullable String str, String str2) {
        DatasetIdentifier fromPath = PathUtils.fromPath(new Path(PathUtils.enrichHiveMetastoreURIWithTableName(str == null ? SparkConfUtils.getMetastoreUri(sparkSession.sparkContext().conf()).orElseThrow(() -> {
            return new UnsupportedCatalogException("hive");
        }) : new URI(str), String.format("/%s", str2))));
        return new DatasetIdentifier.Symlink(fromPath.getName(), fromPath.getNamespace(), DatasetIdentifier.SymlinkType.TABLE);
    }

    private DatasetIdentifier.Symlink getRestIdentifier(SparkSession sparkSession, @Nullable String str, String str2) {
        return new DatasetIdentifier.Symlink(str2, new URI(str).toString(), DatasetIdentifier.SymlinkType.TABLE);
    }

    @Override // io.openlineage.spark3.agent.lifecycle.plan.catalog.CatalogHandler
    public Optional<OpenLineage.StorageDatasetFacet> getStorageDatasetFacet(Map<String, String> map) {
        return Optional.of(this.context.getOpenLineage().newStorageDatasetFacet("iceberg", map.getOrDefault("format", "").replace("iceberg/", "")));
    }

    @Override // io.openlineage.spark3.agent.lifecycle.plan.catalog.CatalogHandler
    public Optional<String> getDatasetVersion(TableCatalog tableCatalog, Identifier identifier, Map<String, String> map) {
        try {
            SparkTable loadTable = tableCatalog.loadTable(identifier);
            return (loadTable.table() == null || loadTable.table().currentSnapshot() == null) ? Optional.empty() : Optional.of(Long.toString(loadTable.table().currentSnapshot().snapshotId()));
        } catch (NoSuchTableException | ClassCastException e) {
            log.error("Failed to load table from catalog: {}", identifier, e);
            return Optional.empty();
        }
    }

    @Override // io.openlineage.spark3.agent.lifecycle.plan.catalog.CatalogHandler
    public String getName() {
        return "iceberg";
    }
}
