package org.apache.flink.streaming.runtime.io;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/CachedBufferStorage.class */
public class CachedBufferStorage implements BufferStorage {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) CachedBufferStorage.class);
    private final long maxBufferedBytes;
    private final String taskName;
    private final ArrayDeque<BufferOrEventSequence> rolledOverBuffersQueue;
    private BufferOrEventSequence rolledOverBuffers;
    private final int pageSize;
    private long rolledBytes;
    private long bytesBlocked;
    private ArrayDeque<BufferOrEvent> cachedBuffers;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/CachedBufferStorage$BufferOrEventSequence.class */
    public static class BufferOrEventSequence {
        private final ArrayDeque<BufferOrEvent> queuedBuffers;
        private final long size;

        BufferOrEventSequence(ArrayDeque<BufferOrEvent> arrayDeque, long j) {
            this.queuedBuffers = arrayDeque;
            this.size = j;
        }

        @Nullable
        public BufferOrEvent getNext() {
            return this.queuedBuffers.poll();
        }

        public void cleanup() {
            while (true) {
                BufferOrEvent poll = this.queuedBuffers.poll();
                if (poll == null) {
                    return;
                }
                if (poll.isBuffer()) {
                    poll.getBuffer().recycleBuffer();
                }
            }
        }

        public long size() {
            return this.size;
        }
    }

    public CachedBufferStorage(int i) {
        this(i, -1L, "Unknown");
    }

    public CachedBufferStorage(int i, long j, String str) {
        this.rolledOverBuffersQueue = new ArrayDeque<>();
        Preconditions.checkArgument(j == -1 || j > 0);
        this.maxBufferedBytes = j;
        this.taskName = str;
        this.pageSize = i;
        this.cachedBuffers = new ArrayDeque<>();
    }

    @Override // org.apache.flink.streaming.runtime.io.BufferStorage
    public void add(BufferOrEvent bufferOrEvent) {
        this.bytesBlocked += this.pageSize;
        this.cachedBuffers.add(bufferOrEvent);
    }

    @Override // org.apache.flink.streaming.runtime.io.BufferStorage, java.lang.AutoCloseable
    public void close() {
        while (true) {
            BufferOrEvent poll = this.cachedBuffers.poll();
            if (poll == null) {
                break;
            } else if (poll.isBuffer()) {
                poll.getBuffer().recycleBuffer();
            }
        }
        if (this.rolledOverBuffers != null) {
            this.rolledOverBuffers.cleanup();
        }
        Iterator<BufferOrEventSequence> it2 = this.rolledOverBuffersQueue.iterator();
        while (it2.hasNext()) {
            it2.next().cleanup();
        }
        this.rolledOverBuffersQueue.clear();
        this.rolledBytes = 0L;
    }

    @Override // org.apache.flink.streaming.runtime.io.BufferStorage
    public long getPendingBytes() {
        return this.bytesBlocked;
    }

    @Override // org.apache.flink.streaming.runtime.io.BufferStorage
    public boolean isFull() {
        return this.maxBufferedBytes > 0 && getRolledBytes() + getPendingBytes() > this.maxBufferedBytes;
    }

    @Override // org.apache.flink.streaming.runtime.io.BufferStorage
    public void rollOver() {
        if (this.rolledOverBuffers == null) {
            this.rolledOverBuffers = rollOverCachedBuffers();
        } else {
            LOG.debug("{}: Checkpoint skipped via buffered data:Pushing back current alignment buffers and feeding back new alignment data first.", this.taskName);
            BufferOrEventSequence rollOverCachedBuffers = rollOverCachedBuffers();
            if (rollOverCachedBuffers != null) {
                this.rolledOverBuffersQueue.addFirst(this.rolledOverBuffers);
                this.rolledBytes += this.rolledOverBuffers.size();
                this.rolledOverBuffers = rollOverCachedBuffers;
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Size of buffered data: {} bytes", this.taskName, Long.valueOf(this.rolledOverBuffers == null ? 0L : this.rolledOverBuffers.size()));
        }
    }

    private BufferOrEventSequence rollOverCachedBuffers() {
        if (this.bytesBlocked == 0) {
            return null;
        }
        BufferOrEventSequence bufferOrEventSequence = new BufferOrEventSequence(this.cachedBuffers, this.bytesBlocked);
        this.cachedBuffers = new ArrayDeque<>();
        this.bytesBlocked = 0L;
        return bufferOrEventSequence;
    }

    @Override // org.apache.flink.streaming.runtime.io.BufferStorage
    public long getRolledBytes() {
        return this.rolledBytes;
    }

    @Override // org.apache.flink.streaming.runtime.io.BufferStorage
    public boolean isEmpty() {
        return this.rolledOverBuffers == null;
    }

    @Override // org.apache.flink.streaming.runtime.io.BufferStorage
    public Optional<BufferOrEvent> pollNext() {
        if (this.rolledOverBuffers == null) {
            return Optional.empty();
        }
        Optional<BufferOrEvent> ofNullable = Optional.ofNullable(this.rolledOverBuffers.getNext());
        if (!ofNullable.isPresent()) {
            completeBufferedSequence();
        }
        return ofNullable;
    }

    private void completeBufferedSequence() {
        LOG.debug("{}: Finished feeding back buffered data.", this.taskName);
        this.rolledOverBuffers.cleanup();
        this.rolledOverBuffers = this.rolledOverBuffersQueue.pollFirst();
        if (this.rolledOverBuffers != null) {
            this.rolledBytes -= this.rolledOverBuffers.size();
        }
    }

    @Override // org.apache.flink.streaming.runtime.io.BufferStorage
    public long getMaxBufferedBytes() {
        return this.maxBufferedBytes;
    }
}
