/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.transfer;

import com.google.common.base.Preconditions;
import io.ray.api.BaseActorHandle;
import io.ray.streaming.runtime.config.StreamingWorkerConfig;
import io.ray.streaming.runtime.config.types.TransferChannelType;
import io.ray.streaming.runtime.transfer.ChannelCreationParametersBuilder;
import io.ray.streaming.runtime.transfer.ChannelId;
import io.ray.streaming.runtime.transfer.ChannelUtils;
import io.ray.streaming.runtime.transfer.DataMessage;
import io.ray.streaming.runtime.util.Platform;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataReader {
    private static final Logger LOG = LoggerFactory.getLogger(DataReader.class);
    private long nativeReaderPtr;
    private Queue<DataMessage> buf = new LinkedList<DataMessage>();
    private final ByteBuffer getBundleParams = ByteBuffer.allocateDirect(24);
    private final ByteBuffer bundleData = Platform.wrapDirectBuffer(0L, 0);
    private final ByteBuffer bundleMeta = ByteBuffer.allocateDirect(52);

    public DataReader(List<String> inputChannels, List<BaseActorHandle> fromActors, StreamingWorkerConfig workerConfig) {
        this.getBundleParams.order(ByteOrder.nativeOrder());
        this.bundleData.order(ByteOrder.nativeOrder());
        this.bundleMeta.order(ByteOrder.nativeOrder());
        Preconditions.checkArgument((inputChannels.size() > 0 ? 1 : 0) != 0);
        Preconditions.checkArgument((inputChannels.size() == fromActors.size() ? 1 : 0) != 0);
        ChannelCreationParametersBuilder initialParameters = new ChannelCreationParametersBuilder().buildInputQueueParameters(inputChannels, fromActors);
        byte[][] inputChannelsBytes = (byte[][])inputChannels.stream().map(ChannelId::idStrToBytes).toArray(x$0 -> new byte[x$0][]);
        long[] seqIds = new long[inputChannels.size()];
        long[] msgIds = new long[inputChannels.size()];
        for (int i = 0; i < inputChannels.size(); ++i) {
            seqIds[i] = 0L;
            msgIds[i] = 0L;
        }
        long timerInterval = workerConfig.transferConfig.readerTimerIntervalMs();
        TransferChannelType channelType = workerConfig.transferConfig.channelType();
        boolean isMock = false;
        if (TransferChannelType.MEMORY_CHANNEL == channelType) {
            isMock = true;
        }
        boolean isRecreate = workerConfig.transferConfig.readerIsRecreate();
        this.nativeReaderPtr = DataReader.createDataReaderNative(initialParameters, inputChannelsBytes, seqIds, msgIds, timerInterval, isRecreate, ChannelUtils.toNativeConf(workerConfig), isMock);
        LOG.info("Create DataReader succeed for worker: {}.", (Object)workerConfig.workerInternalConfig.workerName());
    }

    public DataMessage read(long timeoutMillis) {
        if (this.buf.isEmpty()) {
            this.getBundle(timeoutMillis);
            if (this.bundleData.position() < this.bundleData.limit()) {
                BundleMeta bundleMeta = new BundleMeta(this.bundleMeta);
                if (bundleMeta.getBundleType() == DataBundleType.BARRIER) {
                    throw new UnsupportedOperationException("Unsupported bundle type " + (Object)((Object)bundleMeta.getBundleType()));
                }
                if (bundleMeta.getBundleType() == DataBundleType.BUNDLE) {
                    String channelID = bundleMeta.getChannelID();
                    long timestamp = bundleMeta.getBundleTs();
                    for (int i = 0; i < bundleMeta.getMessageListSize(); ++i) {
                        this.buf.offer(this.getDataMessage(this.bundleData, channelID, timestamp));
                    }
                } else if (bundleMeta.getBundleType() == DataBundleType.EMPTY) {
                    long messageId = bundleMeta.getLastMessageId();
                    this.buf.offer(new DataMessage(null, bundleMeta.getBundleTs(), messageId, bundleMeta.getChannelID()));
                }
            }
        }
        if (this.buf.isEmpty()) {
            return null;
        }
        return this.buf.poll();
    }

    private DataMessage getDataMessage(ByteBuffer bundleData, String channelID, long timestamp) {
        int dataSize = bundleData.getInt();
        long msgId = bundleData.getLong();
        bundleData.getInt();
        int position = bundleData.position();
        int limit = bundleData.limit();
        bundleData.limit(position + dataSize);
        ByteBuffer data = bundleData.slice();
        bundleData.limit(limit);
        bundleData.position(position + dataSize);
        return new DataMessage(data, timestamp, msgId, channelID);
    }

    private void getBundle(long timeoutMillis) {
        this.getBundleNative(this.nativeReaderPtr, timeoutMillis, Platform.getAddress(this.getBundleParams), Platform.getAddress(this.bundleMeta));
        this.bundleMeta.rewind();
        long bundleAddress = this.getBundleParams.getLong(0);
        int bundleSize = this.getBundleParams.getInt(8);
        Platform.wrapDirectBuffer(this.bundleData, bundleAddress, bundleSize);
    }

    public void stop() {
        this.stopReaderNative(this.nativeReaderPtr);
    }

    public void close() {
        if (this.nativeReaderPtr == 0L) {
            return;
        }
        LOG.info("Closing DataReader.");
        this.closeReaderNative(this.nativeReaderPtr);
        this.nativeReaderPtr = 0L;
        LOG.info("Finish closing DataReader.");
    }

    private static native long createDataReaderNative(ChannelCreationParametersBuilder var0, byte[][] var1, long[] var2, long[] var3, long var4, boolean var6, byte[] var7, boolean var8);

    private native void getBundleNative(long var1, long var3, long var5, long var7);

    private native void stopReaderNative(long var1);

    private native void closeReaderNative(long var1);

    static class BundleMeta {
        static final int LENGTH = 52;
        private int magicNum;
        private long bundleTs;
        private long lastMessageId;
        private int messageListSize;
        private DataBundleType bundleType;
        private String channelID;
        private int rawBundleSize;

        BundleMeta(ByteBuffer buffer) {
            this.magicNum = buffer.getInt();
            this.bundleTs = buffer.getLong();
            this.lastMessageId = buffer.getLong();
            this.messageListSize = buffer.getInt();
            int typeInt = buffer.getInt();
            this.bundleType = DataBundleType.BUNDLE.code == typeInt ? DataBundleType.BUNDLE : (DataBundleType.BARRIER.code == typeInt ? DataBundleType.BARRIER : DataBundleType.EMPTY);
            this.rawBundleSize = buffer.getInt();
            this.channelID = this.getQidString(buffer);
        }

        private String getQidString(ByteBuffer buffer) {
            byte[] bytes = new byte[20];
            buffer.get(bytes);
            return ChannelId.idBytesToStr(bytes);
        }

        public int getMagicNum() {
            return this.magicNum;
        }

        public long getBundleTs() {
            return this.bundleTs;
        }

        public long getLastMessageId() {
            return this.lastMessageId;
        }

        public int getMessageListSize() {
            return this.messageListSize;
        }

        public DataBundleType getBundleType() {
            return this.bundleType;
        }

        public String getChannelID() {
            return this.channelID;
        }

        public int getRawBundleSize() {
            return this.rawBundleSize;
        }
    }

    static enum DataBundleType {
        EMPTY(1),
        BARRIER(2),
        BUNDLE(3);

        int code;

        private DataBundleType(int code) {
            this.code = code;
        }
    }
}

