/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Heartbeat;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.RequestFutureAdapter;
import org.apache.kafka.clients.consumer.internals.RequestFutureListener;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.GroupCoordinatorRequest;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractCoordinator
implements Closeable {
    // Logger shared by the coordinator and all of its nested handlers.
    private static final Logger log = LoggerFactory.getLogger(AbstractCoordinator.class);
    // Rebalance timeout (ms); passed to the Heartbeat tracker and to every JoinGroupRequest.
    private final int rebalanceTimeoutMs;
    // Session timeout (ms); passed to the Heartbeat tracker and to every JoinGroupRequest.
    private final int sessionTimeoutMs;
    // Latency sensors recorded by the heartbeat, join and sync response handlers.
    private final GroupCoordinatorMetrics sensors;
    // Heartbeat/session/poll timing state consulted by the heartbeat thread.
    private final Heartbeat heartbeat;
    // Id of the group this coordinator manages membership for.
    protected final String groupId;
    // Network client used to send every coordinator request and to poll for responses.
    protected final ConsumerNetworkClient client;
    // Clock used for timestamps and back-off sleeps.
    protected final Time time;
    // Back-off (ms) applied between retries and between idle heartbeat-thread wake-ups.
    protected final long retryBackoffMs;
    // Created lazily by startHeartbeatThreadIfNeeded(); cleared by pollHeartbeat() once it has failed.
    private HeartbeatThread heartbeatThread = null;
    // True when the member has to (re)join the group; set by requestRejoin() and resetGeneration().
    private boolean rejoinNeeded = true;
    // True when onJoinPrepare() must be invoked before the next join attempt.
    private boolean needsJoinPrepare = true;
    // Current membership state; guarded by the coordinator's monitor.
    private MemberState state = MemberState.UNJOINED;
    // The join attempt currently in flight, or null when there is none.
    private RequestFuture<ByteBuffer> joinFuture = null;
    // The currently known group coordinator, or null when it is unknown or was marked dead.
    private Node coordinator = null;
    // Generation id / member id / protocol the member currently holds.
    private Generation generation = Generation.NO_GENERATION;
    // The coordinator lookup currently in flight, or null when there is none.
    private RequestFuture<Void> findCoordinatorFuture = null;

    /**
     * Builds a coordinator for the given group.
     *
     * @param client              network client used for all coordinator traffic
     * @param groupId             id of the group to manage membership for
     * @param rebalanceTimeoutMs  rebalance timeout in milliseconds
     * @param sessionTimeoutMs    session timeout in milliseconds
     * @param heartbeatIntervalMs heartbeat interval in milliseconds, handed to the {@link Heartbeat} tracker
     * @param metrics             registry the coordinator sensors are registered with
     * @param metricGrpPrefix     prefix of the metric group name
     * @param time                clock used for timestamps and sleeps
     * @param retryBackoffMs      back-off between retries in milliseconds
     */
    public AbstractCoordinator(ConsumerNetworkClient client, String groupId, int rebalanceTimeoutMs, int sessionTimeoutMs, int heartbeatIntervalMs, Metrics metrics, String metricGrpPrefix, Time time, long retryBackoffMs) {
        // Plain configuration first ...
        this.groupId = groupId;
        this.client = client;
        this.time = time;
        this.retryBackoffMs = retryBackoffMs;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;

        // ... then the collaborators derived from it. The heartbeat tracker is created
        // before the metrics because the "last-heartbeat-seconds-ago" gauge reads it.
        this.heartbeat = new Heartbeat(sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs);
        this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
    }

    /**
     * Protocol type sent in the JoinGroup request built by this coordinator.
     *
     * @return the protocol type string placed in every JoinGroupRequest
     */
    protected abstract String protocolType();

    /**
     * Protocol metadata sent in the JoinGroup request built by this coordinator.
     *
     * @return the list of protocol metadata entries placed in every JoinGroupRequest
     */
    protected abstract List<JoinGroupRequest.ProtocolMetadata> metadata();

    /**
     * Hook invoked by {@code joinGroupIfNeeded()} before a join attempt, once per
     * rebalance (it is skipped on retries until a join has succeeded again).
     *
     * @param var1 the generation id held before the join
     * @param var2 the member id held before the join
     */
    protected abstract void onJoinPrepare(int var1, String var2);

    /**
     * Hook invoked when the JoinGroup response is handed to the leader path; the
     * returned map is sent to the coordinator in the leader's SyncGroup request.
     *
     * @param var1 the leader id taken from the JoinGroup response
     * @param var2 the group protocol taken from the JoinGroup response
     * @param var3 the members map taken from the JoinGroup response
     * @return the group assignment to send in the SyncGroup request
     */
    protected abstract Map<String, ByteBuffer> performAssignment(String var1, String var2, Map<String, ByteBuffer> var3);

    /**
     * Hook invoked by {@code joinGroupIfNeeded()} after a join attempt succeeded.
     *
     * @param var1 the generation id now held
     * @param var2 the member id now held
     * @param var3 the protocol of the current generation
     * @param var4 the value the join future completed with
     */
    protected abstract void onJoinComplete(int var1, String var2, String var3, ByteBuffer var4);

    /**
     * Blocks until a coordinator is known and its connection has not failed.
     *
     * <p>Retriable lookup failures wait for a metadata update and try again; any
     * other lookup failure is rethrown to the caller.
     */
    public synchronized void ensureCoordinatorReady() {
        while (coordinatorUnknown()) {
            RequestFuture<Void> lookup = lookupCoordinator();
            client.poll(lookup);

            if (lookup.failed()) {
                if (!lookup.isRetriable()) {
                    throw lookup.exception();
                }
                client.awaitMetadataUpdate();
            } else if (coordinator != null && client.connectionFailed(coordinator)) {
                // We found the coordinator but could not connect to it: forget it and
                // back off before looking it up again.
                coordinatorDead();
                time.sleep(retryBackoffMs);
            }
        }
    }

    /**
     * Returns the coordinator lookup currently in flight, starting a new one
     * against the least loaded node when there is none.
     *
     * @return the pending lookup, or an already failed future when no broker is available
     */
    protected synchronized RequestFuture<Void> lookupCoordinator() {
        if (findCoordinatorFuture != null) {
            return findCoordinatorFuture;
        }

        Node target = client.leastLoadedNode();
        if (target == null) {
            return RequestFuture.noBrokersAvailable();
        }

        findCoordinatorFuture = sendGroupCoordinatorRequest(target);
        return findCoordinatorFuture;
    }

    /** Forgets the in-flight coordinator lookup so that the next lookup sends a new request. */
    private synchronized void clearFindCoordinatorFuture() {
        findCoordinatorFuture = null;
    }

    /**
     * Tells whether the member has to (re)join the group.
     *
     * @return the current value of the rejoin flag
     */
    protected synchronized boolean needRejoin() {
        return rejoinNeeded;
    }

    /**
     * Tells whether a join attempt is still in flight.
     *
     * @return {@code true} while a join future is being tracked
     */
    private synchronized boolean rejoinIncomplete() {
        boolean joinInFlight = joinFuture != null;
        return joinInFlight;
    }

    /**
     * Records a poll at {@code now} with the heartbeat tracker, provided the
     * heartbeat thread has been started.
     *
     * @param now current time in milliseconds
     * @throws RuntimeException the failure recorded by the heartbeat thread, if it
     *         has died; the thread reference is dropped before rethrowing
     */
    protected synchronized void pollHeartbeat(long now) {
        if (heartbeatThread == null) {
            return;
        }

        if (heartbeatThread.hasFailed()) {
            RuntimeException failure = heartbeatThread.failureCause();
            heartbeatThread = null;
            throw failure;
        }

        heartbeat.poll(now);
    }

    /**
     * Time until the next heartbeat is due.
     *
     * @param now current time in milliseconds
     * @return {@link Long#MAX_VALUE} while the member is unjoined, otherwise the
     *         value reported by the heartbeat tracker
     */
    protected synchronized long timeToNextHeartbeat(long now) {
        return state == MemberState.UNJOINED
                ? Long.MAX_VALUE
                : heartbeat.timeToNextHeartbeat(now);
    }

    /**
     * Makes sure the coordinator is known, the heartbeat thread exists and the
     * member has joined the group, blocking until all three hold.
     */
    public void ensureActiveGroup() {
        // The order matters: joining needs a coordinator, and the heartbeat thread
        // must exist before a successful join tries to enable it.
        ensureCoordinatorReady();
        startHeartbeatThreadIfNeeded();
        joinGroupIfNeeded();
    }

    /** Creates and starts the heartbeat thread unless one is already being tracked. */
    private synchronized void startHeartbeatThreadIfNeeded() {
        if (heartbeatThread != null) {
            return;
        }
        heartbeatThread = new HeartbeatThread();
        heartbeatThread.start();
    }

    /** Pauses heartbeating; does nothing when no heartbeat thread exists. */
    private synchronized void disableHeartbeatThread() {
        HeartbeatThread thread = heartbeatThread;
        if (thread == null) {
            return;
        }
        thread.disable();
    }

    /**
     * Joins the group, retrying until no rejoin is needed and no join is in flight.
     *
     * <p>{@code onJoinPrepare} runs once per rebalance and {@code onJoinComplete}
     * after every successful join. Unknown-member, rebalance-in-progress and
     * illegal-generation failures are retried immediately, other retriable
     * failures after a back-off, and everything else is rethrown.
     */
    void joinGroupIfNeeded() {
        while (needRejoin() || rejoinIncomplete()) {
            ensureCoordinatorReady();

            if (needsJoinPrepare) {
                onJoinPrepare(generation.generationId, generation.memberId);
                needsJoinPrepare = false;
            }

            RequestFuture<ByteBuffer> joinAttempt = initiateJoinGroup();
            client.poll(joinAttempt);
            resetJoinGroupFuture();

            if (joinAttempt.succeeded()) {
                needsJoinPrepare = true;
                onJoinComplete(generation.generationId, generation.memberId, generation.protocol, joinAttempt.value());
            } else {
                RuntimeException failure = joinAttempt.exception();
                boolean rejoinImmediately = failure instanceof UnknownMemberIdException
                        || failure instanceof RebalanceInProgressException
                        || failure instanceof IllegalGenerationException;
                if (!rejoinImmediately) {
                    if (!joinAttempt.isRetriable()) {
                        throw failure;
                    }
                    time.sleep(retryBackoffMs);
                }
            }
        }
    }

    /** Forgets the tracked join attempt so that the next call to initiateJoinGroup() sends a new one. */
    private synchronized void resetJoinGroupFuture() {
        joinFuture = null;
    }

    /**
     * Returns the join attempt in flight, starting a new one when there is none.
     *
     * <p>Starting a join pauses heartbeating and moves the member to
     * {@code REBALANCING}. When the attempt completes the member becomes
     * {@code STABLE} (and heartbeating resumes) on success or {@code UNJOINED} on
     * failure.
     *
     * @return the future tracking the current join attempt
     */
    private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
        if (joinFuture != null) {
            return joinFuture;
        }

        // Heartbeats are paused for the duration of the rebalance.
        disableHeartbeatThread();
        state = MemberState.REBALANCING;

        RequestFutureListener<ByteBuffer> outcome = new RequestFutureListener<ByteBuffer>() {
            @Override
            public void onSuccess(ByteBuffer value) {
                // State changes are made under the coordinator's monitor.
                synchronized (AbstractCoordinator.this) {
                    log.info("Successfully joined group {} with generation {}", groupId, generation.generationId);
                    state = MemberState.STABLE;
                    if (heartbeatThread != null) {
                        heartbeatThread.enable();
                    }
                }
            }

            @Override
            public void onFailure(RuntimeException e) {
                synchronized (AbstractCoordinator.this) {
                    state = MemberState.UNJOINED;
                }
            }
        };

        joinFuture = sendJoinGroupRequest();
        joinFuture.addListener(outcome);
        return joinFuture;
    }

    /**
     * Sends a JoinGroup request to the coordinator.
     *
     * @return a future composed with {@code JoinGroupResponseHandler}, or an
     *         already failed future when the coordinator is unknown
     */
    private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
        if (coordinatorUnknown()) {
            return RequestFuture.coordinatorNotAvailable();
        }

        log.info("(Re-)joining group {}", groupId);
        JoinGroupRequest joinRequest = new JoinGroupRequest(
                groupId,
                sessionTimeoutMs,
                rebalanceTimeoutMs,
                generation.memberId,
                protocolType(),
                metadata());

        log.debug("Sending JoinGroup ({}) to coordinator {}", joinRequest, coordinator);
        return client.send(coordinator, ApiKeys.JOIN_GROUP, joinRequest)
                .compose(new JoinGroupResponseHandler());
    }

    /**
     * Sends the SyncGroup request of a follower, which carries an empty assignment map.
     *
     * @return the future tracking the SyncGroup request
     */
    private RequestFuture<ByteBuffer> onJoinFollower() {
        Map<String, ByteBuffer> noAssignments = Collections.emptyMap();
        SyncGroupRequest syncRequest = new SyncGroupRequest(groupId, generation.generationId, generation.memberId, noAssignments);
        log.debug("Sending follower SyncGroup for group {} to coordinator {}: {}", groupId, coordinator, syncRequest);
        return sendSyncGroupRequest(syncRequest);
    }

    /**
     * Computes the group assignment as leader and sends it in a SyncGroup request.
     *
     * @param joinResponse the JoinGroup response to take leader id, protocol and members from
     * @return the future tracking the SyncGroup request, or a failed future wrapping
     *         any {@link RuntimeException} thrown while assigning or sending
     */
    private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
        try {
            Map<String, ByteBuffer> assignments = performAssignment(
                    joinResponse.leaderId(),
                    joinResponse.groupProtocol(),
                    joinResponse.members());

            SyncGroupRequest syncRequest = new SyncGroupRequest(groupId, generation.generationId, generation.memberId, assignments);
            log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}", groupId, coordinator, syncRequest);
            return sendSyncGroupRequest(syncRequest);
        } catch (RuntimeException assignmentFailure) {
            // Report the problem through the future instead of throwing.
            return RequestFuture.failure(assignmentFailure);
        }
    }

    /**
     * Sends the given SyncGroup request to the coordinator.
     *
     * @param request the request to send
     * @return a future composed with {@code SyncGroupResponseHandler}, or an already
     *         failed future when the coordinator is unknown
     */
    private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest request) {
        boolean noCoordinator = coordinatorUnknown();
        if (noCoordinator) {
            return RequestFuture.coordinatorNotAvailable();
        }
        return client.send(coordinator, ApiKeys.SYNC_GROUP, request)
                .compose(new SyncGroupResponseHandler());
    }

    /**
     * Asks the given broker which node coordinates this group.
     *
     * @param node the broker to send the request to
     * @return a future composed with {@code GroupCoordinatorResponseHandler}
     */
    private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
        log.debug("Sending coordinator request for group {} to broker {}", groupId, node);
        GroupCoordinatorRequest lookupRequest = new GroupCoordinatorRequest(groupId);
        return client.send(node, ApiKeys.GROUP_COORDINATOR, lookupRequest)
                .compose(new GroupCoordinatorResponseHandler());
    }

    /**
     * Tells whether no usable coordinator is currently known.
     *
     * <p>Delegates to {@link #coordinator()}, so a coordinator whose connection has
     * failed is marked dead as a side effect.
     *
     * @return {@code true} when {@link #coordinator()} yields {@code null}
     */
    public boolean coordinatorUnknown() {
        Node current = coordinator();
        return current == null;
    }

    /**
     * Returns the current coordinator, marking it dead first when its connection
     * has failed.
     *
     * @return the coordinator node, or {@code null} when it is unknown or was just marked dead
     */
    protected synchronized Node coordinator() {
        boolean connectionLost = coordinator != null && client.connectionFailed(coordinator);
        if (!connectionLost) {
            return coordinator;
        }
        coordinatorDead();
        return null;
    }

    /**
     * Forgets the current coordinator, failing every request to it that has not
     * been sent yet. Does nothing when no coordinator is known.
     */
    protected synchronized void coordinatorDead() {
        if (coordinator == null) {
            return;
        }
        log.info("Marking the coordinator {} dead for group {}", coordinator, groupId);
        client.failUnsentRequests(coordinator, GroupCoordinatorNotAvailableException.INSTANCE);
        coordinator = null;
    }

    /**
     * Returns the current generation, but only while the member is stable.
     *
     * @return the generation, or {@code null} when the state is not {@code STABLE}
     */
    protected synchronized Generation generation() {
        return state == MemberState.STABLE ? generation : null;
    }

    /**
     * Drops the current generation: the member becomes {@code UNJOINED}, holds
     * {@code NO_GENERATION} and is flagged as needing to rejoin.
     */
    protected synchronized void resetGeneration() {
        state = MemberState.UNJOINED;
        generation = Generation.NO_GENERATION;
        rejoinNeeded = true;
    }

    /** Flags the member as needing to rejoin the group; generation and state are left untouched. */
    protected synchronized void requestRejoin() {
        rejoinNeeded = true;
    }

    /**
     * Shuts the coordinator down: asks the heartbeat thread (if any) to stop and
     * then leaves the group if the member is part of one.
     */
    @Override
    public synchronized void close() {
        HeartbeatThread thread = heartbeatThread;
        if (thread != null) {
            thread.close();
        }
        maybeLeaveGroup();
    }

    /**
     * Sends a LeaveGroup request when the coordinator is known and the member holds
     * a generation, then resets the generation unconditionally.
     */
    public synchronized void maybeLeaveGroup() {
        boolean activeMember = !coordinatorUnknown()
                && state != MemberState.UNJOINED
                && generation != Generation.NO_GENERATION;

        if (activeMember) {
            LeaveGroupRequest leaveRequest = new LeaveGroupRequest(groupId, generation.memberId);
            client.send(coordinator, ApiKeys.LEAVE_GROUP, leaveRequest)
                    .compose(new LeaveGroupResponseHandler());
            // Poll once so the request gets a chance to go out; the response is not awaited.
            client.pollNoWakeup();
        }

        resetGeneration();
    }

    /**
     * Sends a heartbeat for the current generation and member id to the coordinator.
     *
     * @return a future composed with {@code HeartbeatResponseHandler}
     */
    synchronized RequestFuture<Void> sendHeartbeatRequest() {
        HeartbeatRequest heartbeatRequest = new HeartbeatRequest(groupId, generation.generationId, generation.memberId);
        return client.send(coordinator, ApiKeys.HEARTBEAT, heartbeatRequest)
                .compose(new HeartbeatResponseHandler());
    }

    /**
     * Retriable exception type private to the coordinator. It carries no message
     * or cause, and nothing in the visible part of this file throws it.
     */
    private static class UnjoinedGroupException
    extends RetriableException {
        private UnjoinedGroupException() {
        }
    }

    /**
     * Immutable holder of the generation id, member id and protocol a member
     * holds within the group.
     */
    protected static class Generation {
        /** Placeholder used while the member holds no generation: id -1, empty member id, no protocol. */
        public static final Generation NO_GENERATION = new Generation(-1, "", null);

        public final int generationId;
        public final String memberId;
        public final String protocol;

        /**
         * @param generationId id of the generation
         * @param memberId     id of this member
         * @param protocol     protocol of the generation
         */
        public Generation(int generationId, String memberId, String protocol) {
            this.protocol = protocol;
            this.memberId = memberId;
            this.generationId = generationId;
        }
    }

    private class HeartbeatThread
    extends Thread {
        private boolean enabled;
        private boolean closed;
        private AtomicReference<RuntimeException> failed;

        HeartbeatThread() {
            super("kafka-coordinator-heartbeat-thread" + (AbstractCoordinator.this.groupId.isEmpty() ? "" : " | " + AbstractCoordinator.this.groupId));
            this.enabled = false;
            this.closed = false;
            this.failed = new AtomicReference<Object>(null);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void enable() {
            AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
            synchronized (abstractCoordinator) {
                this.enabled = true;
                AbstractCoordinator.this.heartbeat.resetTimeouts(AbstractCoordinator.this.time.milliseconds());
                AbstractCoordinator.this.notify();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void disable() {
            AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
            synchronized (abstractCoordinator) {
                this.enabled = false;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void close() {
            AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
            synchronized (abstractCoordinator) {
                this.closed = true;
                AbstractCoordinator.this.notify();
            }
        }

        private boolean hasFailed() {
            return this.failed.get() != null;
        }

        private RuntimeException failureCause() {
            return this.failed.get();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            try {
                while (true) {
                    AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                    synchronized (abstractCoordinator) {
                        if (this.closed) {
                            return;
                        }
                        if (!this.enabled) {
                            AbstractCoordinator.this.wait();
                            continue;
                        }
                        if (AbstractCoordinator.this.state != MemberState.STABLE) {
                            this.disable();
                            continue;
                        }
                        AbstractCoordinator.this.client.pollNoWakeup();
                        long now = AbstractCoordinator.this.time.milliseconds();
                        if (AbstractCoordinator.this.coordinatorUnknown()) {
                            if (AbstractCoordinator.this.findCoordinatorFuture == null) {
                                AbstractCoordinator.this.lookupCoordinator();
                            } else {
                                AbstractCoordinator.this.wait(AbstractCoordinator.this.retryBackoffMs);
                            }
                        } else if (AbstractCoordinator.this.heartbeat.sessionTimeoutExpired(now)) {
                            AbstractCoordinator.this.coordinatorDead();
                        } else if (AbstractCoordinator.this.heartbeat.pollTimeoutExpired(now)) {
                            AbstractCoordinator.this.maybeLeaveGroup();
                        } else if (!AbstractCoordinator.this.heartbeat.shouldHeartbeat(now)) {
                            AbstractCoordinator.this.wait(AbstractCoordinator.this.retryBackoffMs);
                        } else {
                            AbstractCoordinator.this.heartbeat.sentHeartbeat(now);
                            AbstractCoordinator.this.sendHeartbeatRequest().addListener(new RequestFutureListener<Void>(){

                                /*
                                 * WARNING - Removed try catching itself - possible behaviour change.
                                 */
                                @Override
                                public void onSuccess(Void value) {
                                    AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                                    synchronized (abstractCoordinator) {
                                        AbstractCoordinator.this.heartbeat.receiveHeartbeat(AbstractCoordinator.this.time.milliseconds());
                                    }
                                }

                                /*
                                 * WARNING - Removed try catching itself - possible behaviour change.
                                 */
                                @Override
                                public void onFailure(RuntimeException e) {
                                    AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                                    synchronized (abstractCoordinator) {
                                        if (e instanceof RebalanceInProgressException) {
                                            AbstractCoordinator.this.heartbeat.receiveHeartbeat(AbstractCoordinator.this.time.milliseconds());
                                        } else {
                                            AbstractCoordinator.this.heartbeat.failHeartbeat();
                                            AbstractCoordinator.this.notify();
                                        }
                                    }
                                }
                            });
                        }
                    }
                }
            }
            catch (InterruptedException e) {
                log.error("Unexpected interrupt received in heartbeat thread for group {}", (Object)AbstractCoordinator.this.groupId, (Object)e);
                this.failed.set(new RuntimeException(e));
                return;
            }
            catch (RuntimeException e) {
                log.error("Heartbeat thread for group {} failed due to unexpected error", (Object)AbstractCoordinator.this.groupId, (Object)e);
                this.failed.set(e);
            }
        }
    }

    /**
     * Registers and holds the coordinator's sensors: heartbeat, join and sync
     * latency, plus a gauge reporting the seconds since the last heartbeat was sent.
     */
    private class GroupCoordinatorMetrics {
        public final Metrics metrics;
        public final String metricGrpName;
        public final Sensor heartbeatLatency;
        public final Sensor joinLatency;
        public final Sensor syncLatency;

        /**
         * @param metrics         registry to create the sensors in
         * @param metricGrpPrefix prefix of the metric group; "-coordinator-metrics" is appended
         */
        public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
            this.metrics = metrics;
            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";

            this.heartbeatLatency = metrics.sensor("heartbeat-latency");
            this.heartbeatLatency.add(metrics.metricName("heartbeat-response-time-max", this.metricGrpName, "The max time taken to receive a response to a heartbeat request"), new Max());
            this.heartbeatLatency.add(metrics.metricName("heartbeat-rate", this.metricGrpName, "The average number of heartbeats per second"), new Rate(new Count()));

            this.joinLatency = metrics.sensor("join-latency");
            this.joinLatency.add(metrics.metricName("join-time-avg", this.metricGrpName, "The average time taken for a group rejoin"), new Avg());
            // The "-max" metrics must use Max; with Avg they merely duplicated the "-avg" values.
            this.joinLatency.add(metrics.metricName("join-time-max", this.metricGrpName, "The max time taken for a group rejoin"), new Max());
            this.joinLatency.add(metrics.metricName("join-rate", this.metricGrpName, "The number of group joins per second"), new Rate(new Count()));

            this.syncLatency = metrics.sensor("sync-latency");
            this.syncLatency.add(metrics.metricName("sync-time-avg", this.metricGrpName, "The average time taken for a group sync"), new Avg());
            this.syncLatency.add(metrics.metricName("sync-time-max", this.metricGrpName, "The max time taken for a group sync"), new Max());
            this.syncLatency.add(metrics.metricName("sync-rate", this.metricGrpName, "The number of group syncs per second"), new Rate(new Count()));

            // Gauge computed on demand from the heartbeat tracker of the enclosing coordinator.
            Measurable lastHeartbeat = new Measurable(){

                @Override
                public double measure(MetricConfig config, long now) {
                    return TimeUnit.SECONDS.convert(now - AbstractCoordinator.this.heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
                }
            };
            metrics.addMetric(metrics.metricName("last-heartbeat-seconds-ago", this.metricGrpName, "The number of seconds since the last controller heartbeat was sent"), lastHeartbeat);
        }
    }

    /**
     * Base class of the response handlers: parses the raw client response and lets
     * the subclass complete or fail the composed future.
     *
     * @param <R> parsed response type
     * @param <T> value type of the composed future
     */
    protected abstract class CoordinatorResponseHandler<R, T> extends RequestFutureAdapter<ClientResponse, T> {
        /** The raw response being handled; set before {@link #parse} is called. */
        protected ClientResponse response;

        protected CoordinatorResponseHandler() {
        }

        /**
         * Converts the raw client response into the typed response.
         *
         * @param var1 the raw client response
         * @return the parsed response
         */
        public abstract R parse(ClientResponse var1);

        /**
         * Handles the parsed response, completing or failing the future.
         *
         * @param var1 the parsed response
         * @param var2 the future to complete or fail
         */
        public abstract void handle(R var1, RequestFuture<T> var2);

        @Override
        public void onFailure(RuntimeException e, RequestFuture<T> future) {
            // A disconnect means the coordinator can no longer be trusted.
            boolean disconnected = e instanceof DisconnectException;
            if (disconnected) {
                coordinatorDead();
            }
            future.raise(e);
        }

        @Override
        public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
            try {
                this.response = clientResponse;
                R parsed = parse(clientResponse);
                handle(parsed, future);
            } catch (RuntimeException e) {
                // Only fail the future if the handler has not completed it already.
                if (!future.isDone()) {
                    future.raise(e);
                }
            }
        }
    }

    private class HeartbeatResponseHandler
    extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
        private HeartbeatResponseHandler() {
        }

        @Override
        public HeartbeatResponse parse(ClientResponse response) {
            return new HeartbeatResponse(response.responseBody());
        }

        @Override
        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
            ((AbstractCoordinator)AbstractCoordinator.this).sensors.heartbeatLatency.record(this.response.requestLatencyMs());
            Errors error = Errors.forCode(heartbeatResponse.errorCode());
            if (error == Errors.NONE) {
                log.debug("Received successful heartbeat response for group {}", (Object)AbstractCoordinator.this.groupId);
                future.complete(null);
            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.", (Object)AbstractCoordinator.this.groupId, (Object)AbstractCoordinator.this.coordinator());
                AbstractCoordinator.this.coordinatorDead();
                future.raise(error);
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", (Object)AbstractCoordinator.this.groupId);
                AbstractCoordinator.this.requestRejoin();
                future.raise(Errors.REBALANCE_IN_PROGRESS);
            } else if (error == Errors.ILLEGAL_GENERATION) {
                log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", (Object)AbstractCoordinator.this.groupId);
                AbstractCoordinator.this.resetGeneration();
                future.raise(Errors.ILLEGAL_GENERATION);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                log.debug("Attempt to heart beat failed for group {} since member id is not valid.", (Object)AbstractCoordinator.this.groupId);
                AbstractCoordinator.this.resetGeneration();
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(AbstractCoordinator.this.groupId));
            } else {
                future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
            }
        }
    }

    private class LeaveGroupResponseHandler
    extends CoordinatorResponseHandler<LeaveGroupResponse, Void> {
        private LeaveGroupResponseHandler() {
        }

        @Override
        public LeaveGroupResponse parse(ClientResponse response) {
            return new LeaveGroupResponse(response.responseBody());
        }

        @Override
        public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future) {
            Errors error = Errors.forCode(leaveResponse.errorCode());
            if (error == Errors.NONE) {
                log.debug("LeaveGroup request for group {} returned successfully", (Object)AbstractCoordinator.this.groupId);
                future.complete(null);
            } else {
                log.debug("LeaveGroup request for group {} failed with error: {}", (Object)AbstractCoordinator.this.groupId, (Object)error.message());
                future.raise(error);
            }
        }
    }

    private class GroupCoordinatorResponseHandler
    extends RequestFutureAdapter<ClientResponse, Void> {
        private GroupCoordinatorResponseHandler() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
            log.debug("Received group coordinator response {}", (Object)resp);
            GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
            Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
            AbstractCoordinator.this.clearFindCoordinatorFuture();
            if (error == Errors.NONE) {
                AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                synchronized (abstractCoordinator) {
                    AbstractCoordinator.this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(), groupCoordinatorResponse.node().host(), groupCoordinatorResponse.node().port());
                    log.info("Discovered coordinator {} for group {}.", (Object)AbstractCoordinator.this.coordinator, (Object)AbstractCoordinator.this.groupId);
                    AbstractCoordinator.this.client.tryConnect(AbstractCoordinator.this.coordinator);
                    AbstractCoordinator.this.heartbeat.resetTimeouts(AbstractCoordinator.this.time.milliseconds());
                }
                future.complete(null);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(AbstractCoordinator.this.groupId));
            } else {
                log.debug("Group coordinator lookup for group {} failed: {}", (Object)AbstractCoordinator.this.groupId, (Object)error.message());
                future.raise(error);
            }
        }

        @Override
        public void onFailure(RuntimeException e, RequestFuture<Void> future) {
            AbstractCoordinator.this.clearFindCoordinatorFuture();
            super.onFailure(e, future);
        }
    }

    private class SyncGroupResponseHandler
    extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
        private SyncGroupResponseHandler() {
        }

        @Override
        public SyncGroupResponse parse(ClientResponse response) {
            return new SyncGroupResponse(response.responseBody());
        }

        @Override
        public void handle(SyncGroupResponse syncResponse, RequestFuture<ByteBuffer> future) {
            Errors error = Errors.forCode(syncResponse.errorCode());
            if (error == Errors.NONE) {
                ((AbstractCoordinator)AbstractCoordinator.this).sensors.syncLatency.record(this.response.requestLatencyMs());
                future.complete(syncResponse.memberAssignment());
            } else {
                AbstractCoordinator.this.requestRejoin();
                if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    future.raise(new GroupAuthorizationException(AbstractCoordinator.this.groupId));
                } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                    log.debug("SyncGroup for group {} failed due to coordinator rebalance", (Object)AbstractCoordinator.this.groupId);
                    future.raise(error);
                } else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) {
                    log.debug("SyncGroup for group {} failed due to {}", (Object)AbstractCoordinator.this.groupId, (Object)error);
                    AbstractCoordinator.this.resetGeneration();
                    future.raise(error);
                } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                    log.debug("SyncGroup for group {} failed due to {}", (Object)AbstractCoordinator.this.groupId, (Object)error);
                    AbstractCoordinator.this.coordinatorDead();
                    future.raise(error);
                } else {
                    future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
                }
            }
        }
    }

    private class JoinGroupResponseHandler
    extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
        private JoinGroupResponseHandler() {
        }

        @Override
        public JoinGroupResponse parse(ClientResponse response) {
            return new JoinGroupResponse(response.responseBody());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
            Errors error = Errors.forCode(joinResponse.errorCode());
            if (error == Errors.NONE) {
                log.debug("Received successful join group response for group {}: {}", (Object)AbstractCoordinator.this.groupId, (Object)joinResponse.toStruct());
                ((AbstractCoordinator)AbstractCoordinator.this).sensors.joinLatency.record(this.response.requestLatencyMs());
                AbstractCoordinator abstractCoordinator = AbstractCoordinator.this;
                synchronized (abstractCoordinator) {
                    if (AbstractCoordinator.this.state != MemberState.REBALANCING) {
                        future.raise(new UnjoinedGroupException());
                    } else {
                        AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(), joinResponse.memberId(), joinResponse.groupProtocol());
                        AbstractCoordinator.this.rejoinNeeded = false;
                        if (joinResponse.isLeader()) {
                            AbstractCoordinator.this.onJoinLeader(joinResponse).chain(future);
                        } else {
                            AbstractCoordinator.this.onJoinFollower().chain(future);
                        }
                    }
                }
            } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", (Object)AbstractCoordinator.this.groupId, (Object)AbstractCoordinator.this.coordinator());
                future.raise(error);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                AbstractCoordinator.this.resetGeneration();
                log.debug("Attempt to join group {} failed due to unknown member id.", (Object)AbstractCoordinator.this.groupId);
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                AbstractCoordinator.this.coordinatorDead();
                log.debug("Attempt to join group {} failed due to obsolete coordinator information: {}", (Object)AbstractCoordinator.this.groupId, (Object)error.message());
                future.raise(error);
            } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL || error == Errors.INVALID_SESSION_TIMEOUT || error == Errors.INVALID_GROUP_ID) {
                log.error("Attempt to join group {} failed due to fatal error: {}", (Object)AbstractCoordinator.this.groupId, (Object)error.message());
                future.raise(error);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(AbstractCoordinator.this.groupId));
            } else {
                future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
            }
        }
    }

    private static enum MemberState {
        UNJOINED,
        REBALANCING,
        STABLE;

    }
}

