package org.apache.flink.runtime.io.network.partition.consumer;

import java.util.concurrent.TimeoutException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.types.Either;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.class */
public class RemoteChannelStateChecker {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) RemoteChannelStateChecker.class);
    private final ResultPartitionID resultPartitionId;
    private final String taskNameWithSubtask;

    public RemoteChannelStateChecker(ResultPartitionID resultPartitionID, String str) {
        this.resultPartitionId = resultPartitionID;
        this.taskNameWithSubtask = str;
    }

    public boolean isProducerReadyOrAbortConsumption(PartitionProducerStateProvider.ResponseHandle responseHandle) {
        Either<ExecutionState, Throwable> producerExecutionState = responseHandle.getProducerExecutionState();
        if (!isConsumerStateValidForConsumption(responseHandle.getConsumerExecutionState())) {
            LOG.debug("Ignore a partition producer state notification for task {}, because it's not running.", this.taskNameWithSubtask);
            return false;
        }
        if (!producerExecutionState.isLeft() && !(producerExecutionState.right() instanceof TimeoutException)) {
            handleFailedCheckResult(responseHandle);
            return false;
        }
        if (isProducerConsumerReady(responseHandle)) {
            return true;
        }
        abortConsumptionOrIgnoreCheckResult(responseHandle);
        return false;
    }

    private static boolean isConsumerStateValidForConsumption(ExecutionState executionState) {
        return executionState == ExecutionState.RUNNING || executionState == ExecutionState.INITIALIZING || executionState == ExecutionState.DEPLOYING;
    }

    private boolean isProducerConsumerReady(PartitionProducerStateProvider.ResponseHandle responseHandle) {
        ExecutionState producerState = getProducerState(responseHandle);
        return producerState == ExecutionState.SCHEDULED || producerState == ExecutionState.DEPLOYING || producerState == ExecutionState.INITIALIZING || producerState == ExecutionState.RUNNING || producerState == ExecutionState.FINISHED;
    }

    private void abortConsumptionOrIgnoreCheckResult(PartitionProducerStateProvider.ResponseHandle responseHandle) {
        ExecutionState producerState = getProducerState(responseHandle);
        if (producerState != ExecutionState.CANCELING && producerState != ExecutionState.CANCELED && producerState != ExecutionState.FAILED) {
            responseHandle.failConsumption(new IllegalStateException(String.format("Producer with attempt ID %s of partition %s in unexpected state %s.", this.resultPartitionId.getProducerId(), this.resultPartitionId.getPartitionId(), producerState)));
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Cancelling task {} after the producer of partition {} with attempt ID {} has entered state {}.", this.taskNameWithSubtask, this.resultPartitionId.getPartitionId(), this.resultPartitionId.getProducerId(), producerState);
        }
        responseHandle.cancelConsumption();
    }

    private static ExecutionState getProducerState(PartitionProducerStateProvider.ResponseHandle responseHandle) {
        Either<ExecutionState, Throwable> producerExecutionState = responseHandle.getProducerExecutionState();
        return producerExecutionState.isLeft() ? producerExecutionState.left() : ExecutionState.RUNNING;
    }

    private void handleFailedCheckResult(PartitionProducerStateProvider.ResponseHandle responseHandle) {
        Throwable right = responseHandle.getProducerExecutionState().right();
        if (!(right instanceof PartitionProducerDisposedException)) {
            responseHandle.failConsumption(right);
        } else {
            LOG.info(String.format("Producer %s of partition %s disposed. Cancelling execution.", this.resultPartitionId.getProducerId(), this.resultPartitionId.getPartitionId()), right);
            responseHandle.cancelConsumption();
        }
    }
}
