/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.gds.core.write;

import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongUnaryOperator;
import java.util.stream.Stream;
import org.neo4j.gds.api.IdMap;
import org.neo4j.gds.core.concurrency.Pools;
import org.neo4j.gds.core.utils.TerminationFlag;
import org.neo4j.gds.core.utils.progress.tasks.ProgressTracker;
import org.neo4j.gds.core.write.NativeRelationshipStreamExporterBuilder;
import org.neo4j.gds.core.write.Relationship;
import org.neo4j.gds.core.write.RelationshipStreamExporter;
import org.neo4j.gds.core.write.RelationshipStreamExporterBuilder;
import org.neo4j.gds.transaction.TransactionContext;
import org.neo4j.gds.utils.StatementApi;
import org.neo4j.internal.kernel.api.Write;
import org.neo4j.values.storable.Value;

/**
 * Exports a stream of {@link Relationship}s by creating them through the kernel {@link Write} API.
 *
 * <p>The thread calling {@link #write(String, String...)} acts as the producer: it consumes the
 * relationship stream sequentially and packs the relationships into fixed-size {@link Buffer}s.
 * A single {@link Writer} task, submitted to {@link Pools#DEFAULT}, acts as the consumer: it takes
 * full buffers from a bounded queue, writes each one in its own transaction and returns the buffer
 * to a pool so the producer can reuse it. Only {@link #QUEUE_CAPACITY} buffers are in circulation.
 */
public final class NativeRelationshipStreamExporter
extends StatementApi
implements RelationshipStreamExporter {
    /** Number of batch buffers in circulation; also the capacity of both hand-over queues. */
    private static final int QUEUE_CAPACITY = 2;
    /** How long the producer waits on a queue before re-checking that the writer task is still running. */
    private static final long WRITER_CHECK_INTERVAL_MILLIS = 100L;

    private final LongUnaryOperator toOriginalId;
    private final Stream<Relationship> relationships;
    private final int batchSize;
    private final TerminationFlag terminationFlag;
    private final ProgressTracker progressTracker;

    /**
     * Creates a builder that is pre-configured with the given relationships, termination flag and
     * an id mapping based on {@code idMap::toOriginalNodeId}.
     *
     * @param transactionContext context handed to the builder
     * @param idMap              supplies the id mapping operator via {@code toOriginalNodeId}
     * @param relationships      the relationships to export
     * @param terminationFlag    flag handed to the builder
     * @return the configured builder
     */
    public static RelationshipStreamExporterBuilder<NativeRelationshipStreamExporter> builder(TransactionContext transactionContext, IdMap idMap, Stream<Relationship> relationships, TerminationFlag terminationFlag) {
        return new NativeRelationshipStreamExporterBuilder(transactionContext).withRelationships(relationships).withIdMappingOperator(idMap::toOriginalNodeId).withTerminationFlag(terminationFlag);
    }

    /**
     * @param tx              transaction context passed to {@link StatementApi}
     * @param toOriginalId    applied to the source and target node id of every relationship before writing
     * @param relationships   the relationships to export; forced to sequential processing
     * @param batchSize       number of relationships per buffer, i.e. per write transaction
     * @param terminationFlag checked by the writer once per buffer
     * @param progressTracker receives the sub-task begin/end and progress notifications
     */
    NativeRelationshipStreamExporter(TransactionContext tx, LongUnaryOperator toOriginalId, Stream<Relationship> relationships, int batchSize, TerminationFlag terminationFlag, ProgressTracker progressTracker) {
        super(tx);
        this.toOriginalId = toOriginalId;
        // The producer keeps a single "current buffer", so the stream must not be consumed in parallel.
        this.relationships = relationships.sequential();
        this.batchSize = batchSize;
        this.terminationFlag = terminationFlag;
        this.progressTracker = progressTracker;
    }

    /**
     * Writes all relationships of the stream using the given type and property keys.
     *
     * @param relationshipType type of the created relationships; its token is looked up or created
     * @param propertyKeys     property keys, positionally matching {@code Relationship.values()};
     *                         their tokens are looked up or created
     * @return the number of relationships written by the writer task
     * @throws RuntimeException if the calling thread is interrupted (the interrupt flag is restored)
     *                          or if the writer task fails (the cause is the {@link ExecutionException})
     */
    @Override
    public long write(String relationshipType, String ... propertyKeys) {
        this.progressTracker.beginSubTask();
        try {
            int relationshipToken = this.getOrCreateRelationshipToken(relationshipType);
            int[] propertyTokens = Arrays.stream(propertyKeys).mapToInt(this::getOrCreatePropertyToken).toArray();
            return this.export(relationshipToken, propertyTokens);
        }
        finally {
            this.progressTracker.endSubTask();
        }
    }

    /**
     * Runs the producer side of the export and waits for the writer task to finish.
     */
    private long export(int relationshipToken, int[] propertyTokens) {
        BlockingQueue<Buffer> writeQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
        BlockingQueue<Buffer> bufferPool = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
        for (int i = 0; i < QUEUE_CAPACITY; ++i) {
            bufferPool.add(new Buffer(this.batchSize));
        }
        Writer writer = new Writer(this.tx, this.progressTracker, this.toOriginalId, writeQueue, bufferPool, relationshipToken, propertyTokens, this.terminationFlag);
        Future<?> consumer = Pools.DEFAULT.submit(writer);
        // Holder for the buffer currently being filled; a holder is needed because the lambda swaps it.
        AtomicReference<Buffer> currentBuffer = new AtomicReference<>(bufferPool.poll());
        boolean finished = false;
        try {
            this.relationships.forEach(relationship -> {
                Buffer buffer = currentBuffer.get();
                buffer.add(relationship);
                if (buffer.isFull()) {
                    try {
                        handToWriter(writeQueue, buffer, consumer);
                        currentBuffer.set(borrowBuffer(bufferPool, consumer));
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }
            });
            // Flush the trailing, partially filled buffer. An empty one is skipped because the
            // writer interprets any empty buffer as the end marker, which is sent explicitly below.
            Buffer lastBuffer = currentBuffer.get();
            if (lastBuffer.size > 0) {
                handToWriter(writeQueue, lastBuffer, consumer);
            }
            // An empty buffer tells the writer that no more data will arrive.
            handToWriter(writeQueue, new Buffer(0), consumer);
            consumer.get();
            finished = true;
            return writer.written;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
        finally {
            if (!finished) {
                // The producer failed, so the end marker will never be sent. Interrupt the writer,
                // otherwise it would stay blocked on the write queue forever. No-op if it already ended.
                consumer.cancel(true);
            }
        }
    }

    /**
     * Enqueues a buffer for the writer. Waits in bounded steps so that a writer that has died
     * is noticed instead of blocking forever on a queue nobody drains any more.
     */
    private static void handToWriter(BlockingQueue<Buffer> writeQueue, Buffer buffer, Future<?> consumer) throws InterruptedException {
        while (!writeQueue.offer(buffer, WRITER_CHECK_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)) {
            ensureWriterRunning(consumer);
        }
    }

    /**
     * Takes a recycled buffer from the pool. Waits in bounded steps so that a writer that has died
     * is noticed instead of blocking forever on a pool nobody refills any more.
     */
    private static Buffer borrowBuffer(BlockingQueue<Buffer> bufferPool, Future<?> consumer) throws InterruptedException {
        Buffer buffer;
        while ((buffer = bufferPool.poll(WRITER_CHECK_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)) == null) {
            ensureWriterRunning(consumer);
        }
        return buffer;
    }

    /**
     * Returns normally while the writer task is still running; otherwise propagates its failure.
     */
    private static void ensureWriterRunning(Future<?> consumer) throws InterruptedException {
        if (!consumer.isDone()) {
            return;
        }
        try {
            consumer.get();
        }
        catch (ExecutionException e) {
            // Same wrapping as for a failure that is detected at the end of the export.
            throw new RuntimeException(e);
        }
        // The writer only ends normally after the end marker, which is the last thing handed over.
        throw new IllegalStateException("Relationship writer stopped before the end of the stream was signalled");
    }

    /**
     * Fixed-capacity batch of relationships that is passed between the producer and the writer.
     * A buffer with {@code size == 0} doubles as the end-of-stream marker on the write queue.
     */
    static class Buffer {
        private final long capacity;
        private final Relationship[] relationships;
        private int size;

        Buffer(int capacity) {
            this.relationships = new Relationship[capacity];
            this.capacity = capacity;
        }

        /** Appends a relationship; the caller must not add to a full buffer. */
        void add(Relationship relationship) {
            this.relationships[this.size] = relationship;
            ++this.size;
        }

        boolean isFull() {
            return (long)this.size == this.capacity;
        }

        /** Marks the buffer as empty so it can be refilled; stale entries are simply overwritten. */
        void reset() {
            this.size = 0;
        }
    }

    /**
     * Consumer task: takes buffers from the write queue, writes each one in its own transaction
     * and returns it to the buffer pool. Stops when it takes an empty buffer.
     */
    static class Writer
    extends StatementApi
    implements Runnable {
        private final TerminationFlag terminationFlag;
        private final ProgressTracker progressTracker;
        private final LongUnaryOperator toOriginalId;
        private final BlockingQueue<Buffer> writeQueue;
        private final BlockingQueue<Buffer> bufferPool;
        private final int relationshipToken;
        private final int[] propertyTokens;
        /** Running total of written relationships; read by the exporter after the task has completed. */
        private long written;

        Writer(TransactionContext tx, ProgressTracker progressTracker, LongUnaryOperator toOriginalId, BlockingQueue<Buffer> writeQueue, BlockingQueue<Buffer> bufferPool, int relationshipToken, int[] propertyTokens, TerminationFlag terminationFlag) {
            super(tx);
            this.progressTracker = progressTracker;
            this.toOriginalId = toOriginalId;
            this.writeQueue = writeQueue;
            this.bufferPool = bufferPool;
            this.relationshipToken = relationshipToken;
            this.propertyTokens = propertyTokens;
            this.terminationFlag = terminationFlag;
        }

        /**
         * Processes buffers until an empty one is taken from the write queue.
         *
         * @throws RuntimeException if the thread is interrupted while waiting on a queue
         *                          (the interrupt flag is restored)
         */
        @Override
        public void run() {
            try {
                while (true) {
                    Buffer buffer = this.writeQueue.take();
                    if (buffer.size == 0) {
                        // End marker.
                        return;
                    }
                    this.written += (long)this.write(buffer, this.relationshipToken, this.propertyTokens);
                    // NOTE(review): this passes the running total, not the batch size - confirm that
                    // this logProgress overload expects a cumulative value.
                    this.progressTracker.logProgress(this.written, "has written %d relationships");
                    buffer.reset();
                    this.bufferPool.put(buffer);
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        /**
         * Creates all relationships of the buffer, including their properties, in one transaction.
         *
         * @return the number of relationships in the buffer
         */
        private int write(Buffer buffer, int relationshipToken, int[] propertyTokens) {
            int bufferSize = buffer.size;
            int tokenCount = propertyTokens.length;
            Relationship[] relationships = buffer.relationships;
            this.acceptInTransaction(stmt -> {
                // Checked once per batch, before anything is written in this transaction.
                this.terminationFlag.assertRunning();
                Write ops = stmt.dataWrite();
                for (int i = 0; i < bufferSize; ++i) {
                    Relationship relationship = relationships[i];
                    long sourceId = this.toOriginalId.applyAsLong(relationship.sourceNode());
                    long targetId = this.toOriginalId.applyAsLong(relationship.targetNode());
                    long relationshipId = ops.relationshipCreate(sourceId, relationshipToken, targetId);
                    Value[] values = relationship.values();
                    for (int j = 0; j < tokenCount; ++j) {
                        ops.relationshipSetProperty(relationshipId, propertyTokens[j], values[j]);
                    }
                }
            });
            return bufferSize;
        }
    }
}

