/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.util.collection.unsafe.sort;

import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import javax.annotation.Nullable;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.TaskCompletionListener;
import org.apache.spark.util.Utils;
import org.apache.spark.util.collection.unsafe.sort.PrefixComparator;
import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
import org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter;
import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterIterator;
import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillMerger;
import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class UnsafeExternalSorter
extends MemoryConsumer {
    private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class);
    private final PrefixComparator prefixComparator;
    private final RecordComparator recordComparator;
    private final TaskMemoryManager taskMemoryManager;
    private final BlockManager blockManager;
    private final TaskContext taskContext;
    private ShuffleWriteMetrics writeMetrics;
    private final int fileBufferSizeBytes;
    private final LinkedList<MemoryBlock> allocatedPages = new LinkedList();
    private final LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList();
    @Nullable
    private volatile UnsafeInMemorySorter inMemSorter;
    private MemoryBlock currentPage = null;
    private long pageCursor = -1L;
    private long peakMemoryUsedBytes = 0L;
    private volatile SpillableIterator readingIterator = null;

    public static UnsafeExternalSorter createWithExistingInMemorySorter(TaskMemoryManager taskMemoryManager, BlockManager blockManager, TaskContext taskContext, RecordComparator recordComparator, PrefixComparator prefixComparator, int initialSize, long pageSizeBytes, UnsafeInMemorySorter inMemorySorter) {
        return new UnsafeExternalSorter(taskMemoryManager, blockManager, taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, inMemorySorter);
    }

    public static UnsafeExternalSorter create(TaskMemoryManager taskMemoryManager, BlockManager blockManager, TaskContext taskContext, RecordComparator recordComparator, PrefixComparator prefixComparator, int initialSize, long pageSizeBytes) {
        return new UnsafeExternalSorter(taskMemoryManager, blockManager, taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, null);
    }

    private UnsafeExternalSorter(TaskMemoryManager taskMemoryManager, BlockManager blockManager, TaskContext taskContext, RecordComparator recordComparator, PrefixComparator prefixComparator, int initialSize, long pageSizeBytes, @Nullable UnsafeInMemorySorter existingInMemorySorter) {
        super(taskMemoryManager, pageSizeBytes);
        this.taskMemoryManager = taskMemoryManager;
        this.blockManager = blockManager;
        this.taskContext = taskContext;
        this.recordComparator = recordComparator;
        this.prefixComparator = prefixComparator;
        this.fileBufferSizeBytes = 32768;
        this.writeMetrics = new ShuffleWriteMetrics();
        if (existingInMemorySorter == null) {
            this.inMemSorter = new UnsafeInMemorySorter(taskMemoryManager, recordComparator, prefixComparator, initialSize);
            this.acquireMemory(this.inMemSorter.getMemoryUsage());
        } else {
            this.inMemSorter = existingInMemorySorter;
        }
        this.peakMemoryUsedBytes = this.getMemoryUsage();
        taskContext.addTaskCompletionListener(new TaskCompletionListener(){

            @Override
            public void onTaskCompletion(TaskContext context) {
                UnsafeExternalSorter.this.cleanupResources();
            }
        });
    }

    @VisibleForTesting
    public void closeCurrentPage() {
        if (this.currentPage != null) {
            this.pageCursor = this.currentPage.getBaseOffset() + this.currentPage.size();
        }
    }

    @Override
    public long spill(long size, MemoryConsumer trigger) throws IOException {
        if (trigger != this) {
            if (this.readingIterator != null) {
                return this.readingIterator.spill();
            }
            return 0L;
        }
        if (this.inMemSorter == null || this.inMemSorter.numRecords() <= 0) {
            return 0L;
        }
        this.logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)", new Object[]{Thread.currentThread().getId(), Utils.bytesToString(this.getMemoryUsage()), this.spillWriters.size(), this.spillWriters.size() > 1 ? " times" : " time"});
        if (this.inMemSorter.numRecords() > 0) {
            UnsafeSorterSpillWriter spillWriter = new UnsafeSorterSpillWriter(this.blockManager, this.fileBufferSizeBytes, this.writeMetrics, this.inMemSorter.numRecords());
            this.spillWriters.add(spillWriter);
            UnsafeInMemorySorter.SortedIterator sortedRecords = this.inMemSorter.getSortedIterator();
            while (((UnsafeSorterIterator)sortedRecords).hasNext()) {
                ((UnsafeSorterIterator)sortedRecords).loadNext();
                Object baseObject = ((UnsafeSorterIterator)sortedRecords).getBaseObject();
                long baseOffset = ((UnsafeSorterIterator)sortedRecords).getBaseOffset();
                int recordLength = ((UnsafeSorterIterator)sortedRecords).getRecordLength();
                spillWriter.write(baseObject, baseOffset, recordLength, ((UnsafeSorterIterator)sortedRecords).getKeyPrefix());
            }
            spillWriter.close();
            this.inMemSorter.reset();
        }
        long spillSize = this.freeMemory();
        this.taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
        return spillSize;
    }

    private long getMemoryUsage() {
        long totalPageSize = 0L;
        for (MemoryBlock page : this.allocatedPages) {
            totalPageSize += page.size();
        }
        return (this.inMemSorter == null ? 0L : this.inMemSorter.getMemoryUsage()) + totalPageSize;
    }

    private void updatePeakMemoryUsed() {
        long mem = this.getMemoryUsage();
        if (mem > this.peakMemoryUsedBytes) {
            this.peakMemoryUsedBytes = mem;
        }
    }

    public long getPeakMemoryUsedBytes() {
        this.updatePeakMemoryUsed();
        return this.peakMemoryUsedBytes;
    }

    @VisibleForTesting
    public int getNumberOfAllocatedPages() {
        return this.allocatedPages.size();
    }

    private long freeMemory() {
        this.updatePeakMemoryUsed();
        long memoryFreed = 0L;
        for (MemoryBlock block : this.allocatedPages) {
            memoryFreed += block.size();
            this.freePage(block);
        }
        this.allocatedPages.clear();
        this.currentPage = null;
        this.pageCursor = 0L;
        return memoryFreed;
    }

    private void deleteSpillFiles() {
        for (UnsafeSorterSpillWriter spill2 : this.spillWriters) {
            File file = spill2.getFile();
            if (file == null || !file.exists() || file.delete()) continue;
            this.logger.error("Was unable to delete spill file {}", (Object)file.getAbsolutePath());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cleanupResources() {
        UnsafeExternalSorter unsafeExternalSorter = this;
        synchronized (unsafeExternalSorter) {
            this.deleteSpillFiles();
            this.freeMemory();
            if (this.inMemSorter != null) {
                long used = this.inMemSorter.getMemoryUsage();
                this.inMemSorter = null;
                this.releaseMemory(used);
            }
        }
    }

    private void growPointerArrayIfNecessary() throws IOException {
        assert (this.inMemSorter != null);
        if (!this.inMemSorter.hasSpaceForAnotherRecord()) {
            long used = this.inMemSorter.getMemoryUsage();
            long needed = used + this.inMemSorter.getMemoryToExpand();
            try {
                this.acquireMemory(needed);
            }
            catch (OutOfMemoryError e) {
                assert (this.inMemSorter.hasSpaceForAnotherRecord());
                return;
            }
            if (this.inMemSorter.hasSpaceForAnotherRecord()) {
                this.releaseMemory(needed);
            } else {
                try {
                    this.inMemSorter.expandPointerArray();
                    this.releaseMemory(used);
                }
                catch (OutOfMemoryError oom) {
                    this.releaseMemory(needed);
                    this.spill();
                }
            }
        }
    }

    private void acquireNewPageIfNecessary(int required) {
        if (this.currentPage == null || this.pageCursor + (long)required > this.currentPage.getBaseOffset() + this.currentPage.size()) {
            this.currentPage = this.allocatePage(required);
            this.pageCursor = this.currentPage.getBaseOffset();
            this.allocatedPages.add(this.currentPage);
        }
    }

    public void insertRecord(Object recordBase, long recordOffset, int length, long prefix) throws IOException {
        this.growPointerArrayIfNecessary();
        int required = length + 4;
        this.acquireNewPageIfNecessary(required);
        Object base = this.currentPage.getBaseObject();
        long recordAddress = this.taskMemoryManager.encodePageNumberAndOffset(this.currentPage, this.pageCursor);
        Platform.putInt((Object)base, (long)this.pageCursor, (int)length);
        this.pageCursor += 4L;
        Platform.copyMemory((Object)recordBase, (long)recordOffset, (Object)base, (long)this.pageCursor, (long)length);
        this.pageCursor += (long)length;
        assert (this.inMemSorter != null);
        this.inMemSorter.insertRecord(recordAddress, prefix);
    }

    public void insertKVRecord(Object keyBase, long keyOffset, int keyLen, Object valueBase, long valueOffset, int valueLen, long prefix) throws IOException {
        this.growPointerArrayIfNecessary();
        int required = keyLen + valueLen + 4 + 4;
        this.acquireNewPageIfNecessary(required);
        Object base = this.currentPage.getBaseObject();
        long recordAddress = this.taskMemoryManager.encodePageNumberAndOffset(this.currentPage, this.pageCursor);
        Platform.putInt((Object)base, (long)this.pageCursor, (int)(keyLen + valueLen + 4));
        this.pageCursor += 4L;
        Platform.putInt((Object)base, (long)this.pageCursor, (int)keyLen);
        this.pageCursor += 4L;
        Platform.copyMemory((Object)keyBase, (long)keyOffset, (Object)base, (long)this.pageCursor, (long)keyLen);
        this.pageCursor += (long)keyLen;
        Platform.copyMemory((Object)valueBase, (long)valueOffset, (Object)base, (long)this.pageCursor, (long)valueLen);
        this.pageCursor += (long)valueLen;
        assert (this.inMemSorter != null);
        this.inMemSorter.insertRecord(recordAddress, prefix);
    }

    public UnsafeSorterIterator getSortedIterator() throws IOException {
        assert (this.inMemSorter != null);
        this.readingIterator = new SpillableIterator(this.inMemSorter.getSortedIterator());
        int numIteratorsToMerge = this.spillWriters.size() + (this.readingIterator.hasNext() ? 1 : 0);
        if (this.spillWriters.isEmpty()) {
            return this.readingIterator;
        }
        UnsafeSorterSpillMerger spillMerger = new UnsafeSorterSpillMerger(this.recordComparator, this.prefixComparator, numIteratorsToMerge);
        for (UnsafeSorterSpillWriter spillWriter : this.spillWriters) {
            spillMerger.addSpillIfNotEmpty(spillWriter.getReader(this.blockManager));
        }
        this.spillWriters.clear();
        spillMerger.addSpillIfNotEmpty(this.readingIterator);
        return spillMerger.getSortedIterator();
    }

    class SpillableIterator
    extends UnsafeSorterIterator {
        private UnsafeSorterIterator upstream;
        private UnsafeSorterIterator nextUpstream = null;
        private MemoryBlock lastPage = null;
        private boolean loaded = false;
        private int numRecords = 0;

        public SpillableIterator(UnsafeInMemorySorter.SortedIterator inMemIterator) {
            this.upstream = inMemIterator;
            this.numRecords = inMemIterator.numRecordsLeft();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public long spill() throws IOException {
            SpillableIterator spillableIterator = this;
            synchronized (spillableIterator) {
                if (!(this.upstream instanceof UnsafeInMemorySorter.SortedIterator) || this.nextUpstream != null || this.numRecords <= 0) {
                    return 0L;
                }
                UnsafeInMemorySorter.SortedIterator inMemIterator = ((UnsafeInMemorySorter.SortedIterator)this.upstream).clone();
                UnsafeSorterSpillWriter spillWriter = new UnsafeSorterSpillWriter(UnsafeExternalSorter.this.blockManager, UnsafeExternalSorter.this.fileBufferSizeBytes, UnsafeExternalSorter.this.writeMetrics, this.numRecords);
                while (inMemIterator.hasNext()) {
                    inMemIterator.loadNext();
                    Object baseObject = inMemIterator.getBaseObject();
                    long baseOffset = inMemIterator.getBaseOffset();
                    int recordLength = inMemIterator.getRecordLength();
                    spillWriter.write(baseObject, baseOffset, recordLength, inMemIterator.getKeyPrefix());
                }
                spillWriter.close();
                UnsafeExternalSorter.this.spillWriters.add(spillWriter);
                this.nextUpstream = spillWriter.getReader(UnsafeExternalSorter.this.blockManager);
                long released = 0L;
                UnsafeExternalSorter unsafeExternalSorter = UnsafeExternalSorter.this;
                synchronized (unsafeExternalSorter) {
                    for (MemoryBlock page : UnsafeExternalSorter.this.allocatedPages) {
                        if (!this.loaded || page.getBaseObject() != inMemIterator.getBaseObject()) {
                            released += page.size();
                            UnsafeExternalSorter.this.freePage(page);
                            continue;
                        }
                        this.lastPage = page;
                    }
                    UnsafeExternalSorter.this.allocatedPages.clear();
                }
                return released;
            }
        }

        @Override
        public boolean hasNext() {
            return this.numRecords > 0;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void loadNext() throws IOException {
            SpillableIterator spillableIterator = this;
            synchronized (spillableIterator) {
                this.loaded = true;
                if (this.nextUpstream != null) {
                    if (this.lastPage != null) {
                        UnsafeExternalSorter.this.freePage(this.lastPage);
                        this.lastPage = null;
                    }
                    this.upstream = this.nextUpstream;
                    this.nextUpstream = null;
                    assert (UnsafeExternalSorter.this.inMemSorter != null);
                    long used = UnsafeExternalSorter.this.inMemSorter.getMemoryUsage();
                    UnsafeExternalSorter.this.inMemSorter = null;
                    UnsafeExternalSorter.this.releaseMemory(used);
                }
                --this.numRecords;
                this.upstream.loadNext();
            }
        }

        @Override
        public Object getBaseObject() {
            return this.upstream.getBaseObject();
        }

        @Override
        public long getBaseOffset() {
            return this.upstream.getBaseOffset();
        }

        @Override
        public int getRecordLength() {
            return this.upstream.getRecordLength();
        }

        @Override
        public long getKeyPrefix() {
            return this.upstream.getKeyPrefix();
        }
    }
}

