package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
@ThreadSafe
/* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.class */
public class ChannelStateWriterImpl implements ChannelStateWriter {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ChannelStateWriterImpl.class);
    private static final int DEFAULT_MAX_CHECKPOINTS = 1000;
    private final String taskName;
    private final ChannelStateWriteRequestExecutor executor;
    private final ConcurrentMap<Long, ChannelStateWriter.ChannelStateWriteResult> results;
    private final int maxCheckpoints;

    public ChannelStateWriterImpl(String str, int i, CheckpointStorageWorkerView checkpointStorageWorkerView) {
        this(str, i, checkpointStorageWorkerView, 1000);
    }

    ChannelStateWriterImpl(String str, int i, CheckpointStorageWorkerView checkpointStorageWorkerView, int i2) {
        this(str, new ConcurrentHashMap(i2), new ChannelStateWriteRequestExecutorImpl(str, new ChannelStateWriteRequestDispatcherImpl(i, checkpointStorageWorkerView, new ChannelStateSerializerImpl())), i2);
    }

    ChannelStateWriterImpl(String str, ConcurrentMap<Long, ChannelStateWriter.ChannelStateWriteResult> concurrentMap, ChannelStateWriteRequestExecutor channelStateWriteRequestExecutor, int i) {
        this.taskName = str;
        this.results = concurrentMap;
        this.maxCheckpoints = i;
        this.executor = channelStateWriteRequestExecutor;
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter
    public void start(long j, CheckpointOptions checkpointOptions) {
        LOG.debug("{} starting checkpoint {} ({})", this.taskName, Long.valueOf(j), checkpointOptions);
        ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult = new ChannelStateWriter.ChannelStateWriteResult();
        Preconditions.checkArgument(this.results.computeIfAbsent(Long.valueOf(j), l -> {
            Preconditions.checkState(this.results.size() < this.maxCheckpoints, String.format("%s can't start %d, results.size() > maxCheckpoints: %d > %d", this.taskName, Long.valueOf(j), Integer.valueOf(this.results.size()), Integer.valueOf(this.maxCheckpoints)));
            enqueue(new CheckpointStartRequest(j, channelStateWriteResult, checkpointOptions.getTargetLocation()), false);
            return channelStateWriteResult;
        }) == channelStateWriteResult, this.taskName + " result future already present for checkpoint " + j);
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter
    public void addInputData(long j, InputChannelInfo inputChannelInfo, int i, CloseableIterator<Buffer> closeableIterator) {
        LOG.trace("{} adding input data, checkpoint {}, channel: {}, startSeqNum: {}", this.taskName, Long.valueOf(j), inputChannelInfo, Integer.valueOf(i));
        enqueue(ChannelStateWriteRequest.write(j, inputChannelInfo, closeableIterator), false);
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter
    public void addOutputData(long j, ResultSubpartitionInfo resultSubpartitionInfo, int i, Buffer... bufferArr) {
        Logger logger = LOG;
        Object[] objArr = new Object[5];
        objArr[0] = this.taskName;
        objArr[1] = Long.valueOf(j);
        objArr[2] = resultSubpartitionInfo;
        objArr[3] = Integer.valueOf(i);
        objArr[4] = Integer.valueOf(bufferArr == null ? 0 : bufferArr.length);
        logger.trace("{} adding output data, checkpoint {}, channel: {}, startSeqNum: {}, num buffers: {}", objArr);
        enqueue(ChannelStateWriteRequest.write(j, resultSubpartitionInfo, bufferArr), false);
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter
    public void finishInput(long j) {
        LOG.debug("{} finishing input data, checkpoint {}", this.taskName, Long.valueOf(j));
        enqueue(ChannelStateWriteRequest.completeInput(j), false);
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter
    public void finishOutput(long j) {
        LOG.debug("{} finishing output data, checkpoint {}", this.taskName, Long.valueOf(j));
        enqueue(ChannelStateWriteRequest.completeOutput(j), false);
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter
    public void abort(long j, Throwable th, boolean z) {
        LOG.debug("{} aborting, checkpoint {}", this.taskName, Long.valueOf(j));
        enqueue(ChannelStateWriteRequest.abort(j, th), true);
        enqueue(ChannelStateWriteRequest.abort(j, th), false);
        if (z) {
            this.results.remove(Long.valueOf(j));
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter
    public ChannelStateWriter.ChannelStateWriteResult getAndRemoveWriteResult(long j) {
        LOG.debug("{} requested write result, checkpoint {}", this.taskName, Long.valueOf(j));
        ChannelStateWriter.ChannelStateWriteResult remove = this.results.remove(Long.valueOf(j));
        Preconditions.checkArgument(remove != null, this.taskName + " channel state write result not found for checkpoint " + j);
        return remove;
    }

    public void open() {
        this.executor.start();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        LOG.debug("close, dropping checkpoints {}", this.results.keySet());
        this.results.clear();
        this.executor.close();
    }

    private void enqueue(ChannelStateWriteRequest channelStateWriteRequest, boolean z) {
        try {
            if (z) {
                this.executor.submitPriority(channelStateWriteRequest);
            } else {
                this.executor.submit(channelStateWriteRequest);
            }
        } catch (Exception e) {
            RuntimeException runtimeException = new RuntimeException("unable to send request to worker", e);
            try {
                channelStateWriteRequest.cancel(e);
            } catch (Exception e2) {
                runtimeException.addSuppressed(e2);
            }
            throw runtimeException;
        }
    }
}
