/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.kafka.DynamicBrokersReader;
import org.apache.storm.kafka.DynamicPartitionConnections;
import org.apache.storm.kafka.KafkaUtils;
import org.apache.storm.kafka.Partition;
import org.apache.storm.kafka.PartitionCoordinator;
import org.apache.storm.kafka.PartitionManager;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.ZkState;
import org.apache.storm.kafka.trident.GlobalPartitionInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link PartitionCoordinator} that keeps one {@link PartitionManager} per
 * {@link Partition} assigned to this task, and periodically recomputes that
 * assignment from the broker information returned by a
 * {@link DynamicBrokersReader}.
 *
 * <p>The class performs no synchronization of its own.
 */
public class ZkCoordinator implements PartitionCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class);

    SpoutConfig _spoutConfig;
    int _taskIndex;
    int _totalTasks;
    String _topologyInstanceId;
    /** Live managers, keyed by the partition each one serves. */
    Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
    /** Snapshot of {@code _managers.values()} taken at the end of the last successful refresh. */
    List<PartitionManager> _cachedList = new ArrayList<PartitionManager>();
    /** Wall-clock time (ms) of the last refresh triggered by {@link #getMyManagedPartitions()}; {@code null} until the first one. */
    Long _lastRefreshTime = null;
    int _refreshFreqMs;
    DynamicPartitionConnections _connections;
    DynamicBrokersReader _reader;
    ZkState _state;
    Map _stormConf;

    /**
     * Creates a coordinator whose {@link DynamicBrokersReader} is built from
     * the {@link ZkHosts} settings found in {@code spoutConfig.hosts}.
     *
     * @throws ClassCastException if {@code spoutConfig.hosts} is not a {@link ZkHosts}
     */
    public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
        this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, ZkCoordinator.buildReader(stormConf, spoutConfig));
    }

    /**
     * Creates a coordinator that uses the supplied {@code reader} to obtain
     * broker information.
     *
     * @throws ClassCastException if {@code spoutConfig.hosts} is not a {@link ZkHosts}
     */
    public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) {
        this._spoutConfig = spoutConfig;
        this._connections = connections;
        this._taskIndex = taskIndex;
        this._totalTasks = totalTasks;
        this._topologyInstanceId = topologyInstanceId;
        this._stormConf = stormConf;
        this._state = state;
        ZkHosts brokerConf = (ZkHosts)spoutConfig.hosts;
        // NOTE(review): int arithmetic; a refreshFreqSecs above Integer.MAX_VALUE / 1000
        // would overflow here.
        this._refreshFreqMs = brokerConf.refreshFreqSecs * 1000;
        this._reader = reader;
    }

    /** Builds the default reader from the {@link ZkHosts} carried by {@code spoutConfig}. */
    private static DynamicBrokersReader buildReader(Map stormConf, SpoutConfig spoutConfig) {
        ZkHosts hosts = (ZkHosts)spoutConfig.hosts;
        return new DynamicBrokersReader(stormConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic);
    }

    /**
     * Returns the managers for the partitions currently assigned to this task.
     *
     * <p>Calls {@link #refresh()} first when no refresh has happened yet or
     * when more than {@code _refreshFreqMs} milliseconds have elapsed since
     * the last one triggered from here. The timestamp is only advanced when
     * the refresh completes without throwing.
     *
     * @return the cached manager list (the internal list itself, not a copy)
     */
    @Override
    public List<PartitionManager> getMyManagedPartitions() {
        boolean neverRefreshed = this._lastRefreshTime == null;
        if (neverRefreshed || System.currentTimeMillis() - this._lastRefreshTime > (long)this._refreshFreqMs) {
            this.refresh();
            this._lastRefreshTime = System.currentTimeMillis();
        }
        return this._cachedList;
    }

    /**
     * Recomputes this task's partition assignment and reconciles
     * {@code _managers} with it: managers for partitions that are no longer
     * assigned are removed and closed, and a new manager is created for each
     * newly assigned partition. Finally the cached list is rebuilt.
     *
     * @throws RuntimeException wrapping any exception raised while reconciling
     */
    @Override
    public void refresh() {
        try {
            LOG.info("{}Refreshing partition manager connections", KafkaUtils.taskId(this._taskIndex, this._totalTasks));
            List<GlobalPartitionInformation> brokerInfo = this._reader.getBrokerInfo();
            List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, this._totalTasks, this._taskIndex);

            // keySet() is a live view; both derived sets below are independent copies.
            Set<Partition> curr = this._managers.keySet();
            Set<Partition> newPartitions = new HashSet<Partition>(mine);
            newPartitions.removeAll(curr);
            Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
            deletedPartitions.removeAll(mine);

            LOG.info("{}Deleted partition managers: {}", KafkaUtils.taskId(this._taskIndex, this._totalTasks), deletedPartitions);

            // deletedManagers is keyed by partition number only, so two removed
            // partitions that share a number (presumably possible when Partition
            // equality also covers host/topic -- confirm against Partition) would
            // overwrite each other there. Every removed manager is therefore also
            // tracked in removedManagers so that none of them escapes close().
            Map<Integer, PartitionManager> deletedManagers = new HashMap<Integer, PartitionManager>();
            List<PartitionManager> removedManagers = new ArrayList<PartitionManager>(deletedPartitions.size());
            for (Partition id : deletedPartitions) {
                PartitionManager removed = this._managers.remove(id);
                removedManagers.add(removed);
                deletedManagers.put(id.partition, removed);
            }
            for (PartitionManager manager : removedManagers) {
                if (manager != null) {
                    manager.close();
                }
            }

            LOG.info("{}New partition managers: {}", KafkaUtils.taskId(this._taskIndex, this._totalTasks), newPartitions);
            for (Partition id : newPartitions) {
                // The (already closed) manager previously registered under the same
                // partition number, if any, is handed to the new manager.
                PartitionManager previous = deletedManagers.get(id.partition);
                PartitionManager man = new PartitionManager(this._connections, this._topologyInstanceId, this._state, this._stormConf, this._spoutConfig, id, previous);
                this._managers.put(id, man);
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        this._cachedList = new ArrayList<PartitionManager>(this._managers.values());
        LOG.info("{}Finished refreshing", KafkaUtils.taskId(this._taskIndex, this._totalTasks));
    }

    /**
     * Looks up the manager registered for the given partition.
     *
     * @return the manager mapped to {@code partition2} in {@code _managers},
     *         or {@code null} when the map holds no entry for it
     */
    @Override
    public PartitionManager getManager(Partition partition2) {
        return this._managers.get(partition2);
    }
}

