/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.transfer;

import com.google.common.base.Preconditions;
import io.ray.api.BaseActorHandle;
import io.ray.streaming.runtime.config.StreamingWorkerConfig;
import io.ray.streaming.runtime.config.types.TransferChannelType;
import io.ray.streaming.runtime.transfer.ChannelCreationParametersBuilder;
import io.ray.streaming.runtime.transfer.channel.ChannelId;
import io.ray.streaming.runtime.transfer.channel.ChannelRecoverInfo;
import io.ray.streaming.runtime.transfer.channel.ChannelUtils;
import io.ray.streaming.runtime.transfer.channel.OffsetInfo;
import io.ray.streaming.runtime.transfer.message.BarrierMessage;
import io.ray.streaming.runtime.transfer.message.ChannelMessage;
import io.ray.streaming.runtime.transfer.message.DataMessage;
import io.ray.streaming.runtime.util.Platform;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads {@link ChannelMessage}s from a set of input channels through a native (JNI) reader.
 *
 * <p>Each call to the native side yields one bundle. The bundle payload is exposed through
 * {@link #bundleData}, its header through {@link #bundleMeta}; the messages decoded from a bundle
 * are queued in {@link #buf} and handed out one at a time by {@link #read(long)}.
 *
 * <p>A {@code nativeReaderPtr} of {@code 0} means the reader has been closed.
 */
public class DataReader {
    private static final Logger LOG = LoggerFactory.getLogger(DataReader.class);

    /** Number of bytes occupied by a serialized channel (queue) id. */
    private static final int CHANNEL_ID_LENGTH = 28;
    /** Capacity in bytes of the buffer whose address is handed to {@code getBundleNative}. */
    private static final int GET_BUNDLE_PARAMS_LENGTH = 24;
    /** Offset inside {@link #getBundleParams} of the 8-byte bundle address. */
    private static final int BUNDLE_ADDRESS_OFFSET = 0;
    /** Offset inside {@link #getBundleParams} of the 4-byte bundle size. */
    private static final int BUNDLE_SIZE_OFFSET = 8;

    /** Handle of the native reader; reset to 0 by {@link #close()}. */
    private long nativeReaderPtr;
    /** Direct buffer from which the bundle address and size are read after each native fetch. */
    private final ByteBuffer getBundleParams = ByteBuffer.allocateDirect(GET_BUNDLE_PARAMS_LENGTH);
    /** View over the current bundle payload; re-pointed on every fetch. */
    private final ByteBuffer bundleData = Platform.wrapDirectBuffer(0L, 0);
    /** Direct buffer holding the header of the current bundle. */
    private final ByteBuffer bundleMeta = ByteBuffer.allocateDirect(BundleMeta.LENGTH);
    /** Creation status reported for every input channel, keyed by channel id. */
    private final Map<String, ChannelRecoverInfo.ChannelCreationStatus> queueCreationStatusMap = new HashMap<String, ChannelRecoverInfo.ChannelCreationStatus>();
    /** Messages decoded from the last bundle that have not been returned yet. */
    private final Queue<ChannelMessage> buf = new LinkedList<ChannelMessage>();

    /**
     * Creates the native reader for the given input channels.
     *
     * @param inputChannels ids of the channels to read from; must not be empty
     * @param fromActors upstream actors, one per entry of {@code inputChannels}
     * @param checkpoints per-channel offsets to resume from; channels without an entry start at
     *     message id 0
     * @param workerConfig worker configuration providing the transfer settings
     * @throws IllegalArgumentException if {@code inputChannels} is empty or its size differs from
     *     that of {@code fromActors}
     */
    public DataReader(List<String> inputChannels, List<BaseActorHandle> fromActors, Map<String, OffsetInfo> checkpoints, StreamingWorkerConfig workerConfig) {
        this.getBundleParams.order(ByteOrder.nativeOrder());
        this.bundleData.order(ByteOrder.nativeOrder());
        this.bundleMeta.order(ByteOrder.nativeOrder());
        Preconditions.checkArgument(!inputChannels.isEmpty(), "inputChannels must not be empty.");
        Preconditions.checkArgument(inputChannels.size() == fromActors.size(), "inputChannels and fromActors must have the same size.");

        int channelCount = inputChannels.size();
        ChannelCreationParametersBuilder initialParameters = new ChannelCreationParametersBuilder().buildInputQueueParameters(inputChannels, fromActors);
        byte[][] inputChannelsBytes = inputChannels.stream().map(ChannelId::idStrToBytes).toArray(byte[][]::new);

        // A channel without a (non-null) checkpoint starts from message id 0.
        long[] msgIds = new long[channelCount];
        for (int i = 0; i < channelCount; ++i) {
            OffsetInfo checkpoint = checkpoints.get(inputChannels.get(i));
            msgIds[i] = checkpoint == null ? 0L : checkpoint.getStreamingMsgId();
        }

        long timerInterval = workerConfig.transferConfig.readerTimerIntervalMs();
        boolean isMock = TransferChannelType.MEMORY_CHANNEL == workerConfig.transferConfig.channelType();

        // Passed to the native call and read back afterwards, one status per input channel.
        List<Integer> creationStatus = new ArrayList<Integer>();
        this.nativeReaderPtr = DataReader.createDataReaderNative(initialParameters, inputChannelsBytes, msgIds, timerInterval, creationStatus, ChannelUtils.toNativeConf(workerConfig), isMock);
        for (int i = 0; i < channelCount; ++i) {
            this.queueCreationStatusMap.put(inputChannels.get(i), ChannelRecoverInfo.ChannelCreationStatus.fromInt(creationStatus.get(i)));
        }
        LOG.info("Create DataReader succeed for worker: {}, creation status={}.", workerConfig.workerInternalConfig.workerName(), this.queueCreationStatusMap);
    }

    private static native long createDataReaderNative(ChannelCreationParametersBuilder initialParameters, byte[][] inputChannels, long[] msgIds, long timerInterval, List<Integer> creationStatus, byte[] nativeConf, boolean isMock);

    /**
     * Returns the next message, fetching and decoding a new bundle when no decoded message is
     * pending.
     *
     * @param timeoutMillis timeout forwarded to the native bundle fetch
     * @return the next message, or {@code null} if the fetched bundle yielded no message
     * @throws IllegalStateException if a new bundle is needed but the reader is already closed
     */
    public ChannelMessage read(long timeoutMillis) {
        if (this.buf.isEmpty()) {
            this.fillBuffer(timeoutMillis);
        }
        // poll() yields null when nothing was decoded.
        return this.buf.poll();
    }

    /** Fetches one bundle and queues every message it contains. */
    private void fillBuffer(long timeoutMillis) {
        this.getBundle(timeoutMillis);
        if (!this.bundleData.hasRemaining()) {
            // No payload: the header is not parsed at all.
            return;
        }
        BundleMeta meta = new BundleMeta(this.bundleMeta);
        String channelID = meta.getChannelID();
        long timestamp = meta.getBundleTs();
        switch (meta.getBundleType()) {
            case BARRIER:
                this.buf.offer(this.getBarrier(this.bundleData, channelID, timestamp));
                break;
            case BUNDLE:
                int messageCount = meta.getMessageListSize();
                for (int i = 0; i < messageCount; ++i) {
                    this.buf.offer(this.getDataMessage(this.bundleData, channelID, timestamp));
                }
                break;
            default:
                // EMPTY bundles produce no message.
                break;
        }
    }

    /**
     * @return a {@link ChannelRecoverInfo} built from the creation status recorded for each input
     *     channel when this reader was constructed
     */
    public ChannelRecoverInfo getQueueRecoverInfo() {
        return new ChannelRecoverInfo(this.queueCreationStatusMap);
    }

    /** Consumes {@link #CHANNEL_ID_LENGTH} bytes from {@code buffer} and converts them to an id string. */
    private static String getQueueIdString(ByteBuffer buffer) {
        byte[] bytes = new byte[CHANNEL_ID_LENGTH];
        buffer.get(bytes);
        return ChannelId.idBytesToStr(bytes);
    }

    /**
     * Decodes the barrier at the current position of {@code bundleData}, combining it with the
     * offsets info obtained from the native reader.
     */
    private BarrierMessage getBarrier(ByteBuffer bundleData, String channelID, long timestamp) {
        // The offsets are fetched before the message itself is consumed from the bundle.
        ByteBuffer offsetsInfoBytes = ByteBuffer.wrap(this.getOffsetsInfoNative(this.nativeReaderPtr));
        offsetsInfoBytes.order(ByteOrder.nativeOrder());
        BarrierOffsetInfo offsetInfo = new BarrierOffsetInfo(offsetsInfoBytes);
        DataMessage message = this.getDataMessage(bundleData, channelID, timestamp);
        BarrierItem barrierItem = new BarrierItem(message, offsetInfo);
        return new BarrierMessage(message.getMsgId(), message.getTimestamp(), message.getChannelId(), barrierItem.getData(), barrierItem.getGlobalBarrierId(), barrierItem.getBarrierOffsetInfo().getQueueOffsetInfo());
    }

    /**
     * Decodes one message at the current position of {@code bundleData} and advances the buffer
     * past it. The message body is a slice sharing memory with {@code bundleData}.
     */
    private DataMessage getDataMessage(ByteBuffer bundleData, String channelID, long timestamp) {
        int dataSize = bundleData.getInt();
        long msgId = bundleData.getLong();
        // 4-byte field whose value is not used here (presumably the message type - TODO confirm
        // against the native layout); it is read only to advance the position.
        bundleData.getInt();

        int bodyStart = bundleData.position();
        int bodyEnd = bodyStart + dataSize;
        int originalLimit = bundleData.limit();
        // Temporarily narrow the limit so that slice() covers exactly the message body.
        bundleData.limit(bodyEnd);
        ByteBuffer data = bundleData.slice();
        bundleData.limit(originalLimit);
        bundleData.position(bodyEnd);
        return new DataMessage(data, timestamp, msgId, channelID);
    }

    /**
     * Asks the native reader for the next bundle and re-points {@link #bundleData} at it.
     *
     * @throws IllegalStateException if the reader has been closed
     */
    private void getBundle(long timeoutMillis) {
        // close() zeroes the handle; it must never be passed on to native code afterwards.
        Preconditions.checkState(this.nativeReaderPtr != 0L, "DataReader has been closed.");
        this.getBundleNative(this.nativeReaderPtr, timeoutMillis, Platform.getAddress(this.getBundleParams), Platform.getAddress(this.bundleMeta));
        this.bundleMeta.rewind();
        long bundleAddress = this.getBundleParams.getLong(BUNDLE_ADDRESS_OFFSET);
        int bundleSize = this.getBundleParams.getInt(BUNDLE_SIZE_OFFSET);
        Platform.wrapDirectBuffer(this.bundleData, bundleAddress, bundleSize);
    }

    /** Stops the native reader. Does nothing if the reader has already been closed. */
    public void stop() {
        if (this.nativeReaderPtr == 0L) {
            return;
        }
        this.stopReaderNative(this.nativeReaderPtr);
    }

    /** Closes the native reader and clears its handle. Calling it again is a no-op. */
    public void close() {
        if (this.nativeReaderPtr == 0L) {
            return;
        }
        LOG.info("Closing DataReader.");
        this.closeReaderNative(this.nativeReaderPtr);
        this.nativeReaderPtr = 0L;
        LOG.info("Finish closing DataReader.");
    }

    private native void getBundleNative(long nativeReaderPtr, long timeoutMillis, long paramsAddress, long metaAddress);

    private native byte[] getOffsetsInfoNative(long nativeReaderPtr);

    private native void stopReaderNative(long nativeReaderPtr);

    private native void closeReaderNative(long nativeReaderPtr);

    /** A barrier decoded from the body of a {@link DataMessage}. */
    class BarrierItem {
        BarrierOffsetInfo barrierOffsetInfo;
        private final long msgId;
        private final BarrierType barrierType;
        private final long globalBarrierId;
        private final ByteBuffer data;

        /**
         * Parses the barrier header from {@code message.body()} and marks the body as fully
         * consumed.
         *
         * @param message message whose body holds the barrier
         * @param barrierOffsetInfo per-queue offsets associated with the barrier
         */
        public BarrierItem(DataMessage message, BarrierOffsetInfo barrierOffsetInfo) {
            this.barrierOffsetInfo = barrierOffsetInfo;
            this.msgId = message.getMsgId();
            ByteBuffer buffer = message.body();
            buffer.order(ByteOrder.nativeOrder());
            // The serialized barrier type is skipped: GLOBAL_BARRIER is the only type defined.
            buffer.getInt();
            this.barrierType = BarrierType.GLOBAL_BARRIER;
            this.globalBarrierId = buffer.getLong();
            // Everything after the header is the barrier payload.
            this.data = buffer.slice();
            this.data.order(ByteOrder.nativeOrder());
            buffer.position(buffer.limit());
        }

        public long getBarrierMsgId() {
            return this.msgId;
        }

        public BarrierType getBarrierType() {
            return this.barrierType;
        }

        public long getGlobalBarrierId() {
            return this.globalBarrierId;
        }

        public ByteBuffer getData() {
            return this.data;
        }

        public BarrierOffsetInfo getBarrierOffsetInfo() {
            return this.barrierOffsetInfo;
        }
    }

    /** Per-queue offsets decoded from the bytes returned by {@code getOffsetsInfoNative}. */
    class BarrierOffsetInfo {
        private final int queueSize;
        private final Map<String, OffsetInfo> queueOffsetInfo;

        /**
         * Reads an int entry count followed by that many (queue id, long message id) pairs.
         *
         * @param buffer buffer positioned at the entry count
         */
        public BarrierOffsetInfo(ByteBuffer buffer) {
            this.queueSize = buffer.getInt();
            this.queueOffsetInfo = new HashMap<String, OffsetInfo>(this.queueSize);
            for (int i = 0; i < this.queueSize; ++i) {
                String qid = DataReader.getQueueIdString(buffer);
                long streamingMsgId = buffer.getLong();
                this.queueOffsetInfo.put(qid, new OffsetInfo(streamingMsgId));
            }
        }

        public int getQueueSize() {
            return this.queueSize;
        }

        public Map<String, OffsetInfo> getQueueOffsetInfo() {
            return this.queueOffsetInfo;
        }
    }

    /** Header of a bundle, decoded field by field from a buffer. */
    class BundleMeta {
        /** Total header size: 4 + 8 + 8 + 4 + 4 + 4 bytes of fixed fields plus a 28-byte channel id. */
        static final int LENGTH = 60;
        private final int magicNum;
        private final long bundleTs;
        private final long lastMessageId;
        private final int messageListSize;
        private final DataBundleType bundleType;
        private final String channelID;
        private final int rawBundleSize;

        /**
         * Reads the header fields in their serialized order, advancing {@code buffer}.
         *
         * @param buffer buffer positioned at the start of the header
         */
        BundleMeta(ByteBuffer buffer) {
            this.magicNum = buffer.getInt();
            this.bundleTs = buffer.getLong();
            this.lastMessageId = buffer.getLong();
            this.messageListSize = buffer.getInt();
            this.bundleType = DataBundleType.fromCode(buffer.getInt());
            this.rawBundleSize = buffer.getInt();
            this.channelID = DataReader.getQueueIdString(buffer);
        }

        public int getMagicNum() {
            return this.magicNum;
        }

        public long getBundleTs() {
            return this.bundleTs;
        }

        public long getLastMessageId() {
            return this.lastMessageId;
        }

        public int getMessageListSize() {
            return this.messageListSize;
        }

        public DataBundleType getBundleType() {
            return this.bundleType;
        }

        public String getChannelID() {
            return this.channelID;
        }

        public int getRawBundleSize() {
            return this.rawBundleSize;
        }
    }

    /** Kinds of barrier; only the global barrier is defined. */
    public static enum BarrierType {
        GLOBAL_BARRIER(0);

        private final int code;

        private BarrierType(int code) {
            this.code = code;
        }
    }

    /** Kinds of bundle, identified by the integer code found in the bundle header. */
    static enum DataBundleType {
        EMPTY(1),
        BARRIER(2),
        BUNDLE(3);

        final int code;

        private DataBundleType(int code) {
            this.code = code;
        }

        /** Maps a header code to its type; any code other than BUNDLE or BARRIER yields EMPTY. */
        static DataBundleType fromCode(int code) {
            if (code == BUNDLE.code) {
                return BUNDLE;
            }
            if (code == BARRIER.code) {
                return BARRIER;
            }
            return EMPTY;
        }
    }
}

