package io.datarouter.filesystem.snapshot.writer;

import io.datarouter.filesystem.snapshot.block.root.RootBlock;
import io.datarouter.filesystem.snapshot.block.root.RootBlockV1;
import io.datarouter.filesystem.snapshot.compress.CompressedBlock;
import io.datarouter.filesystem.snapshot.encode.BranchBlockEncoder;
import io.datarouter.filesystem.snapshot.encode.EncodedBlock;
import io.datarouter.filesystem.snapshot.encode.LeafBlockEncoder;
import io.datarouter.filesystem.snapshot.encode.RootBlockEncoder;
import io.datarouter.filesystem.snapshot.encode.RootBlockFields;
import io.datarouter.filesystem.snapshot.encode.ValueBlockEncoder;
import io.datarouter.filesystem.snapshot.key.SnapshotKey;
import io.datarouter.filesystem.snapshot.path.SnapshotPaths;
import io.datarouter.filesystem.snapshot.storage.block.CacheBlockKey;
import io.datarouter.filesystem.snapshot.storage.block.SnapshotBlockStorage;
import io.datarouter.filesystem.snapshot.storage.file.SnapshotFileStorage;
import io.datarouter.filesystem.snapshot.writer.BlockQueue;
import io.datarouter.scanner.Scanner;
import io.datarouter.util.Count;
import io.datarouter.util.Require;
import io.datarouter.util.concurrent.FutureTool;
import io.datarouter.util.concurrent.ThreadTool;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/filesystem/snapshot/writer/SnapshotBlockWriter.class */
public class SnapshotBlockWriter {
    private static final Logger logger = LoggerFactory.getLogger(SnapshotBlockWriter.class);
    private final SnapshotKey snapshotKey;
    private final SnapshotWriterTracker tracker;
    private final SnapshotBlockStorage blockStorage;
    private final SnapshotFileStorage fileStorage;
    private final SnapshotWriterConfig config;
    public final SnapshotFileWriter fileWriter;
    private final ExecutorService exec;
    private final SnapshotPaths paths;
    private final long maxTasks;
    private final int stallMs = 1;
    private final Queue<LeafBlockEncoder> pendingLeafEncoders = new LinkedBlockingQueue();
    private final Map<Integer, Queue<BranchBlockEncoder>> pendingBranchEncodersByLevel = new ConcurrentHashMap();
    private final Map<Integer, Map<Integer, Future<?>>> branchFutureByBlockIdByLevel = new ConcurrentHashMap();
    private final Map<Integer, Future<?>> leafFutureByBlockId = new ConcurrentHashMap();
    private final Map<Integer, Map<Integer, Future<?>>> valueFutureByBlockIdByColumn = new ConcurrentHashMap();

    public SnapshotBlockWriter(SnapshotKey snapshotKey, SnapshotWriterTracker snapshotWriterTracker, SnapshotBlockStorage snapshotBlockStorage, SnapshotFileStorage snapshotFileStorage, SnapshotWriterConfig snapshotWriterConfig, ExecutorService executorService) {
        this.snapshotKey = snapshotKey;
        this.tracker = snapshotWriterTracker;
        this.blockStorage = snapshotBlockStorage;
        this.fileStorage = snapshotFileStorage;
        this.config = snapshotWriterConfig;
        this.fileWriter = new SnapshotFileWriter(snapshotWriterTracker, snapshotFileStorage, snapshotWriterConfig, this::onValueFileWrite, this::onLeafFileWrite, (v1) -> {
            onBranchFileWrite(v1);
        });
        this.exec = executorService;
        this.paths = snapshotWriterConfig.pathsSupplier.get();
        this.maxTasks = snapshotWriterConfig.numThreads * 100;
    }

    public RootBlock flushRootBlock(long j, List<Integer> list, List<Integer> list2, int i, long j2, int i2) {
        int[] array = list.stream().mapToInt((v0) -> {
            return v0.intValue();
        }).toArray();
        int[] array2 = list2.stream().mapToInt((v0) -> {
            return v0.intValue();
        }).toArray();
        long currentTimeMillis = System.currentTimeMillis() - j;
        RootBlockEncoder rootBlockEncoder = this.config.rootBlockEncoderSupplier.get();
        rootBlockEncoder.set(new RootBlockFields(this.config.sorted, this.config.pathsSupplier.get(), new RootBlockFields.RootBlockEncoderFormats(this.config.branchBlockEncoderFactory.apply(0).format(), this.config.leafBlockEncoderSupplier.get().format(), this.config.valueBlockEncoderSupplier.get().format()), new RootBlockFields.RootBlockEncoderCompressors(this.config.branchBlockCompressor.name(), this.config.leafBlockCompressor.name(), this.config.valueBlockCompressor.name()), new RootBlockFields.RootBlockEncoderBytesPerFile(this.config.branchBytesPerFile, this.config.leafBytesPerFile, this.config.valueBytesPerFile), new RootBlockFields.RootBlockEncoderBlocksPerFile(this.config.branchBlocksPerFile, this.config.leafBlocksPerFile, this.config.valueBlocksPerFile), j2, i, new RootBlockFields.RootBlockEncoderBlockCounts(array, i2, array2), new RootBlockFields.RootBlockEncoderByteCountsEncoded(this.tracker.branchBytesEncoded.value(), this.tracker.leafBytesEncoded.value(), this.tracker.valueBytesEncoded.value()), new RootBlockFields.RootBlockEncoderByteCountsCompressed(this.tracker.branchBytesCompressed.value(), this.tracker.leafBytesCompressed.value(), this.tracker.valueBytesCompressed.value()), new RootBlockFields.RootBlockEncoderBlockEndings(this.fileWriter.rootBranchEnding()), new RootBlockFields.RootBlockEncoderTimings(j, currentTimeMillis)));
        EncodedBlock encode = rootBlockEncoder.encode();
        if (this.config.persist) {
            this.fileStorage.addRootFile(encode);
        }
        return new RootBlockV1(encode.concat());
    }

    public void submitBranch(BranchBlockEncoder branchBlockEncoder) {
        this.pendingBranchEncodersByLevel.computeIfAbsent(Integer.valueOf(branchBlockEncoder.level()), num -> {
            return new LinkedBlockingQueue();
        }).add(branchBlockEncoder);
    }

    private boolean isBranchFlushable(BranchBlockEncoder branchBlockEncoder) {
        boolean z = branchBlockEncoder.firstChildBlockId() > 0;
        int firstChildBlockId = z ? branchBlockEncoder.firstChildBlockId() - 1 : branchBlockEncoder.firstChildBlockId();
        int numRecords = z ? 1 + branchBlockEncoder.numRecords() : branchBlockEncoder.numRecords();
        if (branchBlockEncoder.level() == 0) {
            return this.fileWriter.leafFileInfoReady(firstChildBlockId, numRecords);
        }
        return this.fileWriter.branchFileInfoReady(branchBlockEncoder.level() - 1, firstChildBlockId, numRecords);
    }

    private void flushBranch(BranchBlockEncoder branchBlockEncoder) {
        this.tracker.branchTasks.increment();
        Map<Integer, Future<?>> computeIfAbsent = this.branchFutureByBlockIdByLevel.computeIfAbsent(Integer.valueOf(branchBlockEncoder.level()), (v1) -> {
            return new ConcurrentHashMap(v1);
        });
        computeIfAbsent.put(Integer.valueOf(branchBlockEncoder.blockId()), this.exec.submit(() -> {
            BlockQueue.FileIdsAndEndings branchFileInfo;
            int firstChildBlockId = branchBlockEncoder.firstChildBlockId();
            int numRecords = branchBlockEncoder.numRecords() + 1;
            if (branchBlockEncoder.level() == 0) {
                branchFileInfo = this.fileWriter.leafFileInfo(firstChildBlockId - 1, numRecords);
            } else {
                branchFileInfo = this.fileWriter.branchFileInfo(branchBlockEncoder.level() - 1, firstChildBlockId - 1, numRecords);
            }
            EncodedBlock encode = branchBlockEncoder.encode(branchFileInfo);
            CompressedBlock compress = this.config.branchBlockCompressor.compress(encode, this.config.compressorConcatChunks);
            this.tracker.branchBlock(encode, compress);
            this.fileWriter.addBranchBlock(branchBlockEncoder.level(), branchBlockEncoder.blockId(), compress);
            if (this.blockStorage != null && this.config.updateCache) {
                this.blockStorage.addBranchBlock(this.paths, CacheBlockKey.branch(this.snapshotKey, branchBlockEncoder.level(), branchBlockEncoder.blockId()), compress);
            }
            computeIfAbsent.remove(Integer.valueOf(branchBlockEncoder.blockId()));
            this.tracker.branchTasks.decrement();
        }));
    }

    public void submitLeaf(LeafBlockEncoder leafBlockEncoder) {
        if (this.config.numColumns == 0) {
            flushLeaf(leafBlockEncoder);
        } else {
            this.pendingLeafEncoders.add(leafBlockEncoder);
        }
    }

    private boolean isLeafFlushable(LeafBlockEncoder leafBlockEncoder) {
        for (int i = 0; i < this.config.numColumns; i++) {
            if (!this.fileWriter.valueFileInfoReady(i, leafBlockEncoder.firstValueBlockId(i), leafBlockEncoder.numValueBlocks(i))) {
                return false;
            }
        }
        return true;
    }

    private void flushLeaf(LeafBlockEncoder leafBlockEncoder) {
        leafBackpressure();
        this.tracker.leafTasks.increment();
        this.leafFutureByBlockId.put(Integer.valueOf(leafBlockEncoder.blockId()), this.exec.submit(() -> {
            if (this.config.sorted) {
                leafBlockEncoder.assertKeysSorted();
            }
            BlockQueue.FileIdsAndEndings[] fileIdsAndEndingsArr = new BlockQueue.FileIdsAndEndings[this.config.numColumns];
            for (int i = 0; i < this.config.numColumns; i++) {
                fileIdsAndEndingsArr[i] = this.fileWriter.valueFileInfo(i, leafBlockEncoder.firstValueBlockId(i) - 1, leafBlockEncoder.numValueBlocks(i) + 1);
            }
            EncodedBlock encode = leafBlockEncoder.encode(fileIdsAndEndingsArr);
            CompressedBlock compress = this.config.leafBlockCompressor.compress(encode, this.config.compressorConcatChunks);
            this.tracker.leafBlock(encode, compress);
            this.fileWriter.addLeafBlock(leafBlockEncoder.blockId(), compress);
            if (this.blockStorage != null && this.config.updateCache) {
                this.blockStorage.addLeafBlock(this.paths, CacheBlockKey.leaf(this.snapshotKey, leafBlockEncoder.blockId()), compress);
            }
            this.leafFutureByBlockId.remove(Integer.valueOf(leafBlockEncoder.blockId()));
            this.tracker.leafTasks.decrement();
        }));
    }

    private void leafBackpressure() {
        if (this.config.numColumns > 0) {
            return;
        }
        long nanoTime = System.nanoTime();
        while (this.tracker.leafTasks.value() >= this.maxTasks) {
            ThreadTool.trySleep(this.stallMs);
        }
        this.tracker.leafStallNs.incrementBy(System.nanoTime() - nanoTime);
    }

    public void submitValueBlock(int i, int i2, ValueBlockEncoder valueBlockEncoder) {
        flushValueBlock(i, i2, valueBlockEncoder);
    }

    private void flushValueBlock(int i, int i2, ValueBlockEncoder valueBlockEncoder) {
        valueBackpressure();
        this.tracker.valueTasks.increment();
        Map<Integer, Future<?>> computeIfAbsent = this.valueFutureByBlockIdByColumn.computeIfAbsent(Integer.valueOf(i), (v1) -> {
            return new ConcurrentHashMap(v1);
        });
        computeIfAbsent.put(Integer.valueOf(i2), this.exec.submit(() -> {
            EncodedBlock encode = valueBlockEncoder.encode();
            CompressedBlock compress = this.config.valueBlockCompressor.compress(encode, this.config.compressorConcatChunks);
            this.tracker.valueBlock(encode, compress);
            this.fileWriter.addValueBlock(i, i2, compress);
            if (this.blockStorage != null && this.config.updateCache) {
                this.blockStorage.addValueBlock(this.paths, CacheBlockKey.value(this.snapshotKey, i, i2), compress);
            }
            computeIfAbsent.remove(Integer.valueOf(i2));
            this.tracker.valueTasks.decrement();
        }));
    }

    private void valueBackpressure() {
        long nanoTime = System.nanoTime();
        while (this.tracker.valueTasks.value() >= this.maxTasks) {
            ThreadTool.trySleep(this.stallMs);
        }
        this.tracker.valueStallNs.incrementBy(System.nanoTime() - nanoTime);
    }

    public synchronized void onValueFileWrite(Void r4) {
        Scanner.of(this.pendingLeafEncoders).include(this::isLeafFlushable).each(leafBlockEncoder -> {
            this.pendingLeafEncoders.remove();
        }).forEach(this::flushLeaf);
    }

    public void onLeafFileWrite(Void r4) {
        tryFlushBranches(0);
    }

    public void onBranchFileWrite(int i) {
        tryFlushBranches(i + 1);
    }

    private synchronized void tryFlushBranches(int i) {
        Queue<BranchBlockEncoder> queue = this.pendingBranchEncodersByLevel.get(Integer.valueOf(i));
        if (queue == null) {
            return;
        }
        Scanner.of(queue).include(this::isBranchFlushable).each(branchBlockEncoder -> {
            queue.remove();
        }).forEach(this::flushBranch);
    }

    public void complete() {
        Scanner.of(this.valueFutureByBlockIdByColumn.keySet()).sort().forEach(num -> {
            drainFutures(this.valueFutureByBlockIdByColumn.get(num).values(), "value column " + num);
        });
        this.fileWriter.completeValues();
        Scanner sort = Scanner.of(this.pendingLeafEncoders).each(leafBlockEncoder -> {
            Require.isTrue(isLeafFlushable(leafBlockEncoder));
        }).sort(LeafBlockEncoder.BLOCK_ID_COMPARATOR);
        Queue<LeafBlockEncoder> queue = this.pendingLeafEncoders;
        queue.getClass();
        sort.each((v1) -> {
            r1.remove(v1);
        }).forEach(this::flushLeaf);
        drainFutures(this.leafFutureByBlockId.values(), "leaf");
        this.fileWriter.completeLeaves();
        Scanner.of(this.pendingBranchEncodersByLevel.keySet()).sort().forEach(num2 -> {
            Scanner.of(this.pendingBranchEncodersByLevel.get(num2)).exclude((v0) -> {
                return v0.isEmpty();
            }).each(branchBlockEncoder -> {
                Require.isTrue(isBranchFlushable(branchBlockEncoder));
            }).forEach(this::flushBranch);
            drainFutures(this.branchFutureByBlockIdByLevel.get(num2).values(), "branch level " + num2);
            this.fileWriter.completeBranches(num2.intValue());
        });
        this.fileWriter.logQueueStats();
    }

    private void drainFutures(Iterable<Future<?>> iterable, String str) {
        Count.Counts counts = new Count.Counts();
        Count add = counts.add("pending");
        Count add2 = counts.add("done");
        Count add3 = counts.add("canceled");
        Count add4 = counts.add("waited");
        Scanner.of(iterable).each(future -> {
            add.increment();
            if (future.isDone()) {
                add2.increment();
            }
            if (future.isCancelled()) {
                add3.increment();
            }
            if (future.isDone() || future.isCancelled()) {
                return;
            }
            add4.increment();
        }).each(FutureTool::get).count();
        logger.info("drained {} {}", str, counts);
    }
}
