package org.apache.flink.streaming.runtime.tasks.mailbox;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;

@ThreadSafe
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.class */
public class TaskMailboxImpl implements TaskMailbox {
    private final ReentrantLock lock;

    @GuardedBy("lock")
    private final Deque<Mail> queue;

    @GuardedBy("lock")
    private final Condition notEmpty;

    @GuardedBy("lock")
    private TaskMailbox.State state;

    @Nonnull
    private final Thread taskMailboxThread;
    private final Deque<Mail> batch;
    private volatile boolean hasNewMail;

    public TaskMailboxImpl(@Nonnull Thread thread) {
        this.lock = new ReentrantLock();
        this.queue = new ArrayDeque();
        this.notEmpty = this.lock.newCondition();
        this.state = TaskMailbox.State.OPEN;
        this.batch = new ArrayDeque();
        this.hasNewMail = false;
        this.taskMailboxThread = thread;
    }

    @VisibleForTesting
    public TaskMailboxImpl() {
        this(Thread.currentThread());
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox
    public boolean isMailboxThread() {
        return Thread.currentThread() == this.taskMailboxThread;
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox
    public boolean hasMail() {
        checkIsMailboxThread();
        return !this.batch.isEmpty() || this.hasNewMail;
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox
    public Optional<Mail> tryTake(int i) {
        checkIsMailboxThread();
        checkTakeStateConditions();
        Mail takeOrNull = takeOrNull(this.batch, i);
        if (takeOrNull != null) {
            return Optional.of(takeOrNull);
        }
        if (!this.hasNewMail) {
            return Optional.empty();
        }
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            Mail takeOrNull2 = takeOrNull(this.queue, i);
            if (takeOrNull2 == null) {
                Optional<Mail> empty = Optional.empty();
                reentrantLock.unlock();
                return empty;
            }
            this.hasNewMail = !this.queue.isEmpty();
            Optional<Mail> ofNullable = Optional.ofNullable(takeOrNull2);
            reentrantLock.unlock();
            return ofNullable;
        } catch (Throwable th) {
            reentrantLock.unlock();
            throw th;
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox
    @Nonnull
    public Mail take(int i) throws InterruptedException, IllegalStateException {
        Mail takeOrNull;
        checkIsMailboxThread();
        checkTakeStateConditions();
        Mail takeOrNull2 = takeOrNull(this.batch, i);
        if (takeOrNull2 != null) {
            return takeOrNull2;
        }
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lockInterruptibly();
        while (true) {
            try {
                takeOrNull = takeOrNull(this.queue, i);
                if (takeOrNull != null) {
                    break;
                }
                this.notEmpty.await();
            } catch (Throwable th) {
                reentrantLock.unlock();
                throw th;
            }
        }
        this.hasNewMail = !this.queue.isEmpty();
        reentrantLock.unlock();
        return takeOrNull;
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox
    public boolean createBatch() {
        checkIsMailboxThread();
        if (!this.hasNewMail) {
            return !this.batch.isEmpty();
        }
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        while (true) {
            try {
                Mail pollFirst = this.queue.pollFirst();
                if (pollFirst == null) {
                    break;
                }
                this.batch.addLast(pollFirst);
            } finally {
                reentrantLock.unlock();
            }
        }
        this.hasNewMail = false;
        return !this.batch.isEmpty();
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox
    public Optional<Mail> tryTakeFromBatch() {
        checkIsMailboxThread();
        checkTakeStateConditions();
        return Optional.ofNullable(this.batch.pollFirst());
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox
    public void put(@Nonnull Mail mail) {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            checkPutStateConditions();
            this.queue.addLast(mail);
            this.hasNewMail = true;
            this.notEmpty.signal();
        } finally {
            reentrantLock.unlock();
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox
    public void putFirst(@Nonnull Mail mail) {
        if (isMailboxThread()) {
            checkPutStateConditions();
            this.batch.addFirst(mail);
            return;
        }
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            checkPutStateConditions();
            this.queue.addFirst(mail);
            this.hasNewMail = true;
            this.notEmpty.signal();
        } finally {
            reentrantLock.unlock();
        }
    }

    @Nullable
    private Mail takeOrNull(Deque<Mail> deque, int i) {
        if (deque.isEmpty()) {
            return null;
        }
        Iterator<Mail> it2 = deque.iterator();
        while (it2.hasNext()) {
            Mail next = it2.next();
            if (next.getPriority() >= i) {
                it2.remove();
                return next;
            }
        }
        return null;
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox
    public List<Mail> drain() {
        ArrayList arrayList = new ArrayList(this.batch);
        this.batch.clear();
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            arrayList.addAll(this.queue);
            this.queue.clear();
            this.hasNewMail = false;
            reentrantLock.unlock();
            return arrayList;
        } catch (Throwable th) {
            reentrantLock.unlock();
            throw th;
        }
    }

    private void checkIsMailboxThread() {
        if (!isMailboxThread()) {
            throw new IllegalStateException("Illegal thread detected. This method must be called from inside the mailbox thread!");
        }
    }

    private void checkPutStateConditions() {
        if (this.state != TaskMailbox.State.OPEN) {
            throw new IllegalStateException("Mailbox is in state " + this.state + ", but is required to be in state " + TaskMailbox.State.OPEN + " for put operations.");
        }
    }

    private void checkTakeStateConditions() {
        if (this.state == TaskMailbox.State.CLOSED) {
            throw new IllegalStateException("Mailbox is in state " + this.state + ", but is required to be in state " + TaskMailbox.State.OPEN + " or " + TaskMailbox.State.QUIESCED + " for take operations.");
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox
    public void quiesce() {
        checkIsMailboxThread();
        this.lock.lock();
        try {
            if (this.state == TaskMailbox.State.OPEN) {
                this.state = TaskMailbox.State.QUIESCED;
            }
        } finally {
            this.lock.unlock();
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox
    @Nonnull
    public List<Mail> close() {
        checkIsMailboxThread();
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            if (this.state == TaskMailbox.State.CLOSED) {
                List<Mail> emptyList = Collections.emptyList();
                reentrantLock.unlock();
                return emptyList;
            }
            List<Mail> drain = drain();
            this.state = TaskMailbox.State.CLOSED;
            this.notEmpty.signalAll();
            reentrantLock.unlock();
            return drain;
        } catch (Throwable th) {
            reentrantLock.unlock();
            throw th;
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox
    @Nonnull
    public TaskMailbox.State getState() {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            return this.state;
        } finally {
            reentrantLock.unlock();
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox
    public void runExclusively(Runnable runnable) {
        this.lock.lock();
        try {
            runnable.run();
        } finally {
            this.lock.unlock();
        }
    }
}
