package org.apache.kylin.engine.spark.job;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.spark.NSparkCubingEngine;
import org.apache.kylin.engine.spark.application.SparkApplication;
import org.apache.kylin.engine.spark.builder.CubeMergeAssist;
import org.apache.kylin.engine.spark.job.BuildLayoutWithUpdate;
import org.apache.kylin.engine.spark.metadata.SegmentInfo;
import org.apache.kylin.engine.spark.metadata.cube.ManagerHub;
import org.apache.kylin.engine.spark.metadata.cube.PathManager;
import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree;
import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
import org.apache.kylin.engine.spark.utils.BuildUtils;
import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
import org.apache.kylin.engine.spark.utils.Metrics;
import org.apache.kylin.engine.spark.utils.QueryExecutionCache;
import org.apache.kylin.storage.StorageFactory;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

/* loaded from: input_file:org/apache/kylin/engine/spark/job/CubeMergeJob.class */
/**
 * Spark application that merges the layouts (cuboids) of several cube segments
 * into a single merged segment.
 *
 * <p>The job reads the {@code cubeId} and {@code segmentIds} parameters, resolves
 * the segments that are being merged into the target segment, and then, for every
 * layout id found in those segments, merges the layout data, sorts it, writes it
 * to the parquet storage path of the merged segment and records the resulting
 * layout metadata.
 */
public class CubeMergeJob extends SparkApplication {
    protected static final Logger logger = LoggerFactory.getLogger(CubeMergeJob.class);
    private BuildLayoutWithUpdate buildLayoutWithUpdate;
    private List<CubeSegment> mergingSegments = Lists.newArrayList();
    private List<SegmentInfo> mergingSegInfos = Lists.newArrayList();

    /**
     * Resolves the segments to merge from the job parameters, loads their
     * {@link SegmentInfo} and runs the merge.
     *
     * @throws Exception if resolving the metadata or merging the segments fails
     */
    @Override // org.apache.kylin.engine.spark.application.SparkApplication
    protected void doExecute() throws Exception {
        buildLayoutWithUpdate = new BuildLayoutWithUpdate();
        String cubeId = getParam("cubeId");
        String mergedSegmentId = getParam("segmentIds");
        CubeInstance cube = CubeManager.getInstance(config).getCubeByUuid(cubeId);
        mergingSegments = cube.getMergingSegments(cube.getSegmentById(mergedSegmentId));
        for (CubeSegment segment : mergingSegments) {
            mergingSegInfos.add(ManagerHub.getSegmentInfo(config, cubeId, segment.getUuid()));
        }
        mergeSegments(cubeId, mergedSegmentId);
    }

    /**
     * Merges every layout of the merging segments and submits one save job per
     * layout to {@link BuildLayoutWithUpdate}.
     *
     * @param cubeId          uuid of the cube that owns the segments
     * @param mergedSegmentId id of the segment that receives the merged data
     * @throws IOException declared for failures while merging or updating layouts
     */
    private void mergeSegments(String cubeId, String mergedSegmentId) throws IOException {
        CubeInstance cube = CubeManager.getInstance(config).getCubeByUuid(cubeId);
        final SegmentInfo mergedSegInfo =
                ManagerHub.getSegmentInfo(config, cubeId, cube.getSegmentById(mergedSegmentId).getUuid());
        for (final CubeMergeAssist assist : generateMergeAssist(mergingSegInfos, ss).values()) {
            ForestSpanningTree spanningTree =
                    new ForestSpanningTree(JavaConversions.asJavaCollection(mergedSegInfo.toBuildLayouts()));
            Dataset<Row> merged = assist.merge(config, cube.getName());
            final LayoutEntity layout = assist.getLayout();

            final Dataset<Row> sorted;
            if (layout.isTableIndex()) {
                // Table indexes are only sorted; no aggregation is applied.
                sorted = merged.sortWithinPartitions(
                        new Column[]{NSparkCubingUtil.getFirstColumn(layout.getOrderedDimensions().keySet())});
            } else {
                // Aggregate layouts are re-aggregated over their dimensions before sorting.
                Dataset<Row> aggregated = CuboidAggregator.agg(ss, merged, layout.getOrderedDimensions().keySet(),
                        layout.getOrderedMeasures(), spanningTree, false);
                sorted = aggregated.sortWithinPartitions(
                        new Column[]{NSparkCubingUtil.getFirstColumn(layout.getOrderedDimensions().keySet())});
            }

            buildLayoutWithUpdate.submit(new BuildLayoutWithUpdate.JobEntity() {
                @Override // org.apache.kylin.engine.spark.job.BuildLayoutWithUpdate.JobEntity
                public String getName() {
                    return "merge-cuboid-" + layout.getId();
                }

                @Override // org.apache.kylin.engine.spark.job.BuildLayoutWithUpdate.JobEntity
                public LayoutEntity build() throws IOException {
                    return CubeMergeJob.this.saveAndUpdateCuboid(sorted, mergedSegInfo, layout, assist);
                }
            }, config);
            buildLayoutWithUpdate.updateLayout(mergedSegInfo, config);
        }
    }

    /**
     * Groups the layouts of the given segments by layout id.
     *
     * <p>For the first occurrence of a layout id a new {@link CubeMergeAssist} is
     * created and initialised with that layout, the segment it was found in, the
     * Spark session and the full list of segments; later occurrences of the same
     * id are only added as additional cuboids.
     *
     * @param list         segments whose layouts are to be merged
     * @param sparkSession Spark session handed to every created assist
     * @return map from layout id to the assist collecting all cuboids with that id
     */
    public static Map<Long, CubeMergeAssist> generateMergeAssist(List<SegmentInfo> list, SparkSession sparkSession) {
        ConcurrentMap<Long, CubeMergeAssist> assistsByLayoutId = Maps.newConcurrentMap();
        for (SegmentInfo segmentInfo : list) {
            // Walk the Scala list once instead of using indexed apply(i)/size(),
            // which are linear operations on an immutable List.
            for (LayoutEntity layoutEntity : JavaConversions.asJavaCollection(segmentInfo.layouts())) {
                Long layoutId = Long.valueOf(layoutEntity.getId());
                CubeMergeAssist assist = assistsByLayoutId.get(layoutId);
                if (assist == null) {
                    assist = new CubeMergeAssist();
                    assist.addCuboid(layoutEntity);
                    assist.setSs(sparkSession);
                    assist.setLayout(layoutEntity);
                    assist.setNewSegment(segmentInfo);
                    assist.setToMergeSegments(list);
                    assistsByLayoutId.put(layoutId, assist);
                } else {
                    assist.addCuboid(layoutEntity);
                }
            }
        }
        return assistsByLayoutId;
    }

    /**
     * Writes the merged data of one layout to storage and fills in the layout's
     * row count, source row count, shard number and cuboid info.
     *
     * <p>The Spark execution-id local property and job description set for the
     * duration of the save are always reset, and the cached query execution is
     * always removed, even when saving fails. Invoked from the anonymous
     * {@code JobEntity} created in {@code mergeSegments}.
     *
     * @param dataset         merged and sorted data of the layout
     * @param segmentInfo     the merged segment the layout is written for
     * @param layoutEntity    the layout to save; updated in place and returned
     * @param cubeMergeAssist assist holding the source cuboids of this layout
     * @return {@code layoutEntity} after its metadata has been updated
     * @throws IOException declared for failures while writing or inspecting the stored data
     */
    public LayoutEntity saveAndUpdateCuboid(Dataset<Row> dataset, SegmentInfo segmentInfo, LayoutEntity layoutEntity, CubeMergeAssist cubeMergeAssist) throws IOException {
        long layoutId = layoutEntity.getId();
        long sourceRows = 0L;
        for (LayoutEntity cuboid : cubeMergeAssist.getCuboids()) {
            sourceRows += cuboid.getSourceRows();
        }

        String executionId = UUID.randomUUID().toString();
        String parquetStoragePath;
        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), executionId);
        ss.sparkContext().setJobDescription("merge layout " + layoutId);
        try {
            NSparkCubingEngine.NSparkCubingStorage storage = (NSparkCubingEngine.NSparkCubingStorage)
                    StorageFactory.createEngineAdapter(layoutEntity, NSparkCubingEngine.NSparkCubingStorage.class);
            parquetStoragePath = PathManager.getParquetStoragePath(config, getParam("cubeName"),
                    segmentInfo.name(), segmentInfo.identifier(), String.valueOf(layoutId));
            String tempPath = parquetStoragePath + CubeBuildJob.TEMP_DIR_SUFFIX;
            storage.saveTo(tempPath, dataset, ss);

            long rowCount = JobMetricsUtils.collectMetrics(executionId).getMetrics(Metrics.CUBOID_ROWS_CNT());
            if (rowCount == -1) {
                // No usable metric was collected: record the anomaly and count the dataset instead.
                infos.recordAbnormalLayouts(layoutEntity.getId(), "'Job metrics seems null, use count() to collect cuboid rows.'");
                logger.warn("Can not get cuboid row cnt, use count() to collect cuboid rows.");
                layoutEntity.setRows(dataset.count());
            } else {
                layoutEntity.setRows(rowCount);
            }
            layoutEntity.setSourceRows(sourceRows);
            layoutEntity.setShardNum(BuildUtils.repartitionIfNeed(layoutEntity, storage, parquetStoragePath, tempPath, config, ss));
        } finally {
            // Always undo the per-job Spark settings and drop the cached execution,
            // so a failed save does not leave them behind for later jobs.
            ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), null);
            ss.sparkContext().setJobDescription(null);
            QueryExecutionCache.removeQueryExecution(executionId);
        }
        BuildUtils.fillCuboidInfo(layoutEntity, parquetStoragePath);
        return layoutEntity;
    }

    /**
     * Returns the job description text produced by {@link LogJobInfoUtils#dfMergeJobInfo()}.
     */
    @Override // org.apache.kylin.engine.spark.application.SparkApplication
    protected String generateInfo() {
        return LogJobInfoUtils.dfMergeJobInfo();
    }

    /**
     * Command-line entry point: creates a {@code CubeMergeJob} and executes it
     * with the given arguments.
     *
     * @param strArr arguments passed through to {@code execute}
     */
    public static void main(String[] strArr) {
        new CubeMergeJob().execute(strArr);
    }
}
