package org.apache.iotdb.db.mpp.plan.scheduler;

import com.google.common.util.concurrent.Futures;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.class */
public class AsyncPlanNodeSender {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) AsyncPlanNodeSender.class);
    private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> asyncInternalServiceClientManager;
    private final List<FragmentInstance> instances;
    private final Map<Integer, TSendPlanNodeResp> instanceId2RespMap = new ConcurrentHashMap();
    private final AtomicLong pendingNumber;

    public AsyncPlanNodeSender(IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> iClientManager, List<FragmentInstance> list) {
        this.asyncInternalServiceClientManager = iClientManager;
        this.instances = list;
        this.pendingNumber = new AtomicLong(list.size());
    }

    public void sendAll() {
        long nanoTime = System.nanoTime();
        for (int i = 0; i < this.instances.size(); i++) {
            FragmentInstance fragmentInstance = this.instances.get(i);
            AsyncSendPlanNodeHandler asyncSendPlanNodeHandler = new AsyncSendPlanNodeHandler(i, this.pendingNumber, this.instanceId2RespMap, nanoTime);
            try {
                this.asyncInternalServiceClientManager.borrowClient(fragmentInstance.getHostDataNode().getInternalEndPoint()).sendPlanNode(new TSendPlanNodeReq(new TPlanNode(fragmentInstance.getFragment().getPlanNodeTree().serializeToByteBuffer()), fragmentInstance.getRegionReplicaSet().getRegionId()), asyncSendPlanNodeHandler);
            } catch (Exception e) {
                asyncSendPlanNodeHandler.onError(e);
            }
        }
    }

    public void waitUntilCompleted() throws InterruptedException {
        synchronized (this.pendingNumber) {
            while (this.pendingNumber.get() != 0) {
                this.pendingNumber.wait();
            }
        }
    }

    public List<TSStatus> getFailureStatusList() {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<Integer, TSendPlanNodeResp> entry : this.instanceId2RespMap.entrySet()) {
            TSStatus status = entry.getValue().getStatus();
            if (entry.getValue().accepted) {
                if (status != null && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                    arrayList.add(status);
                }
            } else if (status == null) {
                logger.warn("dispatch write failed. message: {}, node {}", entry.getValue().message, this.instances.get(entry.getKey().intValue()).getHostDataNode().getInternalEndPoint());
                arrayList.add(RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, entry.getValue().getMessage()));
            } else {
                logger.warn("dispatch write failed. status: {}, code: {}, message: {}, node {}", entry.getValue().status, TSStatusCode.representOf(status.code), entry.getValue().message, this.instances.get(entry.getKey().intValue()).getHostDataNode().getInternalEndPoint());
                arrayList.add(status);
            }
        }
        return arrayList;
    }

    public Future<FragInstanceDispatchResult> getResult() {
        for (Map.Entry<Integer, TSendPlanNodeResp> entry : this.instanceId2RespMap.entrySet()) {
            if (!entry.getValue().accepted) {
                logger.warn("dispatch write failed. status: {}, code: {}, message: {}, node {}", entry.getValue().status, TSStatusCode.representOf(entry.getValue().status.code), entry.getValue().message, this.instances.get(entry.getKey().intValue()).getHostDataNode().getInternalEndPoint());
                return entry.getValue().getStatus() == null ? Futures.immediateFuture(new FragInstanceDispatchResult(RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, entry.getValue().getMessage()))) : Futures.immediateFuture(new FragInstanceDispatchResult(entry.getValue().getStatus()));
            }
        }
        return Futures.immediateFuture(new FragInstanceDispatchResult(true));
    }
}
