/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.process;

import cascading.flow.BaseFlow;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.planner.PlatformInfo;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.stats.process.ProcessFlowStats;
import cascading.tap.Tap;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.util.Version;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import riffle.process.scheduler.ProcessException;
import riffle.process.scheduler.ProcessWrapper;

public class ProcessFlow<Process, Config>
extends BaseFlow<Config> {
    private final Process process;
    private final ProcessWrapper processWrapper;
    private Config config;
    private boolean isStarted = false;
    private Map<Object, Object> properties;

    @ConstructorProperties(value={"name", "process"})
    public ProcessFlow(String name, Process process) {
        this(new Properties(), name, process);
    }

    @ConstructorProperties(value={"properties", "name", "process"})
    public ProcessFlow(Map<Object, Object> properties, String name, Process process) {
        this(properties, name, process, null);
    }

    @ConstructorProperties(value={"properties", "name", "process", "flowDescriptor"})
    public ProcessFlow(Map<Object, Object> properties, String name, Process process, Map<String, String> flowDescriptor) {
        super(new PlatformInfo("process", "Chris K Wensel <chris@wensel.net>", Version.getRelease()), properties, null, name, flowDescriptor);
        this.process = process;
        this.processWrapper = new ProcessWrapper(this.process);
        this.properties = properties;
        this.setName(name);
        this.setTapFromProcess();
        this.initProcessConfig();
        this.initStats();
    }

    private void initStats() {
        try {
            if (this.processWrapper.hasCounters()) {
                this.flowStats = new ProcessFlowStats(this, this.getFlowSession().getCascadingServices().createClientState(this.getID()), this.processWrapper);
                this.flowStats.prepare();
                this.flowStats.markPending();
            } else {
                this.flowStats = this.createPrepareFlowStats();
            }
        }
        catch (ProcessException exception) {
            throw new FlowException(exception);
        }
    }

    public void setTapFromProcess() {
        this.setSources(this.createSources(this.processWrapper));
        this.setSinks(this.createSinks(this.processWrapper));
        this.setTraps(this.createTraps(this.processWrapper));
    }

    public Process getProcess() {
        return this.process;
    }

    @Override
    protected void initConfig(Map<Object, Object> properties, Config parentConfig) {
    }

    private void initProcessConfig() {
        try {
            this.config = this.processWrapper.getConfiguration();
        }
        catch (ProcessException exception) {
            if (exception.getCause() instanceof RuntimeException) {
                throw (RuntimeException)exception.getCause();
            }
            throw new FlowException("could not get configuration from process", exception.getCause());
        }
    }

    @Override
    protected void setConfigProperty(Config properties, Object key, Object value) {
    }

    @Override
    protected Config newConfig(Config defaultConfig) {
        return null;
    }

    @Override
    public Config getConfig() {
        return this.config;
    }

    @Override
    public Config getConfigCopy() {
        return null;
    }

    @Override
    public Map<Object, Object> getConfigAsProperties() {
        HashMap<Object, Object> props = new HashMap<Object, Object>();
        if (this.properties != null) {
            props.putAll(this.properties);
        }
        return props;
    }

    @Override
    public String getProperty(String key) {
        return null;
    }

    @Override
    public FlowProcess<Config> getFlowProcess() {
        return FlowProcess.NULL;
    }

    @Override
    public boolean stepsAreLocal() {
        return true;
    }

    @Override
    public void prepare() {
        try {
            this.processWrapper.prepare();
        }
        catch (Throwable throwable) {
            if (throwable.getCause() instanceof RuntimeException) {
                throw (RuntimeException)throwable.getCause();
            }
            throw new FlowException("could not call prepare on process", throwable.getCause());
        }
    }

    @Override
    public void start() {
        try {
            this.flowStats.markPending();
            this.fireOnStarting();
            this.processWrapper.start();
            this.flowStats.markStarted();
            this.isStarted = true;
        }
        catch (Throwable throwable) {
            this.fireOnThrowable(throwable);
            if (throwable.getCause() instanceof RuntimeException) {
                throw (RuntimeException)throwable.getCause();
            }
            throw new FlowException("could not call start on process", throwable.getCause());
        }
    }

    @Override
    protected void internalStart() {
        try {
            this.deleteSinksIfReplace();
            this.deleteTrapsIfReplace();
            this.deleteCheckpointsIfReplace();
        }
        catch (IOException exception) {
            throw new FlowException("unable to delete sinks", exception);
        }
    }

    @Override
    public void stop() {
        try {
            this.fireOnStopping();
            this.processWrapper.stop();
            if (!this.flowStats.isFinished()) {
                this.flowStats.markStopped();
            }
        }
        catch (Throwable throwable) {
            this.flowStats.markFailed(throwable);
            this.fireOnThrowable(throwable);
            if (throwable.getCause() instanceof RuntimeException) {
                throw (RuntimeException)throwable.getCause();
            }
            throw new FlowException("could not call stop on process", throwable.getCause());
        }
    }

    @Override
    protected void internalClean(boolean stop) {
    }

    @Override
    public void complete() {
        try {
            if (!this.isStarted) {
                this.flowStats.markPending();
                this.fireOnStarting();
                this.isStarted = true;
                this.flowStats.markStarted();
            }
            this.flowStats.markRunning();
            this.processWrapper.complete();
            this.fireOnCompleted();
            this.flowStats.markSuccessful();
        }
        catch (Throwable throwable) {
            this.flowStats.markFailed(throwable);
            this.fireOnThrowable(throwable);
            if (throwable.getCause() instanceof RuntimeException) {
                throw (RuntimeException)throwable.getCause();
            }
            throw new FlowException("could not call complete on process", throwable.getCause());
        }
    }

    @Override
    public void cleanup() {
        try {
            this.processWrapper.cleanup();
        }
        catch (Throwable throwable) {
            if (throwable.getCause() instanceof RuntimeException) {
                throw (RuntimeException)throwable.getCause();
            }
            throw new FlowException("could not call cleanup on process", throwable.getCause());
        }
    }

    @Override
    protected int getMaxNumParallelSteps() {
        return 1;
    }

    @Override
    protected void internalShutdown() {
    }

    private Map<String, Tap> createSources(ProcessWrapper processParent) {
        try {
            return this.makeTapMap(processParent.getDependencyIncoming());
        }
        catch (ProcessException exception) {
            if (exception.getCause() instanceof RuntimeException) {
                throw (RuntimeException)exception.getCause();
            }
            throw new FlowException("could not get process incoming dependency", exception.getCause());
        }
    }

    private Map<String, Tap> createSinks(ProcessWrapper processParent) {
        try {
            return this.makeTapMap(processParent.getDependencyOutgoing());
        }
        catch (ProcessException exception) {
            if (exception.getCause() instanceof RuntimeException) {
                throw (RuntimeException)exception.getCause();
            }
            throw new FlowException("could not get process outgoing dependency", exception.getCause());
        }
    }

    private Map<String, Tap> makeTapMap(Object resource) {
        Collection paths = this.makeCollection(resource);
        HashMap<String, Tap> taps = new HashMap<String, Tap>();
        for (Object path : paths) {
            if (path instanceof Tap && ((Tap)path).getIdentifier() != null) {
                taps.put(((Tap)path).getIdentifier(), (Tap)path);
                continue;
            }
            taps.put(path.toString(), new ProcessTap(new NullScheme(), path.toString()));
        }
        return taps;
    }

    private Collection makeCollection(Object resource) {
        if (resource instanceof Collection) {
            return (Collection)resource;
        }
        if (resource instanceof Object[]) {
            return Arrays.asList((Object[])resource);
        }
        return Arrays.asList(resource);
    }

    private Map<String, Tap> createTraps(ProcessWrapper processParent) {
        return new HashMap<String, Tap>();
    }

    @Override
    public String toString() {
        return this.getName() + ":" + this.process;
    }

    static class ProcessTap<Config>
    extends Tap<Config, Object, Object> {
        private final String token;

        ProcessTap(NullScheme scheme, String token) {
            super(scheme);
            this.token = token;
        }

        @Override
        public String getIdentifier() {
            return this.token;
        }

        @Override
        public String getFullIdentifier(Config conf) {
            return this.getIdentifier();
        }

        @Override
        public TupleEntryIterator openForRead(FlowProcess<? extends Config> flowProcess, Object input) throws IOException {
            return null;
        }

        @Override
        public TupleEntryCollector openForWrite(FlowProcess<? extends Config> flowProcess, Object output) throws IOException {
            return null;
        }

        @Override
        public boolean createResource(Config conf) throws IOException {
            return false;
        }

        @Override
        public boolean deleteResource(Config conf) throws IOException {
            return false;
        }

        @Override
        public boolean resourceExists(Config conf) throws IOException {
            return false;
        }

        @Override
        public long getModifiedTime(Config conf) throws IOException {
            return 0L;
        }

        @Override
        public String toString() {
            return this.token;
        }
    }

    static class NullScheme
    extends Scheme {
        NullScheme() {
        }

        public void sourceConfInit(FlowProcess flowProcess, Tap tap, Object conf) {
        }

        public void sinkConfInit(FlowProcess flowProcess, Tap tap, Object conf) {
        }

        public boolean source(FlowProcess flowProcess, SourceCall sourceCall) throws IOException {
            throw new UnsupportedOperationException("sourcing is not supported in the scheme");
        }

        @Override
        public String toString() {
            return this.getClass().getSimpleName();
        }

        public void sink(FlowProcess flowProcess, SinkCall sinkCall) throws IOException {
            throw new UnsupportedOperationException("sinking is not supported in the scheme");
        }
    }
}

