/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Non-blocking, multi-connection network I/O built on a {@link java.nio.channels.Selector}.
 *
 * <p>Every connection is identified by the integer id passed to
 * {@link #connect(int, InetSocketAddress, int, int)}. A call to {@link #poll(long, List)}
 * clears the results of the previous poll, queues the given sends, performs one select,
 * and then finishes pending connects, reads and writes on whichever keys are ready. The
 * outcome is exposed through {@link #completedSends()}, {@link #completedReceives()},
 * {@link #connected()} and {@link #disconnected()} until the next poll clears them.
 *
 * <p>Nothing in this class is synchronized. NOTE(review): callers are presumably expected
 * to drive it from a single thread (apart from {@link #wakeup()}) - confirm against the
 * {@code Selectable} contract.
 */
public class Selector implements Selectable {
    private static final Logger log = LoggerFactory.getLogger(Selector.class);

    /** The underlying NIO selector all channels are registered with. */
    private final java.nio.channels.Selector selector;
    /** Selection key of every connection that has been registered and not yet closed, by id. */
    private final Map<Integer, SelectionKey> keys;
    /** Sends fully written during the most recent poll. */
    private final List<NetworkSend> completedSends;
    /** Receives fully read during the most recent poll. */
    private final List<NetworkReceive> completedReceives;
    /** Ids whose connection was closed during the most recent poll. */
    private final List<Integer> disconnected;
    /** Ids whose connection finished connecting during the most recent poll. */
    private final List<Integer> connected;
    private final Time time;
    private final SelectorMetrics sensors;
    private final String metricGrpPrefix;
    private final Map<String, String> metricTags;

    /**
     * Opens the underlying NIO selector and registers this selector's sensors.
     *
     * @param metrics         registry the sensors and metrics are created in
     * @param time            clock used to time select and I/O phases
     * @param metricGrpPrefix prefix for the metric group names ("-metrics" / "-node-metrics" are appended)
     * @param metricTags      tags attached to every metric created by this selector
     * @throws KafkaException if the NIO selector cannot be opened
     */
    public Selector(Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) {
        try {
            this.selector = java.nio.channels.Selector.open();
        } catch (IOException e) {
            throw new KafkaException(e);
        }
        this.time = time;
        this.metricGrpPrefix = metricGrpPrefix;
        this.metricTags = metricTags;
        this.keys = new HashMap<Integer, SelectionKey>();
        this.completedSends = new ArrayList<NetworkSend>();
        this.completedReceives = new ArrayList<NetworkReceive>();
        this.connected = new ArrayList<Integer>();
        this.disconnected = new ArrayList<Integer>();
        // Must come last: SelectorMetrics reads metricGrpPrefix, metricTags and keys of this instance.
        this.sensors = new SelectorMetrics(metrics);
    }

    /**
     * Starts a non-blocking connect to {@code address} and registers the channel under {@code id}.
     * The connection is only reported through {@link #connected()} once a later
     * {@link #poll(long, List)} finishes it.
     *
     * <p>If anything fails after the channel has been opened (configuring the socket, starting
     * the connect, or registering with the selector) the channel is closed before the failure
     * propagates, so no channel is leaked.
     *
     * @param id                caller-chosen id for the new connection
     * @param address           remote address to connect to
     * @param sendBufferSize    socket send buffer size
     * @param receiveBufferSize socket receive buffer size
     * @throws IllegalStateException if {@code id} is already registered
     * @throws IOException           if the address cannot be resolved or the connect cannot be started
     */
    @Override
    public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        if (this.keys.containsKey(id)) {
            throw new IllegalStateException("There is already a connection for id " + id);
        }
        SocketChannel channel = SocketChannel.open();
        boolean registered = false;
        try {
            channel.configureBlocking(false);
            Socket socket = channel.socket();
            socket.setKeepAlive(true);
            socket.setSendBufferSize(sendBufferSize);
            socket.setReceiveBufferSize(receiveBufferSize);
            socket.setTcpNoDelay(true);
            try {
                channel.connect(address);
            } catch (UnresolvedAddressException e) {
                // Unchecked in NIO; surface it as the checked exception callers already handle.
                throw new IOException("Can't resolve address: " + address, e);
            }
            SelectionKey key = channel.register(this.selector, SelectionKey.OP_CONNECT);
            key.attach(new Transmissions(id));
            this.keys.put(id, key);
            registered = true;
        } finally {
            if (!registered) {
                closeAfterFailedConnect(channel, address);
            }
        }
    }

    /**
     * Closes a channel whose connect attempt failed. A failure to close is logged rather than
     * thrown so that it cannot mask the exception that caused the connect to fail.
     */
    private static void closeAfterFailedConnect(SocketChannel channel, InetSocketAddress address) {
        try {
            channel.close();
        } catch (IOException e) {
            log.warn("Failed to close channel after unsuccessful connect to {}", (Object) address, (Object) e);
        }
    }

    /**
     * Cancels the selection key registered for {@code id}, if there is one.
     *
     * <p>Only the key is cancelled here: the channel is not closed and the id is not removed
     * from the key map by this method. The full close happens later, e.g. when
     * {@link #poll(long, List)} tries to queue a send for this id and finds the key cancelled.
     *
     * @param id id of the connection to disconnect
     */
    @Override
    public void disconnect(int id) {
        SelectionKey key = this.keys.get(id);
        if (key == null) {
            return;
        }
        key.cancel();
    }

    /** Wakes up the underlying NIO selector if it is blocked in a select. */
    @Override
    public void wakeup() {
        this.selector.wakeup();
    }

    /**
     * Closes every channel registered with the underlying selector and then the selector
     * itself. A failure to close the selector is logged, not thrown.
     */
    @Override
    public void close() {
        for (SelectionKey key : this.selector.keys()) {
            this.close(key);
        }
        try {
            this.selector.close();
        } catch (IOException e) {
            log.error("Exception closing selector:", e);
        }
    }

    /**
     * Queues the given sends, performs one select and services every ready key.
     *
     * <p>Results of the previous poll are cleared first. Time spent in select and time spent
     * servicing keys are recorded on the {@code select-time} and {@code io-time} sensors.
     *
     * @param timeout select timeout in milliseconds; {@code 0} does not block, a negative
     *                value blocks until a key is ready or the selector is woken up
     * @param sends   sends to begin; at most one send may be in progress per connection
     * @throws IllegalStateException if a send targets an id with no registered connection, or
     *                               a connection that still has a send in progress
     * @throws IOException           if the select itself fails
     */
    @Override
    public void poll(long timeout, List<NetworkSend> sends) throws IOException {
        this.clear();
        for (NetworkSend send : sends) {
            this.beginSend(send);
        }

        long startSelect = this.time.nanoseconds();
        int readyKeys = this.select(timeout);
        long endSelect = this.time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, this.time.milliseconds());

        if (readyKeys > 0) {
            Iterator<SelectionKey> ready = this.selector.selectedKeys().iterator();
            while (ready.hasNext()) {
                SelectionKey key = ready.next();
                // The selected-key set is never cleared by the selector itself.
                ready.remove();
                this.serviceKey(key);
            }
        }

        long endIo = this.time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, this.time.milliseconds());
    }

    /**
     * Attaches {@code send} to its destination connection and asks to be told when the
     * channel is writable. If the key has been cancelled the connection is closed instead.
     */
    private void beginSend(NetworkSend send) {
        SelectionKey key = this.keyForId(send.destination());
        Transmissions transmissions = this.transmissions(key);
        if (transmissions.hasSend()) {
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        }
        transmissions.send = send;
        try {
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
        } catch (CancelledKeyException e) {
            this.close(key);
        }
    }

    /**
     * Performs whatever the ready key allows: finishing a connect, reading into the current
     * receive, and writing the current send. An {@link IOException} during any of these is
     * logged and closes the connection; a key that is no longer valid afterwards is closed too.
     */
    private void serviceKey(SelectionKey key) {
        Transmissions transmissions = this.transmissions(key);
        SocketChannel channel = this.channel(key);
        this.sensors.maybeRegisterNodeMetrics(transmissions.id);
        try {
            if (key.isConnectable()) {
                channel.finishConnect();
                // Connected: stop watching for connect completion, start watching for data.
                key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
                this.connected.add(transmissions.id);
                this.sensors.connectionCreated.record();
            }
            if (key.isReadable()) {
                if (!transmissions.hasReceive()) {
                    transmissions.receive = new NetworkReceive(transmissions.id);
                }
                transmissions.receive.readFrom(channel);
                if (transmissions.receive.complete()) {
                    transmissions.receive.payload().rewind();
                    this.completedReceives.add(transmissions.receive);
                    this.sensors.recordBytesReceived(transmissions.id, transmissions.receive.payload().limit());
                    transmissions.clearReceive();
                }
            }
            if (key.isWritable()) {
                transmissions.send.writeTo(channel);
                if (transmissions.send.remaining() <= 0) {
                    this.completedSends.add(transmissions.send);
                    this.sensors.recordBytesSent(transmissions.id, transmissions.send.size());
                    transmissions.clearSend();
                    // Nothing left to write: stop asking for writability.
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                }
            }
            if (!key.isValid()) {
                this.close(key);
            }
        } catch (IOException e) {
            InetAddress remoteAddress = null;
            Socket socket = channel.socket();
            if (socket != null) {
                remoteAddress = socket.getInetAddress();
            }
            log.warn("Error in I/O with {}", (Object) remoteAddress, (Object) e);
            this.close(key);
        }
    }

    /** @return the sends completed by the most recent poll (the live internal list) */
    @Override
    public List<NetworkSend> completedSends() {
        return this.completedSends;
    }

    /** @return the receives completed by the most recent poll (the live internal list) */
    @Override
    public List<NetworkReceive> completedReceives() {
        return this.completedReceives;
    }

    /** @return ids of connections closed during the most recent poll (the live internal list) */
    @Override
    public List<Integer> disconnected() {
        return this.disconnected;
    }

    /** @return ids of connections established during the most recent poll (the live internal list) */
    @Override
    public List<Integer> connected() {
        return this.connected;
    }

    /** Discards the results of the previous poll. */
    private void clear() {
        this.completedSends.clear();
        this.completedReceives.clear();
        this.connected.clear();
        this.disconnected.clear();
    }

    /**
     * Runs one select on the underlying selector.
     *
     * @param ms {@code 0} for a non-blocking select, negative to block without a timeout,
     *           otherwise the timeout in milliseconds
     * @return the number of keys whose ready set was updated
     */
    private int select(long ms) throws IOException {
        if (ms == 0L) {
            return this.selector.selectNow();
        }
        if (ms < 0L) {
            return this.selector.select();
        }
        return this.selector.select(ms);
    }

    /**
     * Tears down the connection behind {@code key}: records the id as disconnected, forgets
     * the key, drops any in-progress send/receive, detaches and cancels the key and closes
     * the socket and channel. A failure while closing is logged, not thrown.
     *
     * <p>The key may already have been detached by an earlier call (its attachment is then
     * {@code null}); in that case only the key/channel teardown is repeated.
     */
    private void close(SelectionKey key) {
        SocketChannel channel = this.channel(key);
        Transmissions trans = this.transmissions(key);
        if (trans != null) {
            this.disconnected.add(trans.id);
            this.keys.remove(trans.id);
            trans.clearReceive();
            trans.clearSend();
        }
        key.attach(null);
        key.cancel();
        try {
            channel.socket().close();
            channel.close();
        } catch (IOException e) {
            // trans is null for a key that was already detached; do not dereference it here.
            Object node = trans == null ? (Object) "unknown" : (Object) Integer.valueOf(trans.id);
            log.error("Exception closing connection to node {}:", node, (Object) e);
        }
        this.sensors.connectionClosed.record();
    }

    /**
     * Looks up the key registered for {@code id}.
     *
     * @throws IllegalStateException if no connection is registered under {@code id}
     */
    private SelectionKey keyForId(int id) {
        SelectionKey key = this.keys.get(id);
        if (key == null) {
            throw new IllegalStateException("Attempt to write to socket for which there is no open connection.");
        }
        return key;
    }

    /** @return the per-connection state attached to {@code key}, or {@code null} once detached */
    private Transmissions transmissions(SelectionKey key) {
        return (Transmissions) key.attachment();
    }

    /** @return the socket channel {@code key} was created for */
    private SocketChannel channel(SelectionKey key) {
        return (SocketChannel) key.channel();
    }

    /**
     * Sensors and metrics of the enclosing selector. Selector-wide sensors are created in the
     * constructor; per-node sensors are created on first use by
     * {@link #maybeRegisterNodeMetrics(int)}.
     */
    private class SelectorMetrics {
        private final Metrics metrics;
        public final Sensor connectionClosed;
        public final Sensor connectionCreated;
        public final Sensor bytesTransferred;
        public final Sensor bytesSent;
        public final Sensor bytesReceived;
        public final Sensor selectTime;
        public final Sensor ioTime;

        /**
         * Creates the selector-wide sensors in {@code metrics} under the group
         * {@code <metricGrpPrefix>-metrics}, tagged with the enclosing selector's metric tags.
         */
        public SelectorMetrics(Metrics metrics) {
            this.metrics = metrics;
            String metricGrpName = Selector.this.metricGrpPrefix + "-metrics";
            Map<String, String> tags = Selector.this.metricTags;

            this.connectionClosed = this.metrics.sensor("connections-closed");
            this.connectionClosed.add(new MetricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", tags), new Rate());

            this.connectionCreated = this.metrics.sensor("connections-created");
            this.connectionCreated.add(new MetricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", tags), new Rate());

            this.bytesTransferred = this.metrics.sensor("bytes-sent-received");
            this.bytesTransferred.add(new MetricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", tags), new Rate(new Count()));

            // bytes-sent and bytes-received both name bytes-sent-received as their parent sensor.
            this.bytesSent = this.metrics.sensor("bytes-sent", this.bytesTransferred);
            this.bytesSent.add(new MetricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", tags), new Rate());
            this.bytesSent.add(new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags), new Rate(new Count()));
            this.bytesSent.add(new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags), new Avg());
            this.bytesSent.add(new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags), new Max());

            this.bytesReceived = this.metrics.sensor("bytes-received", this.bytesTransferred);
            this.bytesReceived.add(new MetricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", tags), new Rate());
            this.bytesReceived.add(new MetricName("response-rate", metricGrpName, "Responses received sent per second.", tags), new Rate(new Count()));

            this.selectTime = this.metrics.sensor("select-time");
            this.selectTime.add(new MetricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", tags), new Rate(new Count()));
            this.selectTime.add(new MetricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", tags), new Avg());
            this.selectTime.add(new MetricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", tags), new Rate(TimeUnit.NANOSECONDS));

            this.ioTime = this.metrics.sensor("io-time");
            this.ioTime.add(new MetricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", tags), new Avg());
            this.ioTime.add(new MetricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", tags), new Rate(TimeUnit.NANOSECONDS));

            // Gauge evaluated on demand: the current size of the enclosing selector's key map.
            this.metrics.addMetric(new MetricName("connection-count", metricGrpName, "The current number of active connections.", tags), new Measurable() {

                @Override
                public double measure(MetricConfig config, long now) {
                    return Selector.this.keys.size();
                }
            });
        }

        /**
         * Creates the per-node sensors for {@code node} the first time the node is seen.
         * Negative ids never get per-node sensors. Existence of the {@code bytes-sent} sensor
         * is used as the marker that the node has already been registered.
         */
        public void maybeRegisterNodeMetrics(int node) {
            if (node < 0) {
                return;
            }
            String nodeRequestName = "node-" + node + ".bytes-sent";
            if (this.metrics.getSensor(nodeRequestName) != null) {
                return;
            }

            String metricGrpName = Selector.this.metricGrpPrefix + "-node-metrics";
            Map<String, String> tags = new LinkedHashMap<String, String>(Selector.this.metricTags);
            tags.put("node-id", "node-" + node);

            Sensor nodeRequest = this.metrics.sensor(nodeRequestName);
            nodeRequest.add(new MetricName("outgoing-byte-rate", metricGrpName, tags), new Rate());
            nodeRequest.add(new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags), new Rate(new Count()));
            nodeRequest.add(new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags), new Avg());
            nodeRequest.add(new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags), new Max());

            Sensor nodeResponse = this.metrics.sensor("node-" + node + ".bytes-received");
            nodeResponse.add(new MetricName("incoming-byte-rate", metricGrpName, tags), new Rate());
            nodeResponse.add(new MetricName("response-rate", metricGrpName, "The average number of responses received per second.", tags), new Rate(new Count()));

            Sensor nodeRequestTime = this.metrics.sensor("node-" + node + ".latency");
            nodeRequestTime.add(new MetricName("request-latency-avg", metricGrpName, tags), new Avg());
            nodeRequestTime.add(new MetricName("request-latency-max", metricGrpName, tags), new Max());
        }

        /**
         * Records {@code bytes} on the selector-wide {@code bytes-sent} sensor and, for a
         * non-negative node id whose per-node sensor exists, on that sensor as well.
         */
        public void recordBytesSent(int node, int bytes) {
            long now = Selector.this.time.milliseconds();
            this.bytesSent.record(bytes, now);
            if (node >= 0) {
                Sensor nodeRequest = this.metrics.getSensor("node-" + node + ".bytes-sent");
                if (nodeRequest != null) {
                    nodeRequest.record(bytes, now);
                }
            }
        }

        /**
         * Records {@code bytes} on the selector-wide {@code bytes-received} sensor and, for a
         * non-negative node id whose per-node sensor exists, on that sensor as well.
         */
        public void recordBytesReceived(int node, int bytes) {
            long now = Selector.this.time.milliseconds();
            this.bytesReceived.record(bytes, now);
            if (node >= 0) {
                Sensor nodeResponse = this.metrics.getSensor("node-" + node + ".bytes-received");
                if (nodeResponse != null) {
                    nodeResponse.record(bytes, now);
                }
            }
        }
    }

    /**
     * Per-connection state attached to a selection key: the connection id plus the send and
     * receive currently in progress ({@code null} when there is none).
     */
    private static class Transmissions {
        public final int id;
        public NetworkSend send;
        public NetworkReceive receive;

        public Transmissions(int id) {
            this.id = id;
        }

        /** @return whether a send is currently in progress on this connection */
        public boolean hasSend() {
            return this.send != null;
        }

        public void clearSend() {
            this.send = null;
        }

        /** @return whether a receive is currently in progress on this connection */
        public boolean hasReceive() {
            return this.receive != null;
        }

        public void clearReceive() {
            this.receive = null;
        }
    }
}

