/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.siddhi.core.util.transport;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.output.sink.Sink;
import org.wso2.siddhi.core.stream.output.sink.distributed.DistributedTransport;
import org.wso2.siddhi.core.util.ExceptionUtil;
import org.wso2.siddhi.core.util.SiddhiClassLoader;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.extension.holder.SinkExecutorExtensionHolder;
import org.wso2.siddhi.core.util.parser.helper.DefinitionParserHelper;
import org.wso2.siddhi.core.util.transport.DynamicOptions;
import org.wso2.siddhi.core.util.transport.OptionHolder;
import org.wso2.siddhi.query.api.annotation.Annotation;
import org.wso2.siddhi.query.api.extension.Extension;

/**
 * {@link DistributedTransport} implementation that keeps one dedicated {@link Sink}
 * instance per destination.
 *
 * <p>Transports are stored in the order of the destination option holders handed to
 * {@link #initTransport}; the position of a transport in that list is used as its
 * destination ID when publishing, when reporting availability to the distribution
 * strategy, and as the key under which its state is snapshotted.
 *
 * <p>Bookkeeping rule followed by this class: a transport's connected flag is set to
 * {@code true} together with {@code strategy.destinationAvailable(id)}, and set back to
 * {@code false} together with {@code strategy.destinationFailed(id)}.
 */
public class MultiClientDistributedSink
extends DistributedTransport {
    private static final Logger log = Logger.getLogger(MultiClientDistributedSink.class);

    /** One sink per destination; the list index is the destination ID. */
    private final List<Sink> transports = new ArrayList<>();

    /**
     * Publishes the payload through the transport registered for the given destination.
     *
     * <p>On a connection failure the transport is flagged as disconnected and the
     * distribution strategy is told the destination failed, so that a later
     * {@link #connect()} attempts to reconnect it. The exception is then rethrown.
     *
     * @param payload          the payload to publish
     * @param transportOptions dynamic options forwarded to the transport
     * @param destinationId    index of the destination transport
     * @throws ConnectionUnavailableException if the underlying transport could not publish
     */
    @Override
    public void publish(Object payload, DynamicOptions transportOptions, Integer destinationId) throws ConnectionUnavailableException {
        Sink transport = this.transports.get(destinationId);
        try {
            transport.publish(payload, transportOptions);
        } catch (ConnectionUnavailableException e) {
            transport.setConnected(false);
            this.strategy.destinationFailed(destinationId);
            log.warn("Failed to publish payload to destination ID " + destinationId);
            throw e;
        }
    }

    /**
     * Creates and initialises one sink per destination option holder.
     *
     * <p>The sink implementation is resolved from the {@code type} option of the sink
     * annotation. Before a sink is initialised, the shared sink options are merged into
     * its destination options.
     *
     * @param sinkOptionHolder         options shared by every destination
     * @param destinationOptionHolders per-destination options, one entry per transport to create
     * @param sinkAnnotation           the sink annotation used to construct the extension
     * @param sinkConfigReader         configuration reader handed to each created sink
     * @param siddhiAppContext         application context handed to each created sink
     */
    @Override
    public void initTransport(OptionHolder sinkOptionHolder, List<OptionHolder> destinationOptionHolders, Annotation sinkAnnotation, ConfigReader sinkConfigReader, SiddhiAppContext siddhiAppContext) {
        String transportType = sinkOptionHolder.validateAndGetStaticValue("type");
        Extension sinkExtension = DefinitionParserHelper.constructExtension(this.streamDefinition, "Sink", transportType, sinkAnnotation, "sink");
        for (OptionHolder destinationOption : destinationOptionHolders) {
            Sink sink = (Sink) SiddhiClassLoader.loadExtensionImplementation(sinkExtension, SinkExecutorExtensionHolder.getInstance(siddhiAppContext));
            destinationOption.merge(sinkOptionHolder);
            sink.initOnlyTransport(this.streamDefinition, destinationOption, sinkConfigReader, siddhiAppContext);
            this.transports.add(sink);
        }
    }

    /**
     * Returns the input event classes supported by the first transport.
     *
     * <p>All transports are created from the same extension in {@link #initTransport},
     * so the first one is used as the representative. At least one transport must have
     * been created; with an empty list this throws {@link IndexOutOfBoundsException}.
     *
     * @return the supported input event classes of the first transport
     */
    @Override
    public Class[] getSupportedInputEventClasses() {
        return this.transports.get(0).getSupportedInputEventClasses();
    }

    /**
     * Connects every transport that is not currently flagged as connected.
     *
     * <p>Each successful connection flags the transport as connected and registers the
     * destination with the distribution strategy. Failures do not stop the loop: every
     * remaining destination is still attempted, and a single aggregated exception is
     * thrown afterwards. The individual failures are attached to it as suppressed
     * exceptions so their stack traces and causes are not lost.
     *
     * @throws ConnectionUnavailableException if at least one destination failed to connect
     */
    @Override
    public void connect() throws ConnectionUnavailableException {
        StringBuilder errorMessages = new StringBuilder();
        List<ConnectionUnavailableException> failures = new ArrayList<>();
        for (int i = 0; i < this.transports.size(); ++i) {
            try {
                Sink transport = this.transports.get(i);
                if (!transport.isConnected()) {
                    transport.connect();
                    transport.setConnected(true);
                    this.strategy.destinationAvailable(i);
                    log.info("Connected to destination Id " + i);
                }
            } catch (ConnectionUnavailableException e) {
                failures.add(e);
                errorMessages.append("[Destination").append(i).append("]:").append(e.getMessage());
                log.warn(ExceptionUtil.getMessageWithContext(e, this.siddhiAppContext) + " Failed to Connect to destination ID " + i);
            }
        }
        if (!failures.isEmpty()) {
            ConnectionUnavailableException aggregated = new ConnectionUnavailableException("Error on '" + this.siddhiAppContext.getName() + "'. " + failures.size() + "/" + this.transports.size() + " connections failed while trying to connect with following error " + "messages:" + errorMessages);
            // Keep the original exceptions reachable instead of reducing them to message text.
            failures.forEach(aggregated::addSuppressed);
            throw aggregated;
        }
    }

    /**
     * Disconnects every transport.
     *
     * <p>{@link #connect()} skips transports whose connected flag is still set, so the
     * flag is cleared here for every transport that was connected; otherwise a connect
     * following a disconnect would never reopen them. The distribution strategy is
     * notified at the same time, mirroring what {@link #publish} does on failure.
     */
    @Override
    public void disconnect() {
        for (int i = 0; i < this.transports.size(); ++i) {
            Sink transport = this.transports.get(i);
            transport.disconnect();
            // Only undo the registration made in connect(); destinations that were never
            // connected, or already failed in publish(), have nothing to undo.
            if (transport.isConnected()) {
                transport.setConnected(false);
                this.strategy.destinationFailed(i);
            }
        }
    }

    /**
     * Destroys every transport.
     */
    @Override
    public void destroy() {
        this.transports.forEach(Sink::destroy);
    }

    /**
     * Collects the state of every transport.
     *
     * @return a map from destination ID (as a decimal string) to that transport's state
     */
    @Override
    public Map<String, Object> currentState() {
        Map<String, Object> state = new HashMap<>();
        for (int i = 0; i < this.transports.size(); ++i) {
            state.put(Integer.toString(i), this.transports.get(i).currentState());
        }
        return state;
    }

    /**
     * Hands each transport the state stored under its destination ID by
     * {@link #currentState()}.
     *
     * <p>A {@code null} map is treated as "nothing to restore". For a destination with no
     * entry, the transport receives {@code null}, as it did before this guard was added.
     *
     * @param state map from destination ID (as a decimal string) to the transport's state
     */
    @Override
    public void restoreState(Map<String, Object> state) {
        if (state == null) {
            return;
        }
        for (int i = 0; i < this.transports.size(); ++i) {
            // Entries are the per-transport maps written by currentState().
            @SuppressWarnings("unchecked")
            Map<String, Object> transportState = (Map<String, Object>) state.get(Integer.toString(i));
            this.transports.get(i).restoreState(transportState);
        }
    }
}

