/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.rpc.async;

import io.ray.api.ActorHandle;
import io.ray.api.BaseActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.PyActorHandle;
import io.ray.api.function.PyActorMethod;
import io.ray.streaming.runtime.generated.RemoteCall;
import io.ray.streaming.runtime.message.CallResult;
import io.ray.streaming.runtime.rpc.PbResultParser;
import io.ray.streaming.runtime.rpc.async.RemoteCallPool;
import io.ray.streaming.runtime.transfer.channel.ChannelRecoverInfo;
import io.ray.streaming.runtime.worker.JobWorker;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Issues rollback-related remote calls to streaming worker actors and binds the caller-supplied
 * callbacks to the resulting object references through a {@link RemoteCallPool}.
 *
 * <p>Every public method branches on the actor type. {@link PyActorHandle} actors are invoked by
 * method name and their results are cast to {@code byte[]} and decoded with
 * {@link PbResultParser}; every other actor is treated as a Java {@link JobWorker} actor whose
 * result object is used directly.
 */
public class AsyncRemoteCaller {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncRemoteCaller.class);
    private final RemoteCallPool remoteCallPool = new RemoteCallPool();

    /**
     * Asks a worker whether it needs to be rolled back and binds {@code callback} to the answer.
     *
     * @param actor worker to query; a {@link PyActorHandle} is called via its
     *     {@code check_if_need_rollback} method, anything else via
     *     {@code JobWorker::checkIfNeedRollback}
     * @param callback receives the boolean result
     * @param onException handler registered with the pool together with the callback
     */
    public void checkIfNeedRollbackAsync(BaseActorHandle actor, RemoteCallPool.Callback<Boolean> callback, RemoteCallPool.ExceptionHandler<Throwable> onException) {
        if (actor instanceof PyActorHandle) {
            this.remoteCallPool.bindCallback(
                ((PyActorHandle) actor).task(PyActorMethod.of("check_if_need_rollback")).remote(),
                obj -> callback.handle(PbResultParser.parseBoolResult((byte[]) obj)),
                onException);
        } else {
            // The current wall-clock time (ms) is sent as the call's only argument.
            this.remoteCallPool.bindCallback(
                asJavaWorker(actor).task(JobWorker::checkIfNeedRollback, System.currentTimeMillis()).remote(),
                callback,
                onException);
        }
    }

    /**
     * Requests a rollback of a single worker to the given checkpoint and binds {@code callback}
     * to the result.
     *
     * @param actor worker to roll back
     * @param checkpointId checkpoint to roll back to; it is unboxed on the Python path, so it
     *     must not be {@code null} there
     * @param callback receives the rollback result
     * @param onException handler registered with the pool together with the callback
     */
    // The two branches yield differently parameterized ObjectRefs; a raw local lets both be
    // bound through the same bindCallback overload, at the price of unchecked warnings.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void rollback(BaseActorHandle actor, Long checkpointId, RemoteCallPool.Callback<CallResult<ChannelRecoverInfo>> callback, RemoteCallPool.ExceptionHandler<Throwable> onException) {
        if (actor instanceof PyActorHandle) {
            ObjectRef call = ((PyActorHandle) actor)
                .task(PyActorMethod.of("rollback"), checkpointIdBytes(checkpointId))
                .remote();
            this.remoteCallPool.bindCallback(
                call,
                obj -> callback.handle(PbResultParser.parseRollbackResult((byte[]) obj)),
                onException);
        } else {
            ObjectRef call = asJavaWorker(actor)
                .task(JobWorker::rollback, checkpointId, System.currentTimeMillis())
                .remote();
            this.remoteCallPool.bindCallback(
                call,
                obj -> callback.handle((CallResult<ChannelRecoverInfo>) obj),
                onException);
        }
    }

    /**
     * Requests a rollback of several workers to the same checkpoint and binds a single
     * {@code callback} to the list of all results.
     *
     * <p>Results are converted position by position: entries whose actor was a
     * {@link PyActorHandle} are decoded from bytes, all others are cast to {@link CallResult}.
     *
     * @param actors workers to roll back; one remote call is issued per element, in list order
     * @param checkpointId checkpoint to roll back to
     * @param abnormalQueues accepted for interface compatibility; not used by this method
     * @param callback receives one result per returned object, in the order delivered by the pool
     * @param onException handler registered with the pool together with the callback
     */
    // Raw ObjectRef is used for the same reason as in rollback(): the Python and Java calls
    // produce differently parameterized references that go into one list.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void batchRollback(List<BaseActorHandle> actors, Long checkpointId, Collection<String> abnormalQueues, RemoteCallPool.Callback<List<CallResult<ChannelRecoverInfo>>> callback, RemoteCallPool.ExceptionHandler<Throwable> onException) {
        final int actorCount = actors.size();
        final List<ObjectRef<Object>> pendingCalls = new ArrayList<>(actorCount);
        // pythonActor[i] remembers whether result i has to be decoded from bytes.
        final boolean[] pythonActor = new boolean[actorCount];

        for (int i = 0; i < actorCount; ++i) {
            BaseActorHandle actor = actors.get(i);
            ObjectRef call;
            if (actor instanceof PyActorHandle) {
                pythonActor[i] = true;
                call = ((PyActorHandle) actor)
                    .task(PyActorMethod.of("rollback"), checkpointIdBytes(checkpointId))
                    .remote();
            } else {
                call = asJavaWorker(actor)
                    .task(JobWorker::rollback, checkpointId, System.currentTimeMillis())
                    .remote();
            }
            pendingCalls.add(call);
        }

        this.remoteCallPool.bindCallback(pendingCalls, objList -> {
            List<CallResult<ChannelRecoverInfo>> results = new ArrayList<>(objList.size());
            for (int i = 0; i < objList.size(); ++i) {
                Object obj = objList.get(i);
                // Indexes beyond the submitted actors are treated as Java results.
                boolean fromPython = i < pythonActor.length && pythonActor[i];
                if (fromPython) {
                    results.add(PbResultParser.parseRollbackResult((byte[]) obj));
                } else {
                    results.add((CallResult<ChannelRecoverInfo>) obj);
                }
            }
            callback.handle(results);
        }, onException);
    }

    /** Views a non-Python actor handle as a handle to a Java {@link JobWorker}. */
    @SuppressWarnings("unchecked") // callers only pass handles that are not PyActorHandle
    private static ActorHandle<JobWorker> asJavaWorker(BaseActorHandle actor) {
        return (ActorHandle<JobWorker>) actor;
    }

    /** Serializes the checkpoint id into the byte payload sent to Python workers. */
    private static byte[] checkpointIdBytes(Long checkpointId) {
        return RemoteCall.CheckpointId.newBuilder()
            .setCheckpointId(checkpointId)
            .build()
            .toByteArray();
    }
}

