/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.transfer;

import com.google.common.base.Preconditions;
import io.ray.api.BaseActorHandle;
import io.ray.streaming.runtime.config.StreamingWorkerConfig;
import io.ray.streaming.runtime.config.types.TransferChannelType;
import io.ray.streaming.runtime.transfer.ChannelCreationParametersBuilder;
import io.ray.streaming.runtime.transfer.channel.ChannelId;
import io.ray.streaming.runtime.transfer.channel.ChannelUtils;
import io.ray.streaming.runtime.transfer.channel.OffsetInfo;
import io.ray.streaming.runtime.util.Platform;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataWriter {
    private static final Logger LOG = LoggerFactory.getLogger(DataWriter.class);
    private long nativeWriterPtr;
    private ByteBuffer buffer = ByteBuffer.allocateDirect(0);
    private long bufferAddress;
    private List<String> outputChannels;

    public DataWriter(List<String> outputChannels, List<BaseActorHandle> toActors, Map<String, OffsetInfo> checkpoints, StreamingWorkerConfig workerConfig) {
        this.ensureBuffer(0);
        Preconditions.checkArgument((!outputChannels.isEmpty() ? 1 : 0) != 0);
        Preconditions.checkArgument((outputChannels.size() == toActors.size() ? 1 : 0) != 0);
        this.outputChannels = outputChannels;
        ChannelCreationParametersBuilder initialParameters = new ChannelCreationParametersBuilder().buildOutputQueueParameters(outputChannels, toActors);
        byte[][] outputChannelsBytes = (byte[][])outputChannels.stream().map(ChannelId::idStrToBytes).toArray(x$0 -> new byte[x$0][]);
        long channelSize = workerConfig.transferConfig.channelSize();
        long[] msgIds = new long[outputChannels.size()];
        for (int i = 0; i < outputChannels.size(); ++i) {
            String channelId = outputChannels.get(i);
            msgIds[i] = !checkpoints.containsKey(channelId) ? 0L : checkpoints.get(channelId).getStreamingMsgId();
        }
        TransferChannelType channelType = workerConfig.transferConfig.channelType();
        boolean isMock = false;
        if (TransferChannelType.MEMORY_CHANNEL == channelType) {
            isMock = true;
        }
        this.nativeWriterPtr = DataWriter.createWriterNative(initialParameters, outputChannelsBytes, msgIds, channelSize, ChannelUtils.toNativeConf(workerConfig), isMock);
        LOG.info("Create DataWriter succeed for worker: {}.", (Object)workerConfig.workerInternalConfig.workerName());
    }

    private static native long createWriterNative(ChannelCreationParametersBuilder var0, byte[][] var1, long[] var2, long var3, byte[] var5, boolean var6);

    public void write(ChannelId id, ByteBuffer item) {
        int size = item.remaining();
        this.ensureBuffer(size);
        this.buffer.clear();
        this.buffer.put(item);
        this.writeMessageNative(this.nativeWriterPtr, id.getNativeIdPtr(), this.bufferAddress, size);
    }

    public void write(Set<ChannelId> ids, ByteBuffer item) {
        int size = item.remaining();
        this.ensureBuffer(size);
        for (ChannelId id : ids) {
            this.buffer.clear();
            this.buffer.put(item.duplicate());
            this.writeMessageNative(this.nativeWriterPtr, id.getNativeIdPtr(), this.bufferAddress, size);
        }
    }

    private void ensureBuffer(int size) {
        if (this.buffer.capacity() < size) {
            this.buffer = ByteBuffer.allocateDirect(size);
            this.buffer.order(ByteOrder.nativeOrder());
            this.bufferAddress = Platform.getAddress(this.buffer);
        }
    }

    public Map<String, OffsetInfo> getOutputCheckpoints() {
        long[] msgId = this.getOutputMsgIdNative(this.nativeWriterPtr);
        HashMap<String, OffsetInfo> res = new HashMap<String, OffsetInfo>(this.outputChannels.size());
        for (int i = 0; i < this.outputChannels.size(); ++i) {
            res.put(this.outputChannels.get(i), new OffsetInfo(msgId[i]));
        }
        LOG.info("got output points, {}.", res);
        return res;
    }

    public void broadcastBarrier(long checkpointId, ByteBuffer attach) {
        LOG.info("Broadcast barrier, cpId={}.", (Object)checkpointId);
        Preconditions.checkArgument((attach.order() == ByteOrder.nativeOrder() ? 1 : 0) != 0);
        this.broadcastBarrierNative(this.nativeWriterPtr, checkpointId, attach.array());
    }

    public void clearCheckpoint(long checkpointId) {
        LOG.info("Producer clear checkpoint, checkpointId={}.", (Object)checkpointId);
        this.clearCheckpointNative(this.nativeWriterPtr, checkpointId);
    }

    public void stop() {
        this.stopWriterNative(this.nativeWriterPtr);
    }

    public void close() {
        if (this.nativeWriterPtr == 0L) {
            return;
        }
        LOG.info("Closing data writer.");
        this.closeWriterNative(this.nativeWriterPtr);
        this.nativeWriterPtr = 0L;
        LOG.info("Finish closing data writer.");
    }

    private native long writeMessageNative(long var1, long var3, long var5, int var7);

    private native void stopWriterNative(long var1);

    private native void closeWriterNative(long var1);

    private native long[] getOutputMsgIdNative(long var1);

    private native void broadcastBarrierNative(long var1, long var3, byte[] var5);

    private native void clearCheckpointNative(long var1, long var3);
}

