/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.discovery.service;

import com.google.common.base.Joiner;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.resources.MetadataStoreCacheLoader;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.discovery.service.DiscoveryService;
import org.apache.pulsar.discovery.service.server.ServiceConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Supplies the discovery service with the set of currently available brokers and with
 * partitioned-topic metadata read from the metadata store.
 *
 * <p>Brokers are handed out in round-robin order by {@link #nextBroker()}. Instances own a
 * {@link MetadataStoreCacheLoader} and two executors, all of which are released by
 * {@link #close()}.
 */
public class BrokerDiscoveryProvider implements Closeable {
    private static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";
    private static final Logger LOG = LoggerFactory.getLogger(BrokerDiscoveryProvider.class);

    /** Source of the available-broker list; closed in {@link #close()}. */
    final MetadataStoreCacheLoader metadataStoreCacheLoader;
    /** Monotonic counter used to pick the next broker in round-robin order. */
    private final AtomicInteger counter = new AtomicInteger();
    private final PulsarResources pulsarResources;
    // The two executors below are created eagerly and only shut down in close(); no other
    // method visible in this class submits work to them.
    private final OrderedScheduler orderedExecutor =
            OrderedScheduler.newSchedulerBuilder().numThreads(4).name("pulsar-discovery-ordered").build();
    private final ScheduledExecutorService scheduledExecutorScheduler =
            Executors.newScheduledThreadPool(4, new DefaultThreadFactory("pulsar-discovery"));

    /**
     * Creates the provider and its {@link MetadataStoreCacheLoader}.
     *
     * @param config          service configuration; its ZooKeeper session timeout is passed to the cache loader
     * @param pulsarResources resources used for broker discovery and partitioned-topic lookups
     * @throws PulsarServerException if the cache loader cannot be created
     */
    public BrokerDiscoveryProvider(ServiceConfig config, PulsarResources pulsarResources) throws PulsarServerException {
        this.pulsarResources = pulsarResources;
        try {
            this.metadataStoreCacheLoader =
                    new MetadataStoreCacheLoader(pulsarResources, config.getZookeeperSessionTimeoutMs());
        } catch (Exception e) {
            LOG.error("Failed to start ZooKeeper {}", e.getMessage(), e);
            throw new PulsarServerException("Failed to start zookeeper :" + e.getMessage(), e);
        }
    }

    /**
     * Returns the next broker in round-robin order.
     *
     * @return the load report of the selected broker
     * @throws PulsarServerException if no broker is currently available
     */
    LoadManagerReport nextBroker() throws PulsarServerException {
        List<LoadManagerReport> availableBrokers = getAvailableBrokers();
        if (availableBrokers.isEmpty()) {
            throw new PulsarServerException("No active broker is available");
        }
        // signSafeMod keeps the index non-negative even after the counter wraps around.
        int nextIdx = MathUtils.signSafeMod(this.counter.getAndIncrement(), availableBrokers.size());
        return availableBrokers.get(nextIdx);
    }

    /**
     * Returns the brokers currently reported by the metadata store cache loader.
     *
     * @return the list of available brokers as provided by the cache loader
     */
    List<LoadManagerReport> getAvailableBrokers() {
        return this.metadataStoreCacheLoader.getAvailableBrokers();
    }

    /**
     * Asynchronously looks up the partitioned-topic metadata of a topic after authorizing the caller.
     *
     * <p>The returned future completes with the stored metadata, with a default
     * {@link PartitionedTopicMetadata} when nothing is stored for the topic, or exceptionally when
     * authorization or the lookup fails. This method itself does not throw.
     *
     * @param service            discovery service providing configuration and authorization
     * @param topicName          topic to look up
     * @param role               role of the caller
     * @param authenticationData authentication data of the caller
     * @return a future holding the partitioned-topic metadata
     */
    CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(DiscoveryService service, TopicName topicName, String role, AuthenticationDataSource authenticationData) {
        CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
        try {
            checkAuthorization(service, topicName, role, authenticationData);
            String path = path(PARTITIONED_TOPIC_PATH_ZNODE, topicName.getNamespaceObject().toString(),
                    "persistent", topicName.getEncodedLocalName());
            this.pulsarResources.getNamespaceResources().getPartitionedTopicResources().getAsync(path)
                    .thenAccept(metadata ->
                            metadataFuture.complete(metadata.orElseGet(PartitionedTopicMetadata::new)))
                    .exceptionally(ex -> {
                        metadataFuture.completeExceptionally(ex);
                        return null;
                    });
        } catch (Exception e) {
            metadataFuture.completeExceptionally(e);
        }
        return metadataFuture;
    }

    /**
     * Verifies that {@code role} may look up {@code topicName}.
     *
     * <p>The check passes immediately when authorization is disabled or the role is a super user.
     * Otherwise the role must either be allowed to look up the topic or be an admin of the
     * topic's tenant.
     *
     * @param service            discovery service providing configuration and authorization
     * @param topicName          topic being looked up
     * @param role               role of the caller
     * @param authenticationData authentication data of the caller
     * @throws IllegalAccessException if the tenant does not exist, its data cannot be read, or the
     *                                role is not a tenant admin
     * @throws Exception              if the authorization service itself fails
     */
    protected static void checkAuthorization(DiscoveryService service, TopicName topicName, String role, AuthenticationDataSource authenticationData) throws Exception {
        if (!service.getConfiguration().isAuthorizationEnabled()
                || service.getConfiguration().getSuperUserRoles().contains(role)) {
            return;
        }
        if (!service.getAuthorizationService().canLookup(topicName, role, authenticationData)) {
            LOG.warn("[{}] Role {} is not allowed to lookup topic", topicName, role);
            String tenant = topicName.getTenant();
            TenantInfo tenantInfo = loadTenantInfo(service, tenant);
            if (!service.getAuthorizationService().isTenantAdmin(tenant, role, tenantInfo, authenticationData).get()) {
                throw new IllegalAccessException("Don't have permission to administrate resources on this property");
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Successfully authorized {} on property {}", role, topicName.getTenant());
        }
    }

    /**
     * Reads the tenant's admin data, mapping every failure to {@link IllegalAccessException}.
     *
     * @param service discovery service giving access to the tenant resources
     * @param tenant  tenant (property) name
     * @return the tenant info, never {@code null}
     * @throws IllegalAccessException if the tenant does not exist or its data cannot be read
     */
    private static TenantInfo loadTenantInfo(DiscoveryService service, String tenant) throws IllegalAccessException {
        TenantInfo tenantInfo;
        try {
            tenantInfo = service.getPulsarResources().getTenantResources().get(path("policies", tenant)).orElse(null);
        } catch (MetadataStoreException.NotFoundException e) {
            LOG.warn("Failed to get property admin data for non existing property {}", tenant);
            throw new IllegalAccessException("Property does not exist");
        } catch (Exception e) {
            LOG.error("Failed to get property admin data for property {}", tenant, e);
            IllegalAccessException failure = new IllegalAccessException(
                    String.format("Failed to get property %s admin data due to %s", tenant, e.getMessage()));
            // IllegalAccessException has no cause-taking constructor, so attach the cause explicitly.
            failure.initCause(e);
            throw failure;
        }
        // An absent entry is a missing tenant, not a store failure; the check is kept outside the
        // try so the generic catch above cannot re-wrap it as a read error.
        if (tenantInfo == null) {
            LOG.warn("Failed to get property admin data for non existing property {}", tenant);
            throw new IllegalAccessException("Property does not exist");
        }
        return tenantInfo;
    }

    /**
     * Builds a metadata-store path of the form {@code /admin/part1/part2/...}.
     *
     * @param parts path segments, joined with {@code '/'}
     * @return the joined path prefixed with {@code /admin/}
     */
    public static String path(String ... parts) {
        StringBuilder sb = new StringBuilder("/admin/");
        Joiner.on('/').appendTo(sb, parts);
        return sb.toString();
    }

    /**
     * Closes the cache loader and shuts down both executors.
     *
     * @throws IOException if closing the cache loader fails; the executors are shut down regardless
     */
    @Override
    public void close() throws IOException {
        try {
            this.metadataStoreCacheLoader.close();
        } finally {
            // Always release the executor threads, even when the cache loader fails to close.
            this.orderedExecutor.shutdown();
            this.scheduledExecutorScheduler.shutdownNow();
        }
    }
}

