/*
 * Decompiled with CFR 0.152.
 */
package io.kcache.keta.server.leader;

import io.kcache.keta.KetaConfig;
import io.kcache.keta.KetaEngine;
import io.kcache.keta.leader.LeaderElector;
import io.kcache.keta.server.grpc.proxy.GrpcProxy;
import io.kcache.keta.server.leader.ClientConfig;
import io.kcache.keta.server.leader.KetaCoordinator;
import io.kcache.keta.server.leader.KetaElectionException;
import io.kcache.keta.server.leader.KetaIdentity;
import io.kcache.keta.server.leader.KetaProtocol;
import io.kcache.keta.server.leader.KetaRebalanceListener;
import io.kcache.keta.transaction.client.KetaTransactionManager;
import java.io.Closeable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Group member that takes part in leader election through a {@link KetaCoordinator}.
 *
 * <p>The constructor wires up the Kafka networking stack (metadata, network client, metrics)
 * and the coordinator. {@link #init()} starts a single background thread that keeps polling the
 * coordinator and blocks until the first successful assignment has been received.
 * The coordinator reports rebalance results back through {@link #onAssigned} and
 * {@link #onRevoked()}, which update the known leader, the member table and the proxy target.
 *
 * <p>Access to the current leader is guarded by this object's monitor; the member table is a
 * {@link ConcurrentHashMap}.
 */
public class KetaLeaderElector implements KetaRebalanceListener, LeaderElector, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KetaLeaderElector.class);
    private static final AtomicInteger KDB_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    private static final String JMX_PREFIX = "kareldb";

    private final KetaEngine engine;
    private final GrpcProxy<byte[], byte[]> proxy;
    private final int initTimeout;
    private final String clientId;
    private final ConsumerNetworkClient client;
    private final Metrics metrics;
    private final Metadata metadata;
    private final long retryBackoffMs;
    private final KetaCoordinator coordinator;
    private final List<URI> listeners;
    private final KetaIdentity myIdentity;
    /** Current leader, or {@code null} when none is known; guarded by {@code this}. */
    private KetaIdentity leader;
    /** Maps each known member to the id assigned to it in {@link #setMembers}. */
    private final Map<KetaIdentity, Integer> members = new ConcurrentHashMap<>();
    private int generationId;
    /** Set once shutdown has been requested; also read by the polling loop. */
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private ExecutorService executor;
    /** Released by the first successful assignment; {@link #init()} waits on it. */
    private final CountDownLatch joinedLatch = new CountDownLatch(1);

    /**
     * Builds the networking stack and the coordinator for this group member.
     *
     * @param config configuration supplying the listeners, host name, group id and the
     *               {@code kafkacache.}-prefixed client settings
     * @param engine engine that is synced when this instance becomes leader
     * @param proxy  proxy whose target is pointed at the leader when another member leads
     * @throws KetaElectionException if any part of the setup fails; whatever had already been
     *                               created is closed before the exception is thrown
     */
    public KetaLeaderElector(KetaConfig config, KetaEngine engine, GrpcProxy<byte[], byte[]> proxy) throws KetaElectionException {
        try {
            this.engine = engine;
            this.proxy = proxy;
            this.clientId = "kdb-" + KDB_CLIENT_ID_SEQUENCE.getAndIncrement();
            // Read-only so that getListeners() cannot be used to alter this instance's state.
            this.listeners = Collections.unmodifiableList(KetaLeaderElector.parseListeners(config.getList("listeners")));
            this.myIdentity = KetaLeaderElector.findIdentity(this.listeners, config.getString("host.name"), config.getBoolean("leader.eligibility"));

            Map<String, String> metricsTags = new LinkedHashMap<>();
            metricsTags.put("client-id", this.clientId);
            MetricConfig metricConfig = new MetricConfig().tags(metricsTags);
            MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, config.originals());
            Time time = Time.SYSTEM;
            ClientConfig clientConfig = new ClientConfig(config.originalsWithPrefix("kafkacache."), false);
            // The reporter list is created inline so its element type is inferred from the
            // Metrics constructor parameter.
            this.metrics = new Metrics(metricConfig, Collections.singletonList(new JmxReporter()), time, metricsContext);
            this.retryBackoffMs = clientConfig.getLong("retry.backoff.ms");

            String groupId = config.getString("cluster.group.id");
            LogContext logContext = new LogContext("[KarelDB clientId=" + this.clientId + ", groupId=" + groupId + "] ");
            this.metadata = new Metadata(this.retryBackoffMs, clientConfig.getLong("metadata.max.age.ms"), logContext, new ClusterResourceListeners());
            List<String> bootstrapServers = config.getList("kafkacache.bootstrap.servers");
            this.metadata.bootstrap(ClientUtils.parseAndValidateAddresses(bootstrapServers, clientConfig.getString("client.dns.lookup")));

            String metricGrpPrefix = JMX_PREFIX;
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(clientConfig, time, logContext);
            long maxIdleMs = clientConfig.getLong("connections.max.idle.ms");
            Selector selector = new Selector(maxIdleMs, this.metrics, time, metricGrpPrefix, channelBuilder, logContext);
            NetworkClient netClient = new NetworkClient(
                    selector,
                    this.metadata,
                    this.clientId,
                    100,
                    clientConfig.getLong("reconnect.backoff.ms"),
                    clientConfig.getLong("reconnect.backoff.max.ms"),
                    clientConfig.getInt("send.buffer.bytes"),
                    clientConfig.getInt("receive.buffer.bytes"),
                    clientConfig.getInt("request.timeout.ms"),
                    ClientDnsLookup.forConfig(clientConfig.getString("client.dns.lookup")),
                    time,
                    true,
                    new ApiVersions(),
                    logContext);
            this.client = new ConsumerNetworkClient(logContext, netClient, this.metadata, time, this.retryBackoffMs, clientConfig.getInt("request.timeout.ms"), Integer.MAX_VALUE);
            // NOTE(review): 300000 / 10000 / 3000 look like rebalance timeout, session timeout and
            // heartbeat interval in ms - confirm against the KetaCoordinator constructor.
            this.coordinator = new KetaCoordinator(logContext, this.client, groupId, 300000, 10000, 3000, this.metrics, metricGrpPrefix, time, this.retryBackoffMs, this.myIdentity, this);
            AppInfoParser.registerAppInfo(JMX_PREFIX, this.clientId, this.metrics, time.milliseconds());
            this.initTimeout = config.getInt("kafkacache.init.timeout.ms");
            LOG.debug("Group member created");
        } catch (Throwable t) {
            // Release whatever was created so far; close failures are swallowed so that the
            // original cause is the one reported.
            this.stop(true);
            throw new KetaElectionException("Failed to construct kafka consumer", t);
        }
    }

    /**
     * Derives this instance's identity from the first configured listener.
     *
     * @param listeners         configured listeners; the first one supplies scheme and port
     * @param host              host name to advertise
     * @param leaderEligibility whether this instance may be elected leader
     * @return the identity built from the first listener
     * @throws ConfigException if {@code listeners} is {@code null} or empty
     */
    static KetaIdentity findIdentity(List<URI> listeners, String host, boolean leaderEligibility) {
        if (listeners == null || listeners.isEmpty()) {
            throw new ConfigException("No listeners are configured. Must have at least one listener.");
        }
        URI first = listeners.iterator().next();
        return new KetaIdentity(first.getScheme(), host, first.getPort(), leaderEligibility);
    }

    /**
     * Parses and validates the configured listener strings.
     *
     * @param listenersConfig listener URIs as strings; each needs a scheme and an explicit port
     * @return the parsed URIs, in configuration order
     * @throws ConfigException if the list is {@code null} or empty, or if an entry cannot be
     *                         parsed, has no scheme, or has no port
     */
    static List<URI> parseListeners(List<String> listenersConfig) {
        if (listenersConfig == null || listenersConfig.isEmpty()) {
            throw new ConfigException("No listeners are configured. Must have at least one listener.");
        }
        List<URI> parsed = new ArrayList<>(listenersConfig.size());
        for (String listenerStr : listenersConfig) {
            URI uri;
            try {
                uri = new URI(listenerStr);
            } catch (URISyntaxException use) {
                // ConfigException has no cause-taking constructor, so the cause is attached
                // afterwards to keep the parser's diagnostics in the stack trace.
                ConfigException failure = new ConfigException("Could not parse a listener URI from the `listeners` configuration option: " + listenerStr + " (" + use.getMessage() + ")");
                failure.initCause(use);
                throw failure;
            }
            if (uri.getScheme() == null) {
                throw new ConfigException("Found a listener without a scheme. All listeners must have a scheme. The listener without a scheme is: " + listenerStr);
            }
            if (uri.getPort() == -1) {
                throw new ConfigException("Found a listener without a port. All listeners must have a port. The listener without a port is: " + listenerStr);
            }
            parsed.add(uri);
        }
        return parsed;
    }

    /**
     * Starts the background polling thread and waits for the first successful assignment.
     *
     * @throws KetaElectionException if the assignment does not arrive within the configured
     *                               init timeout, or if the calling thread is interrupted while
     *                               waiting (the interrupt flag is restored in that case)
     */
    public void init() throws KetaElectionException {
        LOG.debug("Initializing group member");
        this.executor = Executors.newSingleThreadExecutor();
        this.executor.submit(() -> {
            try {
                while (!this.stopped.get()) {
                    this.coordinator.poll(Integer.MAX_VALUE);
                }
            } catch (Throwable t) {
                if (this.stopped.get()) {
                    // stop() wakes the client up to break out of poll(); an exception at this
                    // point is the expected way for the loop to end, not an error.
                    LOG.debug("Group processing thread exiting after stop was requested", t);
                } else {
                    LOG.error("Unexpected exception in group processing thread", t);
                }
            }
        });
        try {
            if (!this.joinedLatch.await(this.initTimeout, TimeUnit.MILLISECONDS)) {
                throw new KetaElectionException("Timed out waiting for join group to complete");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new KetaElectionException("Interrupted while waiting for join group to complete", e);
        }
        LOG.debug("Group member initialized and joined group");
    }

    /**
     * Stops the polling thread and releases all resources. Only the first call has any effect.
     *
     * @throws KafkaException   if closing one of the underlying resources failed
     * @throws RuntimeException if the calling thread was interrupted while waiting for the
     *                          polling thread to exit
     */
    @Override
    public void close() {
        // compareAndSet lets exactly one caller through, even when close() is called concurrently.
        if (!this.stopped.compareAndSet(false, true)) {
            return;
        }
        this.stop(false);
    }

    /**
     * Applies the outcome of a completed rebalance.
     *
     * @param assignment election result delivered by the coordinator
     * @param generation generation id of the group after the rebalance
     * @throws IllegalStateException if the assignment carries an error code
     */
    @Override
    public void onAssigned(KetaProtocol.Assignment assignment, int generation) {
        LOG.info("Finished rebalance by joining generation {}", generation);
        this.generationId = generation;
        KetaTransactionManager txMgr = KetaEngine.getInstance().getTxManager();
        txMgr.setGenerationId(generation);
        LOG.info("Finished rebalance with leader election result: {}", assignment);
        try {
            switch (assignment.error()) {
                case 0: {
                    if (assignment.leaderIdentity() == null) {
                        LOG.error("No leader eligible instances joined the group. Rebalancing was successful and this instance can serve reads, but no writes can be processed.");
                    }
                    this.setLeader(assignment.leaderIdentity());
                    this.setMembers(assignment.members());
                    LOG.info(this.isLeader() ? "Registered as leader" : "Registered as replica");
                    // Unblocks init(); later assignments find the latch already released.
                    this.joinedLatch.countDown();
                    break;
                }
                case 1: {
                    throw new IllegalStateException("The group contained multiple members advertising the same URL. Verify that each instance has a unique, routable listener by setting the 'listeners' configuration. This error may happen if executing in containers where the default hostname is 'localhost'.");
                }
                default: {
                    throw new IllegalStateException("Unknown error returned from the coordination protocol");
                }
            }
        } catch (KetaElectionException e) {
            LOG.error("Error when updating leader, we will not be able to forward requests to the leader", e);
        }
    }

    /** Clears the known leader and the member table when a rebalance starts. */
    @Override
    public void onRevoked() {
        LOG.info("Rebalance started");
        try {
            this.setLeader(null);
            this.setMembers(Collections.emptyList());
        } catch (KetaElectionException e) {
            LOG.error("Error when updating leader, we will not be able to forward requests to the leader", e);
        }
    }

    /** @return the identity this instance advertises to the group */
    public KetaIdentity getIdentity() {
        return this.myIdentity;
    }

    /** @return the id assigned to this instance in the member table, or 0 if it is not listed */
    public int getMemberId() {
        return this.members.getOrDefault(this.getIdentity(), 0);
    }

    /** @return the configured listeners as a read-only list */
    public List<URI> getListeners() {
        return this.listeners;
    }

    /** @return {@code true} if this instance is the currently known leader */
    public synchronized boolean isLeader() {
        return this.myIdentity.equals(this.leader);
    }

    /** @return the member id of the current leader, or 0 if no leader is known or listed */
    public synchronized int getLeaderId() {
        if (this.leader == null) {
            return 0;
        }
        return this.members.getOrDefault(this.leader, 0);
    }

    /**
     * Records the new leader and repoints the proxy.
     *
     * <p>When this instance goes from not being leader to being leader, the engine is synced
     * before the new leader is recorded. The proxy target is cleared when there is no leader or
     * when this instance is the leader; otherwise it is set to the leader.
     *
     * @param leader the elected leader, or {@code null} if there is none
     */
    private synchronized void setLeader(KetaIdentity leader) {
        boolean leaderIsSelf = this.myIdentity.equals(leader);
        if (!this.isLeader() && leaderIsSelf) {
            LOG.info("Syncing caches...");
            this.engine.sync();
        }
        this.leader = leader;
        this.proxy.setTarget(leader == null || leaderIsSelf ? null : leader);
    }

    /** @return a read-only live view of the members currently in the member table */
    public Collection<KetaIdentity> getMembers() {
        return Collections.unmodifiableSet(this.members.keySet());
    }

    /**
     * Replaces the member table, numbering members {@code generationId * 100 + 1},
     * {@code + 2}, ... in iteration order.
     *
     * @param members members of the current generation
     */
    private void setMembers(Collection<KetaIdentity> members) {
        this.members.clear();
        int base = this.generationId * 100;
        int position = 0;
        for (KetaIdentity member : members) {
            position++;
            this.members.put(member, base + position);
        }
    }

    /**
     * Stops the polling thread and closes the coordinator, metrics and network client.
     *
     * <p>Cleanup always runs to completion, even if the calling thread is interrupted while
     * waiting for the polling thread to exit.
     *
     * @param swallowException if {@code true}, failures while closing resources are logged but
     *                         not rethrown
     * @throws RuntimeException if interrupted while waiting for the polling thread; thrown
     *                          after cleanup, with the interrupt flag restored
     * @throws KafkaException   if a resource failed to close and {@code swallowException} is
     *                          {@code false}
     */
    private void stop(boolean swallowException) {
        LOG.trace("Stopping the group member.");
        // Raise the flag before waking the client so the polling loop sees the shutdown request
        // and does not report the wakeup as an unexpected failure.
        this.stopped.set(true);
        if (this.client != null) {
            this.client.wakeup();
        }

        InterruptedException interruption = null;
        if (this.executor != null) {
            this.executor.shutdown();
            try {
                if (!this.executor.awaitTermination(30L, TimeUnit.SECONDS)) {
                    LOG.warn("Group processing thread did not exit within 30 seconds");
                }
            } catch (InterruptedException e) {
                // Remember the interruption but keep going: the resources below must still be
                // released. The interrupt flag is restored only after cleanup.
                interruption = e;
            }
        }

        AtomicReference<Throwable> firstException = new AtomicReference<>();
        KetaLeaderElector.closeQuietly(this.coordinator, "coordinator", firstException);
        KetaLeaderElector.closeQuietly(this.metrics, "consumer metrics", firstException);
        KetaLeaderElector.closeQuietly(this.client, "consumer network client", firstException);
        AppInfoParser.unregisterAppInfo(JMX_PREFIX, this.clientId, this.metrics);

        if (interruption != null) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted waiting for group processing thread to exit", interruption);
        }
        if (firstException.get() != null && !swallowException) {
            throw new KafkaException("Failed to stop the group member", firstException.get());
        }
        LOG.debug("The group member has stopped.");
    }

    /**
     * Closes a resource, logging any failure and remembering the first one.
     *
     * @param closeable      resource to close; {@code null} is ignored
     * @param name           human-readable name used in the log message
     * @param firstException receives the failure if no earlier failure was recorded
     */
    private static void closeQuietly(AutoCloseable closeable, String name, AtomicReference<Throwable> firstException) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Throwable t) {
            firstException.compareAndSet(null, t);
            LOG.error("Failed to close {} with type {}", name, closeable.getClass().getName(), t);
        }
    }
}

