package org.apache.hudi.hadoop.realtime;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.class */
/**
 * {@link RecordReader} that overlays the records held by a
 * {@link HoodieMergedLogRecordScanner} on top of the rows produced by a wrapped
 * base reader.
 *
 * <p>Rows are matched by the string value of the column at {@code recordKeyIndex}.
 * A base row whose key is present in the log record map has its leading columns
 * overwritten with the log record's values. Log record keys that never matched a
 * base row are emitted once the base reader is exhausted.
 */
public class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader implements RecordReader<NullWritable, ArrayWritable> {
    private static final Logger LOG = LogManager.getLogger(RealtimeCompactedRecordReader.class);

    /**
     * Record key column index used when the split carries no virtual key info.
     * NOTE(review): presumably the position of the record key meta column in the
     * base file schema - confirm against the project constant it was inlined from.
     */
    private static final int DEFAULT_RECORD_KEY_INDEX = 2;

    /** Fallback passed to the job conf lookup of {@code MAX_DFS_STREAM_BUFFER_SIZE_PROP}. */
    private static final int DEFAULT_DFS_STREAM_BUFFER_SIZE = 1048576;

    /** Reader over the base file; every {@link RecordReader} call is ultimately delegated to it. */
    protected final RecordReader<NullWritable, ArrayWritable> parquetReader;
    /** Record key to log record, as returned by the merged log scanner. */
    private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> deltaRecordMap;
    /** Copy of the log record keys; a key is removed once a base row with that key has been seen. */
    private final Set<String> deltaRecordKeys;
    private final HoodieMergedLogRecordScanner mergedLogRecordScanner;
    /** Index of the column in the base row that holds the record key. */
    private final int recordKeyIndex;
    /** Iterator over the keys left in {@code deltaRecordKeys}; created lazily once the base reader is exhausted. */
    private Iterator<String> deltaItr;

    /**
     * Builds the merged log scanner for the split and snapshots its record keys.
     *
     * @param realtimeSplit split describing the base file and its delta log files
     * @param jobConf       job configuration used to configure the log scanner
     * @param recordReader  reader over the base file
     * @throws IOException if the log scanner cannot be created
     */
    public RealtimeCompactedRecordReader(RealtimeSplit realtimeSplit, JobConf jobConf, RecordReader<NullWritable, ArrayWritable> recordReader) throws IOException {
        super(realtimeSplit, jobConf);
        this.parquetReader = recordReader;
        this.mergedLogRecordScanner = getMergedLogRecordScanner();
        this.deltaRecordMap = this.mergedLogRecordScanner.getRecords();
        // Independent copy: keys are removed from this set while the map itself stays untouched.
        this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet());
        this.recordKeyIndex = realtimeSplit.getVirtualKeyInfo()
                .map(keyInfo -> keyInfo.getRecordKeyFieldIndex())
                .orElse(DEFAULT_RECORD_KEY_INDEX);
    }

    /**
     * Creates the scanner over the split's delta log files, configured from the job conf.
     * The writer schema is used as reader schema when a custom payload is in use,
     * otherwise the reader schema is used.
     */
    private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOException {
        return HoodieMergedLogRecordScanner.newBuilder()
                .withFileSystem(FSUtils.getFs(this.split.getPath().toString(), (Configuration) this.jobConf))
                .withBasePath(this.split.getBasePath())
                .withLogFilePaths(this.split.getDeltaLogPaths())
                .withReaderSchema(this.usesCustomPayload ? getWriterSchema() : getReaderSchema())
                .withLatestInstantTime(this.split.getMaxCommitTime())
                .withMaxMemorySizeInBytes(Long.valueOf(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(this.jobConf)))
                .withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, "true")))
                .withReverseReader(false)
                .withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_DFS_STREAM_BUFFER_SIZE))
                .withSpillableMapBasePath(this.jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
                .withDiskMapType((ExternalSpillableMap.DiskMapType) this.jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
                .withBitCaskDiskMapCompressionEnabled(this.jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue().booleanValue()))
                .build();
    }

    /**
     * Asks the record's payload for its insert value, using the writer schema when a
     * custom payload is in use and the reader schema otherwise.
     *
     * @return the payload's insert value; may be empty
     */
    // The raw types are deliberate: the payload is reached through a raw HoodieAvroRecord cast.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Option<GenericRecord> buildGenericRecordwithCustomPayload(HoodieRecord hoodieRecord) throws IOException {
        return ((HoodieAvroRecord) hoodieRecord).getData()
                .getInsertValue(this.usesCustomPayload ? getWriterSchema() : getReaderSchema(), this.payloadProps);
    }

    /**
     * Advances to the next merged row.
     *
     * <p>While the base reader still has rows: a row is returned unchanged if there are no
     * log records or none for its key; otherwise the log record's values are copied over it.
     * If the log record's payload yields no value, the base row is skipped
     * (presumably a delete - confirm against the payload contract). After the base reader is
     * exhausted, log records whose keys never matched a base row are returned.
     *
     * @param nullWritable  key holder handed to the base reader
     * @param arrayWritable row holder that receives the (possibly merged) values
     * @return {@code true} if {@code arrayWritable} holds a row, {@code false} when no rows remain
     * @throws IOException if the base reader or payload access fails
     */
    @Override
    public boolean next(NullWritable nullWritable, ArrayWritable arrayWritable) throws IOException {
        while (this.parquetReader.next(nullWritable, arrayWritable)) {
            if (this.deltaRecordMap.isEmpty()) {
                return true;
            }
            String recordKey = arrayWritable.get()[this.recordKeyIndex].toString();
            if (!this.deltaRecordMap.containsKey(recordKey)) {
                return true;
            }
            // Mark the key as consumed so it is not emitted again in the log-only phase below.
            this.deltaRecordKeys.remove(recordKey);
            if (mergeLogRecordInto(recordKey, arrayWritable)) {
                return true;
            }
        }
        // Base reader exhausted: emit log records that had no matching base row.
        if (this.deltaItr == null) {
            this.deltaItr = this.deltaRecordKeys.iterator();
        }
        while (this.deltaItr.hasNext()) {
            if (mergeLogRecordInto(this.deltaItr.next(), arrayWritable)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Copies the log record stored under {@code recordKey} into {@code arrayWritable}.
     *
     * @return {@code true} if the payload produced a value and it was copied, {@code false} otherwise
     */
    private boolean mergeLogRecordInto(String recordKey, ArrayWritable arrayWritable) throws IOException {
        Option<GenericRecord> logRecord = buildGenericRecordwithCustomPayload(this.deltaRecordMap.get(recordKey));
        if (!logRecord.isPresent()) {
            return false;
        }
        setUpWritable(logRecord, arrayWritable, recordKey);
        return true;
    }

    /**
     * Converts the log record to writables and copies them over the leading entries of
     * {@code arrayWritable}; only as many entries as fit in the shorter of the two arrays are copied.
     *
     * @param option        log record; must be present
     * @param arrayWritable row holder to overwrite
     * @param str           record key, used for debug logging only
     * @throws RuntimeException if the copy fails; the message carries both rows and the cause is preserved
     */
    private void setUpWritable(Option<GenericRecord> option, ArrayWritable arrayWritable, String str) {
        GenericRecord logRecord = option.get();
        if (this.usesCustomPayload) {
            // The record was read with the writer schema; rewrite it to the reader schema.
            logRecord = HoodieAvroUtils.rewriteRecord(logRecord, getReaderSchema());
        }
        ArrayWritable logWritable = HoodieRealtimeRecordReaderUtils.avroToArrayWritable(logRecord, getHiveSchema());
        Writable[] logValues = logWritable.get();
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("key %s, base values: %s, log values: %s", str, HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable), HoodieRealtimeRecordReaderUtils.arrayWritableToString(logWritable)));
        }
        Writable[] baseValues = arrayWritable.get();
        try {
            System.arraycopy(logValues, 0, baseValues, 0, Math.min(baseValues.length, logValues.length));
            arrayWritable.set(baseValues);
        } catch (RuntimeException e) {
            String baseText = HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable);
            String logText = HoodieRealtimeRecordReaderUtils.arrayWritableToString(logWritable);
            LOG.error("Got exception when doing array copy", e);
            LOG.error("Base record :" + baseText);
            LOG.error("Log record :" + logText);
            throw new RuntimeException("Base-record :" + baseText + " ,Log-record :" + logText + " ,Error :" + e.getMessage(), e);
        }
    }

    /** Creates a key object by delegating to the base reader. */
    @Override
    public NullWritable createKey() {
        return this.parquetReader.createKey();
    }

    /** Creates a value object by delegating to the base reader. */
    @Override
    public ArrayWritable createValue() {
        return this.parquetReader.createValue();
    }

    /**
     * Decompiler-generated alias of {@link #createKey()}, retained so existing references keep compiling.
     *
     * @deprecated use {@link #createKey()}
     */
    @Deprecated
    public NullWritable m2238createKey() {
        return createKey();
    }

    /**
     * Decompiler-generated alias of {@link #createValue()}, retained so existing references keep compiling.
     *
     * @deprecated use {@link #createValue()}
     */
    @Deprecated
    public ArrayWritable m2237createValue() {
        return createValue();
    }

    /** Returns the base reader's position. */
    @Override
    public long getPos() throws IOException {
        return this.parquetReader.getPos();
    }

    /**
     * Closes the base reader and then the merged log scanner. The scanner is closed
     * even if closing the base reader throws.
     */
    @Override
    public void close() throws IOException {
        try {
            this.parquetReader.close();
        } finally {
            this.mergedLogRecordScanner.close();
        }
    }

    /** Returns the base reader's progress; the log-only phase is not reflected in it. */
    @Override
    public float getProgress() throws IOException {
        return this.parquetReader.getProgress();
    }
}
