/*
 * Decompiled with CFR 0.152.
 */
package net.risesoft.api.job.actions.dispatch.executor.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import net.risedata.rpc.provide.listener.SyncResult;
import net.risedata.rpc.provide.net.ClinetConnection;
import net.risesoft.api.exceptions.JobException;
import net.risesoft.api.job.JobContext;
import net.risesoft.api.job.actions.dispatch.ExecutorAction;
import net.risesoft.api.job.actions.dispatch.executor.DoBalance;
import net.risesoft.api.job.actions.dispatch.executor.Result;
import net.risesoft.api.job.actions.dispatch.executor.ResultError;
import net.risesoft.api.job.actions.dispatch.executor.ResultSuccess;
import net.risesoft.api.listener.ClientListener;
import net.risesoft.api.persistence.model.job.Job;
import net.risesoft.api.persistence.model.job.JobLog;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.stereotype.Component;

@Component(value="job")
public class JobExecutorAction
implements ExecutorAction {
    @Value(value="${beta.job.timeOut:120}")
    private Integer defaultTimeOut;

    @Override
    public Result action(Job job, JobLog jobLog, Map<String, Object> args, ServiceInstance iServiceInstance, JobContext jobContext, DoBalance doBalance) {
        ClinetConnection clinetConnection = ClientListener.getConnection(iServiceInstance.getInstanceId());
        if (clinetConnection == null) {
            throw new JobException(iServiceInstance.getInstanceId() + "\u8c03\u5ea6\u5931\u8d25\u672a\u627e\u5230\u8fde\u63a5!");
        }
        final SyncResult syncResult = clinetConnection.pushListener(job.getSource(), args, (long)((job.getSourceTimeOut() > 0 ? job.getSourceTimeOut() : this.defaultTimeOut) * 1000));
        return new Result(){
            private List<ResultSuccess> successes = new ArrayList<ResultSuccess>();
            private List<ResultError> errors = new ArrayList<ResultError>();
            private Object res;

            @Override
            public synchronized Result onSuccess(ResultSuccess success) {
                if (this.res != null) {
                    success.onSuccess(this.res);
                    return this;
                }
                1 lockObject = this;
                this.successes.add(success);
                syncResult.onSuccess(res -> {
                    Object object = lockObject;
                    synchronized (object) {
                        this.res = res;
                        for (int i = 0; i < this.successes.size(); ++i) {
                            if (res.getValue().size() == 1) {
                                this.successes.remove(i).onSuccess(res.getValue().get(0));
                                --i;
                                continue;
                            }
                            this.successes.remove(i).onSuccess(res.getValue());
                            --i;
                        }
                    }
                });
                return this;
            }

            @Override
            public Result onError(ResultError error) {
                this.errors.add(error);
                syncResult.onError((res, e) -> {
                    for (int i = 0; i < this.errors.size(); ++i) {
                        this.errors.remove(i).onError(e);
                        --i;
                    }
                });
                return this;
            }

            @Override
            public Object getValue() {
                return syncResult.getResult().getValue();
            }
        };
    }
}

