/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.river;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Injectors;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverException;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.river.RiverModule;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverNameModule;
import org.elasticsearch.river.RiversPluginsModule;
import org.elasticsearch.river.RiversTypesRegistry;
import org.elasticsearch.river.cluster.RiverClusterChangedEvent;
import org.elasticsearch.river.cluster.RiverClusterService;
import org.elasticsearch.river.cluster.RiverClusterState;
import org.elasticsearch.river.cluster.RiverClusterStateListener;
import org.elasticsearch.river.routing.RiverRouting;
import org.elasticsearch.threadpool.ThreadPool;

public class RiversService
extends AbstractLifecycleComponent<RiversService> {
    private final String riverIndexName;
    private Client client;
    private final ThreadPool threadPool;
    private final ClusterService clusterService;
    private final RiversTypesRegistry typesRegistry;
    private final Injector injector;
    private final Map<RiverName, Injector> riversInjectors = Maps.newHashMap();
    private volatile ImmutableMap<RiverName, River> rivers = ImmutableMap.of();

    @Inject
    public RiversService(Settings settings, Client client, ThreadPool threadPool, ClusterService clusterService, RiversTypesRegistry typesRegistry, RiverClusterService riverClusterService, Injector injector) {
        super(settings);
        this.riverIndexName = RiverIndexName.Conf.indexName(settings);
        this.client = client;
        this.threadPool = threadPool;
        this.clusterService = clusterService;
        this.typesRegistry = typesRegistry;
        this.injector = injector;
        riverClusterService.add(new ApplyRivers());
    }

    @Override
    protected void doStart() throws ElasticSearchException {
    }

    @Override
    protected void doStop() throws ElasticSearchException {
        ImmutableSet<RiverName> indices = ImmutableSet.copyOf(this.rivers.keySet());
        final CountDownLatch latch = new CountDownLatch(indices.size());
        for (final RiverName riverName : indices) {
            this.threadPool.generic().execute(new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    try {
                        RiversService.this.closeRiver(riverName);
                    }
                    catch (Exception e) {
                        RiversService.this.logger.warn("failed to delete river on stop [{}]/[{}]", e, riverName.type(), riverName.name());
                    }
                    finally {
                        latch.countDown();
                    }
                }
            });
        }
        try {
            latch.await();
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    @Override
    protected void doClose() throws ElasticSearchException {
    }

    public synchronized void createRiver(RiverName riverName, Map<String, Object> settings) throws ElasticSearchException {
        if (this.riversInjectors.containsKey(riverName)) {
            this.logger.warn("ignoring river [{}][{}] creation, already exists", riverName.type(), riverName.name());
            return;
        }
        this.logger.debug("creating river [{}][{}]", riverName.type(), riverName.name());
        try {
            ModulesBuilder modules = new ModulesBuilder();
            modules.add((Module)new RiverNameModule(riverName));
            modules.add((Module)new RiverModule(riverName, settings, this.settings, this.typesRegistry));
            modules.add((Module)new RiversPluginsModule(this.settings, this.injector.getInstance(PluginsService.class)));
            Injector indexInjector = modules.createChildInjector(this.injector);
            this.riversInjectors.put(riverName, indexInjector);
            River river = indexInjector.getInstance(River.class);
            this.rivers = MapBuilder.newMapBuilder(this.rivers).put(riverName, river).immutableMap();
            river.start();
            XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
            builder.field("ok", true);
            builder.startObject("node");
            builder.field("id", this.clusterService.localNode().id());
            builder.field("name", this.clusterService.localNode().name());
            builder.field("transport_address", this.clusterService.localNode().address().toString());
            builder.endObject();
            builder.endObject();
            this.client.prepareIndex(this.riverIndexName, riverName.name(), "_status").setConsistencyLevel(WriteConsistencyLevel.ONE).setSource(builder).execute().actionGet();
        }
        catch (Exception e) {
            this.logger.warn("failed to create river [{}][{}]", e, riverName.type(), riverName.name());
            try {
                XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
                builder.field("error", ExceptionsHelper.detailedMessage(e));
                builder.startObject("node");
                builder.field("id", this.clusterService.localNode().id());
                builder.field("name", this.clusterService.localNode().name());
                builder.field("transport_address", this.clusterService.localNode().address().toString());
                builder.endObject();
                builder.endObject();
                this.client.prepareIndex(this.riverIndexName, riverName.name(), "_status").setConsistencyLevel(WriteConsistencyLevel.ONE).setSource(builder).execute().actionGet();
            }
            catch (Exception e1) {
                this.logger.warn("failed to write failed status for river creation", e, new Object[0]);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void closeRiver(RiverName riverName) throws ElasticSearchException {
        River river;
        RiversService riversService = this;
        synchronized (riversService) {
            Injector riverInjector = this.riversInjectors.remove(riverName);
            if (riverInjector == null) {
                throw new RiverException(riverName, "missing");
            }
            this.logger.debug("closing river [{}][{}]", riverName.type(), riverName.name());
            HashMap<RiverName, River> tmpMap = Maps.newHashMap(this.rivers);
            river = (River)tmpMap.remove(riverName);
            this.rivers = ImmutableMap.copyOf(tmpMap);
        }
        river.close();
        Injectors.close(this.injector);
    }

    private class ApplyRivers
    implements RiverClusterStateListener {
        private ApplyRivers() {
        }

        @Override
        public void riverClusterChanged(RiverClusterChangedEvent event) {
            DiscoveryNode localNode = RiversService.this.clusterService.localNode();
            RiverClusterState state = event.state();
            for (final RiverName riverName : RiversService.this.rivers.keySet()) {
                RiverRouting routing = state.routing().routing(riverName);
                if (routing != null && localNode.equals(routing.node())) continue;
                RiversService.this.closeRiver(riverName);
                try {
                    ((GetRequestBuilder)RiversService.this.client.prepareGet(RiversService.this.riverIndexName, riverName.name(), "_meta").setListenerThreaded(true)).execute(new ActionListener<GetResponse>(){

                        @Override
                        public void onResponse(GetResponse getResponse) {
                            if (!getResponse.isExists()) {
                                RiversService.this.client.admin().indices().prepareDeleteMapping(RiversService.this.riverIndexName).setType(riverName.name()).execute(new ActionListener<DeleteMappingResponse>(){

                                    @Override
                                    public void onResponse(DeleteMappingResponse deleteMappingResponse) {
                                    }

                                    @Override
                                    public void onFailure(Throwable e) {
                                        RiversService.this.logger.debug("failed to (double) delete river [{}] content", e, riverName.name());
                                    }
                                });
                            }
                        }

                        @Override
                        public void onFailure(Throwable e) {
                            RiversService.this.logger.debug("failed to (double) delete river [{}] content", e, riverName.name());
                        }
                    });
                }
                catch (IndexMissingException e) {
                }
                catch (Exception e) {
                    RiversService.this.logger.warn("unexpected failure when trying to verify river [{}] deleted", e, riverName.name());
                }
            }
            for (final RiverRouting routing : state.routing()) {
                if (routing.node() == null || !routing.node().equals(localNode) || RiversService.this.rivers.containsKey(routing.riverName())) continue;
                ((GetRequestBuilder)RiversService.this.client.prepareGet(RiversService.this.riverIndexName, routing.riverName().name(), "_meta").setListenerThreaded(true)).execute(new ActionListener<GetResponse>(){

                    @Override
                    public void onResponse(GetResponse getResponse) {
                        if (!RiversService.this.rivers.containsKey(routing.riverName()) && getResponse.isExists()) {
                            RiversService.this.createRiver(routing.riverName(), getResponse.getSourceAsMap());
                        }
                    }

                    @Override
                    public void onFailure(Throwable e) {
                        Throwable failure2 = ExceptionsHelper.unwrapCause(e);
                        if (TransportActions.isShardNotAvailableException(failure2)) {
                            RiversService.this.logger.debug("failed to get _meta from [{}]/[{}], retrying...", e, routing.riverName().type(), routing.riverName().name());
                            final 2 listener = this;
                            try {
                                RiversService.this.threadPool.schedule(TimeValue.timeValueSeconds(5L), "same", new Runnable(){

                                    @Override
                                    public void run() {
                                        ((GetRequestBuilder)RiversService.this.client.prepareGet(RiversService.this.riverIndexName, routing.riverName().name(), "_meta").setListenerThreaded(true)).execute(listener);
                                    }
                                });
                            }
                            catch (EsRejectedExecutionException ex) {
                                RiversService.this.logger.debug("Couldn't schedule river start retry, node might be shutting down", ex, new Object[0]);
                            }
                        } else {
                            RiversService.this.logger.warn("failed to get _meta from [{}]/[{}]", e, routing.riverName().type(), routing.riverName().name());
                        }
                    }
                });
            }
        }
    }
}

