/*
 * Decompiled with CFR 0.152.
 */
package io.warp10.script.functions;

import io.warp10.WarpConfig;
import io.warp10.WarpURLEncoder;
import io.warp10.script.NamedWarpScriptFunction;
import io.warp10.script.WarpScriptException;
import io.warp10.script.WarpScriptStack;
import io.warp10.script.WarpScriptStackFunction;
import io.warp10.warp.sdk.Capabilities;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * WarpScript function which delegates work to an external executable ("subprogram").
 *
 * <p>The function pops a subprogram name and then an argument string from the stack. The
 * argument is URL-encoded and written as a single line to the standard input of a pooled
 * instance of the subprogram, and one line is read back from its standard output. A reply
 * starting with a space is treated as a URL-encoded error message and raised as a
 * {@link WarpScriptException}; any other reply is URL-decoded and pushed onto the stack.
 *
 * <p>Subprograms are looked up under the directory named by the configuration key
 * {@code warpscript.call.directory}. Each subprogram gets its own {@link ProcessPool}, created
 * on first use.
 */
public class CALL
extends NamedWarpScriptFunction
implements WarpScriptStackFunction {
    private static final Logger LOG = LoggerFactory.getLogger(CALL.class);

    /** Upper bound applied to the capacity announced by a subprogram (default 1). */
    private static final int maxCapacity = Integer.parseInt(WarpConfig.getProperty("warpscript.call.maxcapacity", "1"));

    /** Default time, in milliseconds, to wait for an available process (default 10000). */
    private static final long maxWait = Long.parseLong(WarpConfig.getProperty("warpscript.call.maxwait", "10000"));

    /** Name of the capability which, when present, overrides {@link #maxWait} for a call. */
    private static final String MAXWAIT_CAPABILITY = "call.maxwait";

    /** Pools keyed by subprogram name; only accessed from the synchronized {@link #init}. */
    private final Map<String, ProcessPool> subprograms = new HashMap<String, ProcessPool>();

    public CALL(String name) {
        super(name);
    }

    /**
     * Invokes a subprogram with the argument string found on the stack.
     *
     * <p>An {@link IOException} during the exchange triggers one retry (two attempts in total);
     * after the last attempt it is rethrown wrapped in a {@link WarpScriptException}.
     *
     * @param stack the stack holding the subprogram name on top of the argument string
     * @return the stack, with the decoded reply of the subprogram pushed on top
     * @throws WarpScriptException if the operands are not strings, the subprogram is invalid,
     *         no process could be used, or the subprogram reported an error
     */
    @Override
    public Object apply(WarpScriptStack stack) throws WarpScriptException {
        Object subprogram = stack.pop();
        Object args = stack.pop();
        if (!(subprogram instanceof String) || !(args instanceof String)) {
            throw new WarpScriptException(this.getName() + " expects a subprogram name on top of an argument string.");
        }
        String name = (String) subprogram;

        // init() hands back the pool so the shared map is never read outside its monitor.
        ProcessPool pool = this.init(name);

        long wait = CALL.maxWait;
        String waitCapability = Capabilities.get(stack, MAXWAIT_CAPABILITY);
        if (null != waitCapability) {
            wait = Long.parseLong(waitCapability);
        }

        int attempts = 2;
        while (attempts > 0) {
            --attempts;
            Process proc = null;
            try {
                proc = pool.get(wait);
                if (null == proc) {
                    throw new WarpScriptException(this.getName() + " unable to acquire subprogram.");
                }
                BufferedReader br = pool.getReader(proc);

                // Drop whatever the subprogram emitted outside of a request/reply exchange.
                while (br.ready()) {
                    String sbr = br.readLine();
                    LOG.warn("skipping unexpected CALL output from " + name + " (" + StringUtils.substring(sbr, 0, 1000) + "...)");
                }

                // Request: URL-encoded argument followed by a newline.
                proc.getOutputStream().write(WarpURLEncoder.encode(args.toString(), StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8));
                proc.getOutputStream().write('\n');
                proc.getOutputStream().flush();

                // Reply: a single line; a leading space marks an error message.
                String ret = br.readLine();
                if (null == ret) {
                    throw new WarpScriptException(this.getName() + " subprogram died unexpectedly.");
                }
                if (ret.startsWith(" ")) {
                    throw new WarpScriptException(URLDecoder.decode(ret.substring(1), StandardCharsets.UTF_8.name()));
                }
                stack.push(URLDecoder.decode(ret, StandardCharsets.UTF_8.name()));
                break;
            }
            catch (IOException ioe) {
                if (attempts <= 0) {
                    throw new WarpScriptException(ioe);
                }
                // Otherwise fall through and retry with another process.
            }
            finally {
                // Must not jump out of this block (continue/break/return): doing so would
                // silently discard an exception which is currently propagating.
                if (null != proc) {
                    pool.release(proc);
                }
            }
        }
        return stack;
    }

    /**
     * Returns the pool for the given subprogram, creating it on first use.
     *
     * <p>The subprogram must exist, be an executable regular file and, once its path is
     * normalized, be located under the configured directory.
     *
     * @param subprogram name of the subprogram, relative to the configured directory
     * @return the pool bound to that subprogram
     * @throws WarpScriptException if the directory is not configured or the subprogram is invalid
     */
    private synchronized ProcessPool init(String subprogram) throws WarpScriptException {
        ProcessPool known = this.subprograms.get(subprogram);
        if (null != known) {
            return known;
        }
        String dir = WarpConfig.getProperty("warpscript.call.directory");
        if (null == dir) {
            throw new WarpScriptException(this.getName() + " configuration key '" + "warpscript.call.directory" + "' not set, " + this.getName() + " disabled.");
        }
        File root = new File(dir);
        File f = new File(root, subprogram);
        if (!f.exists() || !f.canExecute()) {
            throw new WarpScriptException(this.getName() + " invalid subprogram '" + subprogram + "'.");
        }

        // Compare normalized paths component by component. A plain string prefix test on
        // absolute paths accepts both '../x' (the '..' is never resolved) and siblings such
        // as '<root>-other/x'.
        File base = root.toPath().toAbsolutePath().normalize().toFile();
        File target = f.toPath().toAbsolutePath().normalize().toFile();
        if (!target.toPath().startsWith(base.toPath())) {
            throw new WarpScriptException(this.getName() + " invalid subprogram, not in the correct directory.");
        }

        // The normalized path is the one which gets executed, so validate it too. This also
        // rejects directories (e.g. an empty name designating the root itself).
        if (!target.isFile() || !target.canExecute()) {
            throw new WarpScriptException(this.getName() + " invalid subprogram '" + subprogram + "'.");
        }

        ProcessPool pool = new ProcessPool(target.getPath());
        this.subprograms.put(subprogram, pool);
        return pool;
    }

    /**
     * Tells whether a process is still running, based on whether {@link Process#exitValue()}
     * throws {@link IllegalThreadStateException}.
     */
    private static final boolean isAlive(Process proc) {
        try {
            proc.exitValue();
            return false;
        }
        catch (IllegalThreadStateException e) {
            return true;
        }
    }

    /**
     * Pool of running instances of a single subprogram.
     *
     * <p>Idle instances are kept in {@link #processes}; instances handed out by {@link #get}
     * are counted in {@link #loaned} until given back through {@link #release}. Pool state is
     * guarded by {@link #mutex}.
     */
    private static class ProcessPool {
        private final ReentrantLock mutex = new ReentrantLock(true);
        private final List<Process> processes = new ArrayList<Process>();
        private final Map<Process, BufferedReader> readers = new HashMap<Process, BufferedReader>();
        private final AtomicInteger loaned = new AtomicInteger(0);
        private final ProcessBuilder builder;

        /** Capacity announced by the subprogram; 0 until the first instance has started. */
        private int capacity = 0;

        /**
         * Creates an empty pool and registers a JVM shutdown hook which destroys the idle
         * instances of the pool.
         *
         * @param path path of the executable to launch
         */
        public ProcessPool(final String path) {
            this.builder = new ProcessBuilder(path);
            Thread hook = new Thread(){

                @Override
                public void run() {
                    // NOTE(review): the idle list is walked without holding the mutex and
                    // loaned processes are not destroyed here - confirm this is intended.
                    try {
                        while (!processes.isEmpty()) {
                            Process proc = processes.remove(0);
                            LOG.info("Ending CALL subprocess " + path);
                            proc.destroy();
                        }
                    }
                    catch (Exception e) {
                        LOG.warn("Error ending CALL subprocess " + path, e);
                    }
                }
            };
            Runtime.getRuntime().addShutdownHook(hook);
        }

        /**
         * Starts a new instance unless the announced capacity is already reached.
         *
         * <p>The first line printed by a new instance is parsed as the capacity of the pool.
         * Values above the configured maximum, or negative, are replaced by that maximum.
         *
         * @throws IOException if the process cannot be started, its output cannot be read, or
         *         the calling thread is interrupted while waiting for the lock
         * @throws RuntimeException if the instance ends its output without announcing a capacity
         * @throws NumberFormatException if the announced capacity is not an integer
         */
        public void provision() throws IOException {
            this.lock("Interrupted while provisioning process.");
            try {
                if (this.capacity > 0 && this.processes.size() + this.loaned.get() >= this.capacity) {
                    return;
                }
                Process proc = this.builder.start();
                boolean registered = false;
                try {
                    BufferedReader br = new BufferedReader(new InputStreamReader(proc.getInputStream(), StandardCharsets.UTF_8));
                    String cap = br.readLine();
                    if (null == cap) {
                        throw new RuntimeException("Subprogram '" + this.builder.command().toString() + "' did not return its configured capacity.");
                    }
                    int announced = Integer.parseInt(cap);
                    // NOTE(review): an announced capacity of 0 is kept as is, which disables
                    // the limit check above - confirm whether it should be clamped as well.
                    if (announced > maxCapacity || announced < 0) {
                        announced = maxCapacity;
                    }
                    this.capacity = announced;
                    this.processes.add(proc);
                    this.readers.put(proc, br);
                    registered = true;
                }
                finally {
                    // Do not leave behind a child which failed its handshake.
                    if (!registered) {
                        proc.destroy();
                    }
                }
            }
            finally {
                this.mutex.unlock();
            }
        }

        /**
         * Borrows a live instance, starting one if none is idle and capacity allows.
         *
         * <p>Polls until an instance is available. Dead idle instances encountered on the way
         * are discarded.
         *
         * @param maxWait maximum time to wait, in milliseconds
         * @return a running process, to be given back through {@link #release}
         * @throws IOException on timeout, on interruption, or if provisioning fails
         */
        public Process get(long maxWait) throws IOException {
            long deadline = System.currentTimeMillis() + maxWait;
            while (!Thread.currentThread().isInterrupted()) {
                this.lock("Interrupted while retrieving process.");
                try {
                    if (this.processes.isEmpty()) {
                        this.provision();
                    }
                    if (!this.processes.isEmpty()) {
                        Process candidate = this.processes.remove(0);
                        if (CALL.isAlive(candidate)) {
                            this.loaned.incrementAndGet();
                            return candidate;
                        }
                        this.discard(candidate);
                    }
                }
                finally {
                    this.mutex.unlock();
                }
                if (System.currentTimeMillis() >= deadline) {
                    throw new IOException("Timed out waiting for available process.");
                }
                LockSupport.parkNanos(100000L);
            }
            throw new IOException("Interrupted while retrieving process.");
        }

        /**
         * Gives back a process obtained from {@link #get}. A process which is still running
         * becomes idle again; a dead one is discarded.
         *
         * <p>The lock is acquired uninterruptibly so that a pending interrupt can neither leak
         * the pool slot nor make this method throw from the caller's {@code finally} block.
         *
         * @param proc the process to give back
         */
        public void release(Process proc) {
            this.mutex.lock();
            try {
                if (CALL.isAlive(proc)) {
                    this.processes.add(proc);
                } else {
                    this.discard(proc);
                }
                this.loaned.decrementAndGet();
            }
            finally {
                this.mutex.unlock();
            }
        }

        /**
         * Returns the reader attached to the standard output of the given process.
         *
         * @param proc a process created by this pool
         * @return the reader registered for the process, or {@code null} if there is none
         */
        public BufferedReader getReader(Process proc) {
            this.mutex.lock();
            try {
                return this.readers.get(proc);
            }
            finally {
                this.mutex.unlock();
            }
        }

        /** Acquires the mutex, turning an interruption into an IOException with the given message. */
        private void lock(String interruptedMessage) throws IOException {
            try {
                this.mutex.lockInterruptibly();
            }
            catch (InterruptedException ie) {
                // Keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
                throw new IOException(interruptedMessage, ie);
            }
        }

        /** Forgets a process and frees its resources. Must be called with the mutex held. */
        private void discard(Process proc) {
            BufferedReader br = this.readers.remove(proc);
            if (null != br) {
                try {
                    br.close();
                }
                catch (IOException ignored) {
                    // The process is being dropped, a failure to close its stream is harmless.
                }
            }
            proc.destroy();
        }
    }
}

