/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.runtime.rpc;

import io.ray.api.ActorHandle;
import io.ray.api.BaseActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.PyActorHandle;
import io.ray.api.Ray;
import io.ray.api.function.PyActorMethod;
import io.ray.api.function.RayFunc3;
import io.ray.streaming.runtime.generated.RemoteCall;
import io.ray.streaming.runtime.worker.JobWorker;
import io.ray.streaming.runtime.worker.context.JobWorkerContext;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemoteCallWorker {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteCallWorker.class);

    public static ObjectRef<Boolean> initWorker(BaseActorHandle actor, JobWorkerContext context) {
        LOG.info("Call worker to initiate, actor: {}, context: {}.", (Object)actor.getId(), (Object)context);
        ObjectRef result = actor instanceof PyActorHandle ? ((PyActorHandle)actor).task(PyActorMethod.of((String)"init", Boolean.class), (Object)context.getPythonWorkerContextBytes()).remote() : ((ActorHandle)actor).task(JobWorker::init, (Object)context).remote();
        LOG.info("Finished calling worker to initiate.");
        return result;
    }

    public static ObjectRef rollback(BaseActorHandle actor, Long checkpointId) {
        ObjectRef result;
        LOG.info("Call worker to start, actor: {}.", (Object)actor.getId());
        if (actor instanceof PyActorHandle) {
            RemoteCall.CheckpointId checkpointIdPb = RemoteCall.CheckpointId.newBuilder().setCheckpointId(checkpointId).build();
            result = ((PyActorHandle)actor).task(PyActorMethod.of((String)"rollback"), (Object)checkpointIdPb.toByteArray()).remote();
        } else {
            result = ((ActorHandle)actor).task(JobWorker::rollback, (Object)checkpointId, (Object)System.currentTimeMillis()).remote();
        }
        LOG.info("Finished calling worker to start.");
        return result;
    }

    public static Boolean shutdownWithoutReconstruction(BaseActorHandle actor) {
        LOG.info("Call worker to shutdown without reconstruction, actor is {}.", (Object)actor.getId());
        Boolean result = false;
        LOG.info("Finished calling wk shutdownWithoutReconstruction, result is {}.", (Object)result);
        return result;
    }

    public static ObjectRef triggerCheckpoint(BaseActorHandle actor, Long barrierId) {
        if (actor instanceof PyActorHandle) {
            RemoteCall.Barrier barrierPb = RemoteCall.Barrier.newBuilder().setId(barrierId).build();
            return ((PyActorHandle)actor).task(PyActorMethod.of((String)"commit"), (Object)barrierPb.toByteArray()).remote();
        }
        return ((ActorHandle)actor).task(JobWorker::triggerCheckpoint, (Object)barrierId).remote();
    }

    public static void clearExpiredCheckpointParallel(List<BaseActorHandle> actors, Long stateCheckpointId, Long queueCheckpointId) {
        if (LOG.isInfoEnabled()) {
            LOG.info("Call worker clearExpiredCheckpoint, state checkpoint id is {}, queue checkpoint id is {}.", (Object)stateCheckpointId, (Object)queueCheckpointId);
        }
        List<Object> result = RemoteCallWorker.checkpointCompleteCommonCallTwoWay(actors, stateCheckpointId, queueCheckpointId, "clear_expired_cp", (RayFunc3<JobWorker, Long, Long, Boolean>)((RayFunc3 & Serializable)JobWorker::clearExpiredCheckpoint));
        if (LOG.isInfoEnabled()) {
            result.forEach(obj -> LOG.info("Finish call worker clearExpiredCheckpointParallel, ret is {}.", obj));
        }
    }

    public static void notifyCheckpointTimeoutParallel(List<BaseActorHandle> actors, Long checkpointId) {
        LOG.info("Call worker notifyCheckpointTimeoutParallel, checkpoint id is {}", (Object)checkpointId);
        actors.forEach(actor -> {
            if (actor instanceof PyActorHandle) {
                RemoteCall.CheckpointId checkpointIdPb = RemoteCall.CheckpointId.newBuilder().setCheckpointId(checkpointId).build();
                ((PyActorHandle)actor).task(PyActorMethod.of((String)"notify_checkpoint_timeout"), (Object)checkpointIdPb.toByteArray()).remote();
            } else {
                ((ActorHandle)actor).task(JobWorker::notifyCheckpointTimeout, (Object)checkpointId).remote();
            }
        });
        LOG.info("Finish call worker notifyCheckpointTimeoutParallel.");
    }

    private static List<Object> checkpointCompleteCommonCallTwoWay(List<BaseActorHandle> actors, Long stateCheckpointId, Long queueCheckpointId, String pyFuncName, RayFunc3<JobWorker, Long, Long, Boolean> rayFunc) {
        List<ObjectRef<Object>> waitFor = RemoteCallWorker.checkpointCompleteCommonCall(actors, stateCheckpointId, queueCheckpointId, pyFuncName, rayFunc);
        return Ray.get(waitFor);
    }

    private static List<ObjectRef<Object>> checkpointCompleteCommonCall(List<BaseActorHandle> actors, Long stateCheckpointId, Long queueCheckpointId, String pyFuncName, RayFunc3<JobWorker, Long, Long, Boolean> rayFunc) {
        ArrayList<ObjectRef<Object>> waitFor = new ArrayList<ObjectRef<Object>>();
        actors.forEach(actor -> {
            if (actor instanceof PyActorHandle) {
                RemoteCall.CheckpointId stateCheckpointIdPb = RemoteCall.CheckpointId.newBuilder().setCheckpointId(stateCheckpointId).build();
                RemoteCall.CheckpointId queueCheckpointIdPb = RemoteCall.CheckpointId.newBuilder().setCheckpointId(queueCheckpointId).build();
                waitFor.add(((PyActorHandle)actor).task(PyActorMethod.of((String)pyFuncName), (Object)stateCheckpointIdPb.toByteArray(), (Object)queueCheckpointIdPb.toByteArray()).remote());
            } else {
                waitFor.add(((ActorHandle)actor).task(rayFunc, (Object)stateCheckpointId, (Object)queueCheckpointId).remote());
            }
        });
        return waitFor;
    }
}

