/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.client;

import io.ray.api.ActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.call.ActorCreator;
import io.ray.streaming.client.JobClient;
import io.ray.streaming.jobgraph.JobGraph;
import io.ray.streaming.runtime.master.JobMaster;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobClientImpl
implements JobClient {
    public static final Logger LOG = LoggerFactory.getLogger(JobClientImpl.class);
    private ActorHandle<JobMaster> jobMasterActor;

    public void submit(JobGraph jobGraph, Map<String, String> jobConfig) {
        LOG.info("Submitting job [{}] with job graph [{}] and job config [{}].", new Object[]{jobGraph.getJobName(), jobGraph, jobConfig});
        HashMap resources = new HashMap();
        jobConfig.put("streaming.job.id", Ray.getRuntimeContext().getCurrentJobId().toString());
        jobConfig.put("streaming.job.name", jobGraph.getJobName());
        jobGraph.getJobConfig().putAll(jobConfig);
        this.jobMasterActor = ((ActorCreator)((ActorCreator)Ray.actor(JobMaster::new, jobConfig).setResources(resources)).setMaxRestarts(-1)).remote();
        try {
            ObjectRef submitResult = this.jobMasterActor.task(JobMaster::submitJob, this.jobMasterActor, (Object)jobGraph).remote();
            if (!((Boolean)submitResult.get()).booleanValue()) {
                throw new RuntimeException("submitting job failed");
            }
            LOG.info("Finish submitting job: {}.", (Object)jobGraph.getJobName());
        }
        catch (Exception e) {
            LOG.error("Failed to submit job: {}.", (Object)jobGraph.getJobName(), (Object)e);
            throw new RuntimeException("submitting job failed", e);
        }
    }
}

