/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.siddhi.core.util.transport;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.output.sink.Sink;
import org.wso2.siddhi.core.stream.output.sink.distributed.DistributedTransport;
import org.wso2.siddhi.core.stream.output.sink.distributed.DistributionStrategy;
import org.wso2.siddhi.core.util.SiddhiClassLoader;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.extension.holder.SinkExecutorExtensionHolder;
import org.wso2.siddhi.core.util.parser.helper.DefinitionParserHelper;
import org.wso2.siddhi.core.util.transport.DynamicOptions;
import org.wso2.siddhi.core.util.transport.OptionHolder;
import org.wso2.siddhi.query.api.annotation.Annotation;
import org.wso2.siddhi.query.api.extension.Extension;

public class MultiClientDistributedSink
extends DistributedTransport {
    private static final Logger log = Logger.getLogger(MultiClientDistributedSink.class);
    private List<Sink> transports = new ArrayList<Sink>();

    @Override
    public void publish(Object payload, DynamicOptions transportOptions, Integer destinationId) throws ConnectionUnavailableException {
        Sink transport = this.transports.get(destinationId);
        try {
            transport.publish(payload, transportOptions);
        }
        catch (ConnectionUnavailableException e) {
            transport.setConnected(false);
            this.strategy.destinationFailed(destinationId);
            log.warn((Object)("Failed to publish payload to destination ID " + destinationId));
            transport.connectWithRetry();
            throw e;
        }
    }

    @Override
    public void initTransport(OptionHolder sinkOptionHolder, List<OptionHolder> destinationOptionHolders, Annotation sinkAnnotation, ConfigReader sinkConfigReader, DistributionStrategy strategy, String type, SiddhiAppContext siddhiAppContext) {
        String transportType = sinkOptionHolder.validateAndGetStaticValue("type");
        Extension sinkExtension = DefinitionParserHelper.constructExtension(this.streamDefinition, "Sink", transportType, sinkAnnotation, "sink");
        destinationOptionHolders.forEach(destinationOption -> {
            Sink sink = (Sink)SiddhiClassLoader.loadExtensionImplementation(sinkExtension, SinkExecutorExtensionHolder.getInstance(siddhiAppContext));
            destinationOption.merge(sinkOptionHolder);
            sink.initOnlyTransport(this.streamDefinition, (OptionHolder)destinationOption, sinkConfigReader, type, new MultiClientConnectionCallback(this.transports.size(), strategy), siddhiAppContext);
            this.transports.add(sink);
        });
    }

    @Override
    public Class[] getSupportedInputEventClasses() {
        return this.transports.get(0).getSupportedInputEventClasses();
    }

    @Override
    public void connect() throws ConnectionUnavailableException {
        for (Sink transport : this.transports) {
            if (transport.isConnected()) continue;
            transport.connectWithRetry();
        }
    }

    @Override
    public void disconnect() {
        this.transports.forEach(Sink::disconnect);
    }

    @Override
    public void destroy() {
        this.transports.forEach(Sink::destroy);
    }

    @Override
    public Map<String, Object> currentState() {
        HashMap<String, Object> state = new HashMap<String, Object>();
        for (int i = 0; i < this.transports.size(); ++i) {
            state.put(Integer.toString(i), this.transports.get(i).currentState());
        }
        return state;
    }

    @Override
    public void restoreState(Map<String, Object> state) {
        if (this.transports != null) {
            for (int i = 0; i < this.transports.size(); ++i) {
                Map transportState = (Map)state.get(Integer.toString(i));
                this.transports.get(i).restoreState(transportState);
            }
        }
    }

    public class MultiClientConnectionCallback
    extends DistributedTransport.ConnectionCallback {
        private final int destinationId;
        private final DistributionStrategy strategy;

        private MultiClientConnectionCallback(int destinationId, DistributionStrategy strategy) {
            this.destinationId = destinationId;
            this.strategy = strategy;
        }

        @Override
        public void connectionEstablished() {
            this.strategy.destinationAvailable(this.destinationId);
        }

        @Override
        public void connectionFailed() {
            this.strategy.destinationFailed(this.destinationId);
        }
    }
}

