package io.activej.dataflow.collector;

import io.activej.dataflow.DataflowClient;
import io.activej.dataflow.dataset.Dataset;
import io.activej.dataflow.dataset.SortedDataset;
import io.activej.dataflow.graph.DataflowContext;
import io.activej.dataflow.graph.DataflowGraph;
import io.activej.dataflow.graph.Partition;
import io.activej.dataflow.graph.StreamId;
import io.activej.dataflow.node.NodeUpload;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.processor.StreamReducer;
import io.activej.datastream.processor.StreamReducers;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

/* loaded from: input_file:io/activej/dataflow/collector/MergeCollector.class */
public class MergeCollector<K, T> {
    private final Dataset<T> input;
    private final DataflowClient client;
    private final Function<T, K> keyFunction;
    private final Comparator<K> keyComparator;
    private final boolean deduplicate;

    public MergeCollector(SortedDataset<K, T> sortedDataset, DataflowClient dataflowClient, boolean z) {
        this(sortedDataset, dataflowClient, sortedDataset.keyFunction(), sortedDataset.keyComparator(), z);
    }

    public MergeCollector(Dataset<T> dataset, DataflowClient dataflowClient, Function<T, K> function, Comparator<K> comparator, boolean z) {
        this.input = dataset;
        this.client = dataflowClient;
        this.keyFunction = function;
        this.keyComparator = comparator;
        this.deduplicate = z;
    }

    public StreamSupplier<T> compile(DataflowGraph dataflowGraph) {
        DataflowContext of = DataflowContext.of(dataflowGraph);
        List<StreamId> channels = this.input.channels(of);
        StreamReducer create = StreamReducer.create(this.keyComparator);
        int generateNodeIndex = of.generateNodeIndex();
        for (StreamId streamId : channels) {
            NodeUpload nodeUpload = new NodeUpload(generateNodeIndex, this.input.valueType(), streamId);
            Partition partition = dataflowGraph.getPartition(streamId);
            dataflowGraph.addNode(partition, nodeUpload);
            this.client.download(partition.getAddress(), streamId, this.input.valueType()).streamTo(create.newInput(this.keyFunction, this.deduplicate ? StreamReducers.deduplicateReducer() : StreamReducers.mergeReducer()));
        }
        return create.getOutput();
    }
}
