/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.rpc.async;

import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.WaitResult;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polls pending remote-call results and runs user callbacks once they are ready.
 *
 * <p>A dedicated "remote-pool-loop" thread, started by the constructor, repeatedly checks
 * every registered bundle of {@link ObjectRef}s with {@code Ray.wait}. When all objects of a
 * bundle are reported ready, a task is submitted to an internal callback thread pool; that
 * task fetches the values and invokes the registered {@link Callback}. Any {@link Throwable}
 * raised while fetching or handling is logged and forwarded to the bundle's
 * {@link ExceptionHandler}, if one was registered.
 *
 * <p>{@code bindCallback} may be called from any thread; the pending list is guarded by its
 * own monitor and the handler maps are concurrent maps.
 */
public class RemoteCallPool implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteCallPool.class);
    /** Timeout (ms) of each {@code Ray.wait} poll and of the idle sleep. */
    private static final int WAIT_TIME_MS = 5;
    /** Minimum age of a bundle, and minimum gap between warnings, before it is reported slow (ms). */
    private static final long WARNING_PERIOD = 10000L;
    /** Bundles not yet dispatched; every access is synchronized on the list itself. */
    private final List<RemoteCallBundle> pendingObjectBundles = new LinkedList<RemoteCallBundle>();
    private final Map<RemoteCallBundle, Callback<Object>> singletonHandlerMap = new ConcurrentHashMap<RemoteCallBundle, Callback<Object>>();
    private final Map<RemoteCallBundle, Callback<List<Object>>> bundleHandlerMap = new ConcurrentHashMap<RemoteCallBundle, Callback<List<Object>>>();
    private final Map<RemoteCallBundle, ExceptionHandler<Throwable>> bundleExceptionHandlerMap = new ConcurrentHashMap<RemoteCallBundle, ExceptionHandler<Throwable>>();
    private final ThreadPoolExecutor callBackPool = new ThreadPoolExecutor(2, Runtime.getRuntime().availableProcessors(), 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new CallbackThreadFactory());
    private volatile boolean stop = false;

    /** Creates the pool and immediately starts its polling thread. */
    public RemoteCallPool() {
        Thread t = new Thread(Ray.wrapRunnable(this), "remote-pool-loop");
        t.setUncaughtExceptionHandler((thread, throwable) -> LOG.error("Error in remote call pool thread.", throwable));
        t.start();
    }

    /**
     * Registers a callback for a single remote object.
     *
     * @param obj the object reference to wait for
     * @param callback invoked with the fetched value; must not be null (the backing concurrent
     *     map rejects null values)
     * @param onException receives any throwable raised while fetching or handling; may be null
     * @param <T> the value type of the reference
     */
    @SuppressWarnings("unchecked")
    public <T> void bindCallback(ObjectRef<T> obj, Callback<T> callback, ExceptionHandler<Throwable> onException) {
        // Unchecked but safe: the callback is only ever fed the value fetched from this very
        // reference, so the erased Object-typed storage never mixes types.
        List<ObjectRef<Object>> objectRefList = Collections.singletonList((ObjectRef<Object>) obj);
        RemoteCallBundle bundle = new RemoteCallBundle(objectRefList, true);
        this.singletonHandlerMap.put(bundle, (Callback<Object>) callback);
        this.enqueue(bundle, onException);
    }

    /**
     * Registers a callback for a group of remote objects; it fires once all are ready.
     *
     * @param objectBundle the object references to wait for
     * @param callback invoked with the list of fetched values; must not be null (the backing
     *     concurrent map rejects null values)
     * @param onException receives any throwable raised while fetching or handling; may be null
     */
    public void bindCallback(List<ObjectRef<Object>> objectBundle, Callback<List<Object>> callback, ExceptionHandler<Throwable> onException) {
        RemoteCallBundle bundle = new RemoteCallBundle(objectBundle, false);
        this.bundleHandlerMap.put(bundle, callback);
        this.enqueue(bundle, onException);
    }

    /**
     * Asks the polling loop to exit after its current pass. When the loop exits it shuts the
     * callback pool down, letting already-submitted callbacks finish.
     */
    public void stop() {
        this.stop = true;
    }

    /**
     * Polling loop: sleeps while nothing is pending, otherwise checks each pending bundle and
     * dispatches the ready ones. Exits when {@link #stop()} was called or the thread is
     * interrupted.
     */
    @Override
    public void run() {
        try {
            while (!this.stop) {
                try {
                    List<RemoteCallBundle> snapshot = this.snapshotPending();
                    if (snapshot.isEmpty()) {
                        Thread.sleep(WAIT_TIME_MS);
                        continue;
                    }
                    try {
                        // Poll outside the lock so bindCallback callers are never blocked
                        // behind a series of Ray.wait calls.
                        for (RemoteCallBundle bundle : snapshot) {
                            this.pollBundle(bundle);
                        }
                    }
                    finally {
                        // Runs even if a poll threw, so bundles dispatched earlier in this
                        // pass are never dispatched a second time.
                        this.removeDispatched();
                    }
                }
                catch (InterruptedException e) {
                    // Restore the flag for whoever interrupted us and treat it as a stop request;
                    // continuing would make every subsequent sleep fail immediately.
                    Thread.currentThread().interrupt();
                    LOG.warn("Wait loop interrupted, exiting.");
                    break;
                }
                catch (Exception e) {
                    LOG.error("Exception in wait loop.", e);
                }
            }
            LOG.info("Wait loop finished.");
        }
        finally {
            // Only this loop submits to the pool, so once it ends the pool can be shut down;
            // otherwise its non-daemon core threads would stay alive forever.
            this.callBackPool.shutdown();
        }
    }

    /** Stores the optional exception handler and appends the bundle to the pending list. */
    private void enqueue(RemoteCallBundle bundle, ExceptionHandler<Throwable> onException) {
        // ConcurrentHashMap rejects null values; a missing handler is simply not stored.
        if (onException != null) {
            this.bundleExceptionHandlerMap.put(bundle, onException);
        }
        synchronized (this.pendingObjectBundles) {
            this.pendingObjectBundles.add(bundle);
        }
    }

    /** Returns a copy of the pending list taken under its lock (empty list if none pending). */
    private List<RemoteCallBundle> snapshotPending() {
        synchronized (this.pendingObjectBundles) {
            if (this.pendingObjectBundles.isEmpty()) {
                return Collections.<RemoteCallBundle>emptyList();
            }
            return new LinkedList<RemoteCallBundle>(this.pendingObjectBundles);
        }
    }

    /** Drops every bundle already handed to the callback pool from the pending list. */
    private void removeDispatched() {
        synchronized (this.pendingObjectBundles) {
            this.pendingObjectBundles.removeIf(bundle -> bundle.dispatched);
        }
    }

    /**
     * Checks one bundle with {@code Ray.wait}; if every object is reported ready, submits its
     * callback task, otherwise emits a periodic slow-bundle warning.
     */
    private void pollBundle(RemoteCallBundle bundle) {
        WaitResult<Object> waitResult = Ray.wait(bundle.objects, bundle.objects.size(), WAIT_TIME_MS);
        List<ObjectRef<Object>> readyObjs = waitResult.getReady();
        if (readyObjs.size() != bundle.objects.size()) {
            this.warnIfSlow(bundle);
            return;
        }
        Runnable task;
        if (bundle.isSingletonBundle) {
            task = () -> this.invokeSingletonCallback(bundle, readyObjs.get(0));
        } else {
            task = () -> this.invokeBundleCallback(bundle, readyObjs);
        }
        this.callBackPool.execute(Ray.wrapRunnable(task));
        // Marked only after a successful submit, so a rejected task leaves the bundle pending.
        bundle.dispatched = true;
    }

    /** Logs a warning for a bundle pending longer than the warning period, at most once per period. */
    private void warnIfSlow(RemoteCallBundle bundle) {
        long now = System.currentTimeMillis();
        long waitingTime = now - bundle.createTime;
        if (waitingTime > WARNING_PERIOD && now - bundle.lastWarnTs > WARNING_PERIOD) {
            bundle.lastWarnTs = now;
            LOG.warn("Bundle has being waiting for {} ms, bundle = {}.", waitingTime, bundle);
        }
    }

    /** Callback-pool task for a single-object bundle: fetch the value, then invoke the handler. */
    private void invokeSingletonCallback(RemoteCallBundle bundle, ObjectRef<Object> ref) {
        // Unregister up front so neither map keeps the bundle alive, whatever the outcome.
        Callback<Object> handler = this.singletonHandlerMap.remove(bundle);
        ExceptionHandler<Throwable> exceptionHandler = this.bundleExceptionHandlerMap.remove(bundle);
        try {
            handler.handle(ref.get());
        }
        catch (Throwable th) {
            LOG.error("Error when get object, objectId = {}.", ref.toString(), th);
            if (exceptionHandler != null) {
                exceptionHandler.handle(th);
            }
        }
    }

    /** Callback-pool task for a multi-object bundle: fetch all values, then invoke the handler. */
    private void invokeBundleCallback(RemoteCallBundle bundle, List<ObjectRef<Object>> readyObjs) {
        // Unregister up front so neither map keeps the bundle alive, whatever the outcome.
        Callback<List<Object>> handler = this.bundleHandlerMap.remove(bundle);
        ExceptionHandler<Throwable> exceptionHandler = this.bundleExceptionHandlerMap.remove(bundle);
        try {
            // Fetch here rather than on the polling thread, so a failing get() reaches the
            // exception handler instead of leaving the bundle to be retried forever.
            List<Object> results = readyObjs.stream().map(ObjectRef::get).collect(Collectors.toList());
            handler.handle(results);
        }
        catch (Throwable th) {
            List<String> resultIds = readyObjs.stream().map(Object::toString).collect(Collectors.toList());
            LOG.error("Error when get object, objectIds = {}.", resultIds, th);
            if (exceptionHandler != null) {
                exceptionHandler.handle(th);
            }
        }
    }

    /** Creates the named callback threads; uncaught errors in them are logged. */
    static class CallbackThreadFactory
    implements ThreadFactory {
        private final AtomicInteger cnt = new AtomicInteger(0);

        CallbackThreadFactory() {
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, throwable) -> LOG.error("Callback err.", throwable));
            t.setName("callback-thread-" + this.cnt.getAndIncrement());
            return t;
        }
    }

    /**
     * One registration: the references to wait for plus bookkeeping. Instances use identity
     * equality, which is what the handler maps key on.
     */
    private static class RemoteCallBundle {
        List<ObjectRef<Object>> objects;
        boolean isSingletonBundle;
        long lastWarnTs = System.currentTimeMillis();
        long createTime = System.currentTimeMillis();
        /** Set by the polling thread once the callback task was submitted. */
        boolean dispatched;

        RemoteCallBundle(List<ObjectRef<Object>> objects, boolean isSingletonBundle) {
            this.objects = objects;
            this.isSingletonBundle = isSingletonBundle;
        }

        /** Renders the references as {@code [ref1,ref2,]} (trailing comma included). */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("[");
            this.objects.forEach(rayObj -> sb.append(rayObj.toString()).append(","));
            sb.append("]");
            return sb.toString();
        }
    }

    /** Receives the fetched result of a remote call; may throw anything. */
    @FunctionalInterface
    public static interface Callback<T> {
        public void handle(T value) throws Throwable;
    }

    /** Receives a failure raised while fetching a result or running a {@link Callback}. */
    @FunctionalInterface
    public static interface ExceptionHandler<T> {
        public void handle(T error);
    }
}

