/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.siddhi.core.stream.output.sink.distributed;

import java.util.ArrayList;
import java.util.List;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.core.stream.output.sink.distributed.DistributionStrategy;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.DynamicOptions;
import org.wso2.siddhi.core.util.transport.Option;
import org.wso2.siddhi.core.util.transport.OptionHolder;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import org.wso2.siddhi.query.api.exception.AttributeNotExistException;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;

@Extension(
        name="partitioned",
        namespace="distributionStrategy",
        description="Publishing strategy to allow publish messages to multiple destination by partitioning.",
        examples={@Example(syntax="@sink(type='tcp', @map(type='text'),\n@distribution(strategy='partitioned', partitionKey='symbol',\n@destination(topic = 'topic1'),\n@destination(topic = 'topic2')))\ndefine stream BarStream (symbol string, price float, volume long);", description="In this example BarStream sink will act as partitioned manner to 'topic1' and 'topic2' destinations according to partitionKey='symbol'.")})
/**
 * Distribution strategy that selects a single destination per event by hashing the
 * value of a configured partition key attribute.
 *
 * <p>The destination index is {@code |hash(partitionKeyValue) % totalDestinationCount|}.
 * The index is only returned when it is contained in the inherited {@code destinationIds}
 * collection; otherwise the inherited {@code EMPTY_RETURN_VALUE} is returned.
 */
public class PartitionedDistributionStrategy
extends DistributionStrategy {
    /** Number of destinations passed to {@link #init}; used as the hash modulus. */
    private int totalDestinationCount = 0;
    /** Option built from the position of the partition key attribute in the stream definition. */
    private Option partitionOption;
    /**
     * Reused single-element result list. It is cleared and refilled on every successful
     * lookup, so callers must not hold on to it across calls.
     * NOTE(review): sharing one list instance is not safe if
     * {@link #getDestinationsToPublish} can be invoked concurrently - confirm the
     * caller's threading model.
     */
    private final List<Integer> returnValue = new ArrayList<Integer>();

    /**
     * Reads the {@code partitionKey} option, resolves the position of that attribute in
     * the stream definition and records the number of destinations.
     *
     * @param streamDefinition         definition used to look up the partition key attribute position
     * @param transportOptionHolder    not used by this strategy
     * @param distributionOptionHolder holder from which the {@code partitionKey} option is read
     * @param destinationOptionHolders one holder per destination; only its size is used
     * @param configReader             not used by this strategy
     * @throws SiddhiAppValidationException if {@code partitionKey} is missing or empty, if the
     *         stream definition has no attribute with that name, or if no destination is given
     */
    @Override
    public void init(StreamDefinition streamDefinition, OptionHolder transportOptionHolder, OptionHolder distributionOptionHolder, List<OptionHolder> destinationOptionHolders, ConfigReader configReader) {
        this.totalDestinationCount = destinationOptionHolders.size();
        String partitionKey = distributionOptionHolder.validateAndGetStaticValue("partitionKey");
        if (partitionKey == null || partitionKey.isEmpty()) {
            throw new SiddhiAppValidationException("PartitionKey is required for partitioned distribution strategy.");
        }
        try {
            int partitionKeyFieldPosition = streamDefinition.getAttributePosition(partitionKey);
            this.partitionOption = new Option(partitionKeyFieldPosition);
        }
        catch (AttributeNotExistException e) {
            throw new SiddhiAppValidationException("Could not find partition key attribute", e);
        }
        // Fail fast: with zero destinations the modulo in getDestinationsToPublish would
        // throw ArithmeticException (/ by zero) for every event. Checked last so that the
        // pre-existing validation errors above keep their precedence.
        if (this.totalDestinationCount == 0) {
            throw new SiddhiAppValidationException("At least one destination is required for partitioned distribution strategy.");
        }
    }

    /**
     * Picks the destination for an event from the hash of its partition key value.
     *
     * @param payload          not used by this strategy
     * @param transportOptions dynamic options from which the partition key value is resolved
     * @return the reused single-element list holding the selected destination id when that id
     *         is contained in {@code destinationIds}; otherwise {@code EMPTY_RETURN_VALUE}
     */
    @Override
    public List<Integer> getDestinationsToPublish(Object payload, DynamicOptions transportOptions) {
        String partitionKeyValue = this.partitionOption.getValue(transportOptions);
        // The remainder's magnitude is always smaller than the divisor, so Math.abs cannot
        // overflow here and the result lies in [0, totalDestinationCount).
        int destinationId = Math.abs(partitionKeyValue.hashCode() % this.totalDestinationCount);
        if (this.destinationIds.contains(destinationId)) {
            this.returnValue.clear();
            this.returnValue.add(destinationId);
            return this.returnValue;
        }
        return EMPTY_RETURN_VALUE;
    }
}

