/*
 * Decompiled with CFR 0.152.
 */
package org.apache.thrift.server;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.thrift.server.AbstractNonblockingServer;
import org.apache.thrift.server.Invocation;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TThreadedSelectorServer
extends AbstractNonblockingServer {
    private static final Logger LOGGER = LoggerFactory.getLogger(TThreadedSelectorServer.class.getName());
    private AcceptThread acceptThread;
    private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();
    private final ExecutorService invoker;
    private final Args args;

    public TThreadedSelectorServer(Args args) {
        super(args);
        args.validate();
        this.invoker = args.executorService == null ? TThreadedSelectorServer.createDefaultExecutor(args) : args.executorService;
        this.args = args;
    }

    @Override
    protected boolean startThreads() {
        try {
            for (int i = 0; i < this.args.selectorThreads; ++i) {
                this.selectorThreads.add(new SelectorThread(this.args.acceptQueueSizePerThread));
            }
            this.acceptThread = new AcceptThread((TNonblockingServerTransport)this.serverTransport_, this.createSelectorThreadLoadBalancer(this.selectorThreads));
            for (SelectorThread thread : this.selectorThreads) {
                thread.start();
            }
            this.acceptThread.start();
            return true;
        }
        catch (IOException e) {
            LOGGER.error("Failed to start threads!", e);
            return false;
        }
    }

    @Override
    protected void waitForShutdown() {
        try {
            this.joinThreads();
        }
        catch (InterruptedException e) {
            LOGGER.error("Interrupted while joining threads!", e);
        }
        this.gracefullyShutdownInvokerPool();
    }

    protected void joinThreads() throws InterruptedException {
        this.acceptThread.join();
        for (SelectorThread thread : this.selectorThreads) {
            thread.join();
        }
    }

    @Override
    public void stop() {
        this.stopped_ = true;
        this.stopListening();
        if (this.acceptThread != null) {
            this.acceptThread.wakeupSelector();
        }
        if (this.selectorThreads != null) {
            for (SelectorThread thread : this.selectorThreads) {
                if (thread == null) continue;
                thread.wakeupSelector();
            }
        }
    }

    protected void gracefullyShutdownInvokerPool() {
        long newnow;
        this.invoker.shutdown();
        long now = System.currentTimeMillis();
        for (long timeoutMS = this.args.stopTimeoutUnit.toMillis(this.args.stopTimeoutVal); timeoutMS >= 0L; timeoutMS -= newnow - now) {
            try {
                this.invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
                break;
            }
            catch (InterruptedException ix) {
                newnow = System.currentTimeMillis();
                now = newnow;
                continue;
            }
        }
    }

    @Override
    protected boolean requestInvoke(AbstractNonblockingServer.FrameBuffer frameBuffer) {
        Runnable invocation = this.getRunnable(frameBuffer);
        if (this.invoker != null) {
            try {
                this.invoker.execute(invocation);
                return true;
            }
            catch (RejectedExecutionException rx) {
                LOGGER.warn("ExecutorService rejected execution!", rx);
                return false;
            }
        }
        invocation.run();
        return true;
    }

    protected Runnable getRunnable(AbstractNonblockingServer.FrameBuffer frameBuffer) {
        return new Invocation(frameBuffer);
    }

    protected static ExecutorService createDefaultExecutor(Args options) {
        return options.workerThreads > 0 ? Executors.newFixedThreadPool(options.workerThreads) : null;
    }

    private static BlockingQueue<TNonblockingTransport> createDefaultAcceptQueue(int queueSize) {
        if (queueSize == 0) {
            return new LinkedBlockingQueue<TNonblockingTransport>();
        }
        return new ArrayBlockingQueue<TNonblockingTransport>(queueSize);
    }

    protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection<? extends SelectorThread> threads) {
        return new SelectorThreadLoadBalancer(threads);
    }

    public static class Args
    extends AbstractNonblockingServer.AbstractNonblockingServerArgs<Args> {
        public int selectorThreads = 2;
        private int workerThreads = 5;
        private int stopTimeoutVal = 60;
        private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
        private ExecutorService executorService = null;
        private int acceptQueueSizePerThread = 4;
        private AcceptPolicy acceptPolicy = AcceptPolicy.FAST_ACCEPT;

        public Args(TNonblockingServerTransport transport) {
            super(transport);
        }

        public Args selectorThreads(int i) {
            this.selectorThreads = i;
            return this;
        }

        public int getSelectorThreads() {
            return this.selectorThreads;
        }

        public Args workerThreads(int i) {
            this.workerThreads = i;
            return this;
        }

        public int getWorkerThreads() {
            return this.workerThreads;
        }

        public int getStopTimeoutVal() {
            return this.stopTimeoutVal;
        }

        public Args stopTimeoutVal(int stopTimeoutVal) {
            this.stopTimeoutVal = stopTimeoutVal;
            return this;
        }

        public TimeUnit getStopTimeoutUnit() {
            return this.stopTimeoutUnit;
        }

        public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
            this.stopTimeoutUnit = stopTimeoutUnit;
            return this;
        }

        public ExecutorService getExecutorService() {
            return this.executorService;
        }

        public Args executorService(ExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        public int getAcceptQueueSizePerThread() {
            return this.acceptQueueSizePerThread;
        }

        public Args acceptQueueSizePerThread(int acceptQueueSizePerThread) {
            this.acceptQueueSizePerThread = acceptQueueSizePerThread;
            return this;
        }

        public AcceptPolicy getAcceptPolicy() {
            return this.acceptPolicy;
        }

        public Args acceptPolicy(AcceptPolicy acceptPolicy) {
            this.acceptPolicy = acceptPolicy;
            return this;
        }

        public void validate() {
            if (this.selectorThreads <= 0) {
                throw new IllegalArgumentException("selectorThreads must be positive.");
            }
            if (this.workerThreads < 0) {
                throw new IllegalArgumentException("workerThreads must be non-negative.");
            }
            if (this.acceptQueueSizePerThread <= 0) {
                throw new IllegalArgumentException("acceptQueueSizePerThread must be positive.");
            }
        }

        public static enum AcceptPolicy {
            FAIR_ACCEPT,
            FAST_ACCEPT;

        }
    }

    protected class SelectorThread
    extends AbstractNonblockingServer.AbstractSelectThread {
        private final BlockingQueue<TNonblockingTransport> acceptedQueue;
        private int SELECTOR_AUTO_REBUILD_THRESHOLD;
        private long MONITOR_PERIOD;
        private int jvmBug;

        public SelectorThread() throws IOException {
            this(new LinkedBlockingQueue<TNonblockingTransport>());
        }

        public SelectorThread(int maxPendingAccepts) throws IOException {
            this(TThreadedSelectorServer.createDefaultAcceptQueue(maxPendingAccepts));
        }

        public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) throws IOException {
            super(TThreadedSelectorServer.this);
            this.SELECTOR_AUTO_REBUILD_THRESHOLD = 512;
            this.MONITOR_PERIOD = 1000L;
            this.jvmBug = 0;
            this.acceptedQueue = acceptedQueue;
        }

        public boolean addAcceptedConnection(TNonblockingTransport accepted) {
            try {
                this.acceptedQueue.put(accepted);
            }
            catch (InterruptedException e) {
                LOGGER.warn("Interrupted while adding accepted connection!", e);
                return false;
            }
            this.selector.wakeup();
            return true;
        }

        @Override
        public void run() {
            try {
                while (!TThreadedSelectorServer.this.stopped_) {
                    this.select();
                    this.processAcceptedConnections();
                    this.processInterestChanges();
                }
                for (SelectionKey selectionKey : this.selector.keys()) {
                    this.cleanupSelectionKey(selectionKey);
                }
            }
            catch (Throwable t) {
                LOGGER.error("run() on SelectorThread exiting due to uncaught error", t);
            }
            finally {
                try {
                    this.selector.close();
                }
                catch (IOException e) {
                    LOGGER.error("Got an IOException while closing selector!", e);
                }
                TThreadedSelectorServer.this.stop();
            }
        }

        private void select() {
            try {
                this.doSelect();
                Iterator<SelectionKey> selectedKeys = this.selector.selectedKeys().iterator();
                while (!TThreadedSelectorServer.this.stopped_ && selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selectedKeys.remove();
                    if (!key.isValid()) {
                        this.cleanupSelectionKey(key);
                        continue;
                    }
                    if (key.isReadable()) {
                        this.handleRead(key);
                        continue;
                    }
                    if (key.isWritable()) {
                        this.handleWrite(key);
                        continue;
                    }
                    LOGGER.warn("Unexpected state in select! " + key.interestOps());
                }
            }
            catch (IOException e) {
                LOGGER.warn("Got an IOException while selecting!", e);
            }
        }

        private void doSelect() throws IOException {
            long beforeSelect = System.currentTimeMillis();
            int selectedNums = this.selector.select();
            long afterSelect = System.currentTimeMillis();
            this.jvmBug = selectedNums == 0 ? ++this.jvmBug : 0;
            long selectedTime = afterSelect - beforeSelect;
            if (selectedTime >= this.MONITOR_PERIOD) {
                this.jvmBug = 0;
            } else if (this.jvmBug > this.SELECTOR_AUTO_REBUILD_THRESHOLD) {
                LOGGER.warn("In {} ms happen {} times jvm bug; rebuilding selector.", (Object)this.MONITOR_PERIOD, (Object)this.jvmBug);
                this.rebuildSelector();
                this.selector.selectNow();
                this.jvmBug = 0;
            }
        }

        private synchronized void rebuildSelector() {
            Selector oldSelector = this.selector;
            if (oldSelector == null) {
                return;
            }
            Selector newSelector = null;
            try {
                newSelector = Selector.open();
                LOGGER.warn("Created new Selector.");
            }
            catch (IOException e) {
                LOGGER.error("Create new Selector error.", e);
            }
            for (SelectionKey key : oldSelector.selectedKeys()) {
                if (!key.isValid() && key.readyOps() == 0) continue;
                SelectableChannel channel = key.channel();
                Object attachment = key.attachment();
                try {
                    if (attachment == null) {
                        channel.register(newSelector, key.readyOps());
                        continue;
                    }
                    channel.register(newSelector, key.readyOps(), attachment);
                }
                catch (ClosedChannelException e) {
                    LOGGER.error("Register new selector key error.", e);
                }
            }
            this.selector = newSelector;
            try {
                oldSelector.close();
            }
            catch (IOException e) {
                LOGGER.error("Close old selector error.", e);
            }
            LOGGER.warn("Replace new selector success.");
        }

        private void processAcceptedConnections() {
            TNonblockingTransport accepted;
            while (!TThreadedSelectorServer.this.stopped_ && (accepted = (TNonblockingTransport)this.acceptedQueue.poll()) != null) {
                this.registerAccepted(accepted);
            }
        }

        protected AbstractNonblockingServer.FrameBuffer createFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractNonblockingServer.AbstractSelectThread selectThread) throws TTransportException {
            return TThreadedSelectorServer.this.processorFactory_.isAsyncProcessor() ? new AbstractNonblockingServer.AsyncFrameBuffer(TThreadedSelectorServer.this, trans, selectionKey, selectThread) : new AbstractNonblockingServer.FrameBuffer(TThreadedSelectorServer.this, trans, selectionKey, selectThread);
        }

        private void registerAccepted(TNonblockingTransport accepted) {
            SelectionKey clientKey = null;
            try {
                clientKey = accepted.registerSelector(this.selector, 1);
                AbstractNonblockingServer.FrameBuffer frameBuffer = this.createFrameBuffer(accepted, clientKey, this);
                clientKey.attach(frameBuffer);
            }
            catch (IOException | TTransportException e) {
                LOGGER.warn("Failed to register accepted connection to selector!", e);
                if (clientKey != null) {
                    this.cleanupSelectionKey(clientKey);
                }
                accepted.close();
            }
        }
    }

    protected class AcceptThread
    extends Thread {
        private final TNonblockingServerTransport serverTransport;
        private final Selector acceptSelector;
        private final SelectorThreadLoadBalancer threadChooser;

        public AcceptThread(TNonblockingServerTransport serverTransport, SelectorThreadLoadBalancer threadChooser) throws IOException {
            this.serverTransport = serverTransport;
            this.threadChooser = threadChooser;
            this.acceptSelector = SelectorProvider.provider().openSelector();
            this.serverTransport.registerSelector(this.acceptSelector);
        }

        @Override
        public void run() {
            try {
                if (TThreadedSelectorServer.this.eventHandler_ != null) {
                    TThreadedSelectorServer.this.eventHandler_.preServe();
                }
                while (!TThreadedSelectorServer.this.stopped_) {
                    this.select();
                }
            }
            catch (Throwable t) {
                LOGGER.error("run() on AcceptThread exiting due to uncaught error", t);
            }
            finally {
                try {
                    this.acceptSelector.close();
                }
                catch (IOException e) {
                    LOGGER.error("Got an IOException while closing accept selector!", e);
                }
                TThreadedSelectorServer.this.stop();
            }
        }

        public void wakeupSelector() {
            this.acceptSelector.wakeup();
        }

        private void select() {
            try {
                this.acceptSelector.select();
                Iterator<SelectionKey> selectedKeys = this.acceptSelector.selectedKeys().iterator();
                while (!TThreadedSelectorServer.this.stopped_ && selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selectedKeys.remove();
                    if (!key.isValid()) continue;
                    if (key.isAcceptable()) {
                        this.handleAccept();
                        continue;
                    }
                    LOGGER.warn("Unexpected state in select! " + key.interestOps());
                }
            }
            catch (IOException e) {
                LOGGER.warn("Got an IOException while selecting!", e);
            }
        }

        private void handleAccept() {
            final TNonblockingTransport client = this.doAccept();
            if (client != null) {
                final SelectorThread targetThread = this.threadChooser.nextThread();
                if (TThreadedSelectorServer.this.args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || TThreadedSelectorServer.this.invoker == null) {
                    this.doAddAccept(targetThread, client);
                } else {
                    try {
                        TThreadedSelectorServer.this.invoker.submit(new Runnable(){

                            @Override
                            public void run() {
                                AcceptThread.this.doAddAccept(targetThread, client);
                            }
                        });
                    }
                    catch (RejectedExecutionException rx) {
                        LOGGER.warn("ExecutorService rejected accept registration!", rx);
                        client.close();
                    }
                }
            }
        }

        private TNonblockingTransport doAccept() {
            try {
                return this.serverTransport.accept();
            }
            catch (TTransportException tte) {
                LOGGER.warn("Exception trying to accept!", tte);
                return null;
            }
        }

        private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
            if (!thread.addAcceptedConnection(client)) {
                client.close();
            }
        }
    }

    protected static class SelectorThreadLoadBalancer {
        private final Collection<? extends SelectorThread> threads;
        private Iterator<? extends SelectorThread> nextThreadIterator;

        public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) {
            if (threads.isEmpty()) {
                throw new IllegalArgumentException("At least one selector thread is required");
            }
            this.threads = Collections.unmodifiableList(new ArrayList<T>(threads));
            this.nextThreadIterator = this.threads.iterator();
        }

        public SelectorThread nextThread() {
            if (!this.nextThreadIterator.hasNext()) {
                this.nextThreadIterator = this.threads.iterator();
            }
            return this.nextThreadIterator.next();
        }
    }
}

