package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.flink.runtime.operators.sort.StageRunner;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/operators/sort/ReadingThread.class */
/**
 * Reading stage of the sort pipeline: pulls records from a {@link MutableObjectIterator} and
 * forwards each one to a {@link SorterInputGateway}.
 *
 * <p>The stage keeps reading until the iterator returns {@code null} or {@code isRunning()}
 * reports {@code false}; in both cases the gateway is told that reading has finished.
 *
 * @param <E> type of the records read from the input
 */
public final class ReadingThread<E> extends ThreadBase<E> {
    /** Source the records are pulled from; checked to be non-null in the constructor. */
    private final MutableObjectIterator<E> reader;

    /** Instance handed to the first {@code reader.next(...)} call; may be {@code null}. */
    private final E readTarget;

    /** Sink that receives every record read from {@link #reader}. */
    private final SorterInputGateway<E> sorterGateway;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the reading stage and the gateway it writes into.
     *
     * @param exceptionHandler handler passed on to {@link ThreadBase}; may be {@code null}
     * @param mutableObjectIterator input to read records from; must not be {@code null}
     * @param stageMessageDispatcher dispatcher passed to both {@link ThreadBase} and the
     *     {@link SorterInputGateway}
     * @param largeRecordHandler handler passed to the {@link SorterInputGateway}; may be
     *     {@code null}
     * @param e instance handed to the first {@code next(...)} call of the input (presumably a
     *     reuse target - confirm against {@link MutableObjectIterator}); may be {@code null}
     * @param j numeric setting forwarded unchanged to the {@link SorterInputGateway}
     *     constructor (NOTE(review): looks like a spill threshold in bytes - confirm)
     * @throws NullPointerException if {@code mutableObjectIterator} is {@code null}
     */
    public ReadingThread(
            @Nullable ExceptionHandler<IOException> exceptionHandler,
            MutableObjectIterator<E> mutableObjectIterator,
            StageRunner.StageMessageDispatcher<E> stageMessageDispatcher,
            @Nullable LargeRecordHandler<E> largeRecordHandler,
            @Nullable E e,
            long j) {
        super(exceptionHandler, "SortMerger Reading Thread", stageMessageDispatcher);
        this.sorterGateway = new SorterInputGateway<>(stageMessageDispatcher, largeRecordHandler, j);
        // checkNotNull is generic and returns its argument with its static type,
        // so no (raw) cast is required here.
        this.reader = Preconditions.checkNotNull(mutableObjectIterator);
        this.readTarget = e;
    }

    /**
     * Reads the input record by record and writes each record to the sorter gateway.
     *
     * <p>The first read passes {@link #readTarget} to the iterator; every following read passes
     * the record that was just written. {@code finishReading()} is always invoked once the loop
     * ends, whether the input was exhausted or the stage stopped running.
     *
     * @throws IOException propagated from the input iterator or the sorter gateway
     * @throws InterruptedException propagated from the calls made by this method
     */
    @Override // org.apache.flink.runtime.operators.sort.ThreadBase
    public void go() throws IOException, InterruptedException {
        final MutableObjectIterator<E> input = this.reader;

        E current = input.next(this.readTarget);
        // isRunning() is evaluated before the null check, exactly once per iteration.
        while (isRunning() && current != null) {
            this.sorterGateway.writeRecord(current);
            // Hand the record just written back to the iterator for the next read.
            current = input.next(current);
        }

        this.sorterGateway.finishReading();
    }
}
