/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.core.collector;

import io.ray.api.BaseActorHandle;
import io.ray.api.PyActorHandle;
import io.ray.streaming.api.Language;
import io.ray.streaming.api.collector.Collector;
import io.ray.streaming.api.partition.Partition;
import io.ray.streaming.message.Record;
import io.ray.streaming.runtime.serialization.CrossLangSerializer;
import io.ray.streaming.runtime.serialization.JavaSerializer;
import io.ray.streaming.runtime.serialization.Serializer;
import io.ray.streaming.runtime.transfer.DataWriter;
import io.ray.streaming.runtime.transfer.channel.ChannelId;
import java.nio.ByteBuffer;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Collector} that forwards each collected {@link Record} to one or more output channels
 * through a {@link DataWriter}.
 *
 * <p>The target channels for a record are chosen by the configured {@link Partition}. The record
 * is serialized with the Java serializer when the target at that index is {@link Language#JAVA},
 * and with the cross-language serializer otherwise. Every message written is the serialized
 * payload prefixed with a single type-tag byte identifying which serializer produced it.
 *
 * <p>The language table is indexed with the same partition index as the channel table, so this
 * class relies on {@code outputChannelIds} and {@code targetActors} iterating in matching order.
 */
public class OutputCollector
implements Collector<Record> {
    private static final Logger LOGGER = LoggerFactory.getLogger(OutputCollector.class);

    /**
     * Type tag written before a payload produced by the Java serializer.
     * NOTE(review): presumably mirrors a constant owned by the serialization layer; confirm and
     * reference that constant directly if one exists.
     */
    private static final byte JAVA_TYPE_ID = 1;

    /**
     * Type tag written before a payload produced by the cross-language serializer.
     * NOTE(review): presumably mirrors a constant owned by the serialization layer; confirm.
     */
    private static final byte CROSS_LANG_TYPE_ID = 0;

    private final DataWriter writer;
    private final ChannelId[] outputQueues;
    private final Collection<BaseActorHandle> targetActors;
    private final Language[] targetLanguages;
    private final Partition partition;
    private final Serializer javaSerializer = new JavaSerializer();
    private final Serializer crossLangSerializer = new CrossLangSerializer();

    /**
     * Creates a collector writing to the given channels.
     *
     * @param writer writer used to emit the framed messages
     * @param outputChannelIds ids of the output channels; each is converted with
     *     {@link ChannelId#from}
     * @param targetActors downstream actors; an actor that is a {@link PyActorHandle} is treated
     *     as a Python target, any other actor as a Java target
     * @param partition strategy that selects the output channel indexes for each record
     */
    public OutputCollector(DataWriter writer, Collection<String> outputChannelIds, Collection<BaseActorHandle> targetActors, Partition partition) {
        this.writer = writer;
        this.outputQueues = outputChannelIds.stream().map(ChannelId::from).toArray(ChannelId[]::new);
        this.targetActors = targetActors;
        this.targetLanguages = targetActors.stream().map(OutputCollector::languageOf).toArray(Language[]::new);
        this.partition = partition;
        LOGGER.debug("OutputCollector constructed, outputChannelIds:{}, partition:{}.", outputChannelIds, this.partition);
    }

    /**
     * Sends {@code record} to every channel index returned by the partition strategy.
     *
     * <p>The record is serialized at most once per serializer per call; each channel receives its
     * own {@link ByteBuffer#duplicate() duplicate} of the shared framed buffer.
     *
     * @param record the record to emit
     */
    public void collect(Record record) {
        int[] channelIndexes = this.partition.partition(record, this.outputQueues.length);
        // Built lazily so a serializer only runs if at least one target needs its output.
        ByteBuffer javaBuffer = null;
        ByteBuffer crossLangBuffer = null;
        for (int channelIndex : channelIndexes) {
            ByteBuffer framed;
            if (this.targetLanguages[channelIndex] == Language.JAVA) {
                if (javaBuffer == null) {
                    javaBuffer = frame(JAVA_TYPE_ID, this.javaSerializer.serialize(record));
                }
                framed = javaBuffer;
            } else {
                if (crossLangBuffer == null) {
                    crossLangBuffer = frame(CROSS_LANG_TYPE_ID, this.crossLangSerializer.serialize(record));
                }
                framed = crossLangBuffer;
            }
            // duplicate() shares the bytes but gives each write an independent position/limit,
            // so one write consuming its buffer cannot affect the next.
            this.writer.write(this.outputQueues[channelIndex], framed.duplicate());
        }
    }

    /** Maps an actor handle to the language used to pick a serializer for it. */
    private static Language languageOf(BaseActorHandle actor) {
        return actor instanceof PyActorHandle ? Language.PYTHON : Language.JAVA;
    }

    /** Returns a read-ready buffer holding {@code typeId} followed by {@code payload}. */
    private static ByteBuffer frame(byte typeId, byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(1 + payload.length);
        buffer.put(typeId);
        buffer.put(payload);
        buffer.flip();
        return buffer;
    }
}

