/*
 * Decompiled with CFR 0.152.
 */
package io.kcache.keta.server.leader;

import io.kcache.keta.server.leader.KetaIdentity;
import io.kcache.keta.server.leader.KetaProtocol;
import io.kcache.keta.server.leader.KetaRebalanceListener;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class KetaCoordinator
extends AbstractCoordinator
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KetaCoordinator.class);
    public static final String KDB_SUBPROTOCOL_V0 = "v0";
    private final KetaIdentity identity;
    private KetaProtocol.Assignment assignmentSnapshot;
    private final KetaRebalanceListener listener;

    public KetaCoordinator(LogContext logContext, ConsumerNetworkClient client, String groupId, int rebalanceTimeoutMs, int sessionTimeoutMs, int heartbeatIntervalMs, Metrics metrics, String metricGrpPrefix, Time time, long retryBackoffMs, KetaIdentity identity, KetaRebalanceListener listener) {
        super(new GroupRebalanceConfig(sessionTimeoutMs, rebalanceTimeoutMs, heartbeatIntervalMs, groupId, Optional.empty(), retryBackoffMs, true), logContext, client, metrics, metricGrpPrefix, time);
        this.identity = identity;
        this.assignmentSnapshot = null;
        this.listener = listener;
    }

    public String protocolType() {
        return "kdb";
    }

    public void poll(long timeout) {
        long elapsed;
        long remaining;
        long start;
        long now = start = this.time.milliseconds();
        do {
            if (this.coordinatorUnknown()) {
                this.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
                now = this.time.milliseconds();
            }
            if (this.rejoinNeededOrPending()) {
                this.ensureActiveGroup();
                now = this.time.milliseconds();
            }
            this.pollHeartbeat(now);
            elapsed = now - start;
            remaining = timeout - elapsed;
            this.client.poll(this.time.timer(Math.min(Math.max(0L, remaining), this.timeToNextHeartbeat(now))));
        } while ((remaining = timeout - (elapsed = (now = this.time.milliseconds()) - start)) > 0L);
    }

    public JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
        ByteBuffer metadata = KetaProtocol.serializeMetadata(this.identity);
        return new JoinGroupRequestData.JoinGroupRequestProtocolCollection(Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol().setName(KDB_SUBPROTOCOL_V0).setMetadata(metadata.array())).iterator());
    }

    protected void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
        this.assignmentSnapshot = KetaProtocol.deserializeAssignment(memberAssignment);
        this.listener.onAssigned(this.assignmentSnapshot, generation);
    }

    protected Map<String, ByteBuffer> performAssignment(String kafkaLeaderId, String protocol, List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata) {
        LOG.debug("Performing assignment");
        ArrayList<KetaIdentity> members = new ArrayList<KetaIdentity>();
        HashMap<String, KetaIdentity> memberConfigs = new HashMap<String, KetaIdentity>();
        for (JoinGroupResponseData.JoinGroupResponseMember entry : allMemberMetadata) {
            KetaIdentity identity = KetaProtocol.deserializeMetadata(ByteBuffer.wrap(entry.metadata()));
            memberConfigs.put(entry.memberId(), identity);
            members.add(identity);
        }
        LOG.debug("Member information: {}", memberConfigs);
        KetaIdentity leaderIdentity = null;
        String leaderKafkaId = null;
        HashSet<String> urls = new HashSet<String>();
        for (Map.Entry entry : memberConfigs.entrySet()) {
            boolean smallerIdentity;
            String kafkaMemberId = (String)entry.getKey();
            KetaIdentity memberIdentity = (KetaIdentity)entry.getValue();
            urls.add(memberIdentity.getUrl());
            boolean eligible = memberIdentity.getLeaderEligibility();
            boolean bl = smallerIdentity = leaderIdentity == null || memberIdentity.getUrl().compareTo(leaderIdentity.getUrl()) < 0;
            if (!eligible || !smallerIdentity) continue;
            leaderKafkaId = kafkaMemberId;
            leaderIdentity = memberIdentity;
        }
        short error = 0;
        if (urls.size() != memberConfigs.size()) {
            LOG.error("Found duplicate URLs for group members. This indicates a misconfiguration and is common when executing in containers. Use the host.name configuration to set each instance's advertised host name to a value that is routable from all other group members.");
            error = 1;
        }
        HashMap<String, ByteBuffer> groupAssignment = new HashMap<String, ByteBuffer>();
        KetaProtocol.Assignment assignment = new KetaProtocol.Assignment(error, leaderKafkaId, leaderIdentity, members);
        LOG.debug("Assignment: {}", (Object)assignment);
        for (String member : memberConfigs.keySet()) {
            groupAssignment.put(member, KetaProtocol.serializeAssignment(assignment));
        }
        return groupAssignment;
    }

    protected void onJoinPrepare(int generation, String memberId) {
        LOG.debug("Revoking previous assignment {}", (Object)this.assignmentSnapshot);
        if (this.assignmentSnapshot != null) {
            this.listener.onRevoked();
        }
    }

    protected synchronized boolean ensureCoordinatorReady(Timer timer) {
        return super.ensureCoordinatorReady(timer);
    }

    protected boolean rejoinNeededOrPending() {
        return super.rejoinNeededOrPending() || this.assignmentSnapshot == null;
    }
}

