package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.XORShiftRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for writers that serialize records and hand them to a {@link ResultPartitionWriter}.
 *
 * <p>The flush timeout passed to the constructor selects the flushing strategy:
 *
 * <ul>
 *   <li>{@code 0}: the target partition is flushed after every emitted record/event
 *       ({@link #flushAlways} is {@code true}); no background thread is started.
 *   <li>{@code -1}: this class never flushes on its own; no background thread is started.
 *   <li>any positive value: an {@link OutputFlusher} daemon thread calls {@link #flushAll()}
 *       after sleeping for that many milliseconds, in a loop.
 * </ul>
 *
 * <p>A failure of the background flusher is recorded by {@link #notifyFlusherException(Throwable)}
 * and surfaced to the emitting caller as an {@link IOException} by {@link #checkErroneous()}.
 *
 * @param <T> the type of record this writer emits
 */
public abstract class RecordWriter<T extends IOReadableWritable> implements AvailabilityProvider {

    /** Name of the flusher thread when no task name is supplied to the constructor. */
    @VisibleForTesting
    public static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";

    private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);

    /**
     * Number of {@link #checkErroneous()} calls between two reads of
     * {@link #volatileFlusherException}; the calls in between only read the plain field.
     */
    private static final int VOLATILE_FLUSHER_EXCEPTION_MAX_CHECK_SKIP_COUNT = 100;

    /** The partition that receives all serialized records and events. */
    protected final ResultPartitionWriter targetPartition;

    /** Number of subpartitions reported by {@link #targetPartition} at construction time. */
    protected final int numberOfChannels;

    /** {@code true} iff the constructor was given a timeout of {@code 0}. */
    protected final boolean flushAlways;

    /** Periodic flusher thread; {@code null} when the timeout is {@code -1} or {@code 0}. */
    @Nullable
    private final OutputFlusher outputFlusher;

    /**
     * First failure reported by the flusher thread. Deliberately not volatile, so reading it is
     * cheap but carries no visibility guarantee; {@link #volatileFlusherException} is the
     * reliable copy.
     */
    private Throwable flusherException;

    /** Volatile copy of {@link #flusherException}, read only every N-th check. */
    private volatile Throwable volatileFlusherException;

    /** Counts {@link #checkErroneous()} calls since the volatile field was last read. */
    private int volatileFlusherExceptionCheckSkipCount;

    /** Source of random channel indices for {@link #randomEmit(IOReadableWritable)}. */
    protected final Random rng = new XORShiftRandom();

    /** Reusable serialization buffer, created with an initial size argument of 128. */
    protected final DataOutputSerializer serializer = new DataOutputSerializer(128);

    /**
     * Daemon thread that repeatedly sleeps for a fixed timeout and then flushes all channels of
     * the enclosing writer.
     */
    private class OutputFlusher extends Thread {
        /** Sleep time between two flushes, passed straight to {@link Thread#sleep(long)}. */
        private final long timeout;

        /** Cleared by {@link #terminate()} to make the loop in {@link #run()} exit. */
        private volatile boolean running = true;

        OutputFlusher(String threadName, long timeoutMillis) {
            super(threadName);
            setDaemon(true);
            this.timeout = timeoutMillis;
        }

        /** Requests shutdown and interrupts the thread so that a pending sleep ends early. */
        public void terminate() {
            this.running = false;
            interrupt();
        }

        @Override
        public void run() {
            try {
                while (this.running) {
                    try {
                        Thread.sleep(this.timeout);
                    } catch (InterruptedException e) {
                        // An interrupt is only expected as part of terminate(); if we are still
                        // supposed to be running, treat it as a failure of the flusher.
                        if (this.running) {
                            throw new Exception(e);
                        }
                    }
                    // Note: this also runs once more after an interrupt caused by terminate().
                    RecordWriter.this.flushAll();
                }
            } catch (Throwable th) {
                // Any failure stops the thread and is handed to the writer for later reporting.
                RecordWriter.this.notifyFlusherException(th);
            }
        }
    }

    /**
     * Creates the writer and, for a positive timeout, starts the background flusher thread.
     *
     * <p>Declared {@code public} in this decompiled source; the decompiler notes that the
     * original modifier was package-private.
     *
     * @param resultPartitionWriter the partition to write to
     * @param j the flush timeout: {@code -1} (no automatic flushing), {@code 0} (flush after
     *     every record) or a positive sleep interval for the flusher thread
     * @param str task name appended to the flusher thread name, or {@code null} to use
     *     {@link #DEFAULT_OUTPUT_FLUSH_THREAD_NAME} unchanged
     * @throws IllegalArgumentException presumably, via {@code Preconditions.checkArgument}, if
     *     {@code j < -1} (verify against Preconditions)
     */
    public RecordWriter(ResultPartitionWriter resultPartitionWriter, long j, String str) {
        this.targetPartition = resultPartitionWriter;
        this.numberOfChannels = resultPartitionWriter.getNumberOfSubpartitions();
        Preconditions.checkArgument(j >= -1);
        this.flushAlways = j == 0;
        if (j == -1 || j == 0) {
            this.outputFlusher = null;
        } else {
            String threadName =
                    str == null
                            ? DEFAULT_OUTPUT_FLUSH_THREAD_NAME
                            : DEFAULT_OUTPUT_FLUSH_THREAD_NAME + " for " + str;
            this.outputFlusher = new OutputFlusher(threadName, j);
            this.outputFlusher.start();
        }
    }

    /**
     * Serializes the record, emits it to the given subpartition and, if {@link #flushAlways} is
     * set, flushes that subpartition.
     *
     * <p>Declared {@code public} in this decompiled source; the decompiler notes that the
     * original modifier was protected.
     *
     * @param t the record to emit
     * @param i index of the target subpartition
     * @throws IOException if a flusher failure was detected by {@link #checkErroneous()}, or if
     *     serialization or the target partition fails
     */
    public void emit(T t, int i) throws IOException {
        checkErroneous();
        ByteBuffer serialized = serializeRecord(this.serializer, t);
        this.targetPartition.emitRecord(serialized, i);
        if (this.flushAlways) {
            this.targetPartition.flush(i);
        }
    }

    /**
     * Broadcasts the event via {@link #broadcastEvent(AbstractEvent, boolean)} with the flag set
     * to {@code false}.
     *
     * @param abstractEvent the event to broadcast
     * @throws IOException if the target partition fails
     */
    public void broadcastEvent(AbstractEvent abstractEvent) throws IOException {
        broadcastEvent(abstractEvent, false);
    }

    /**
     * Forwards the event to the target partition and, if {@link #flushAlways} is set, flushes all
     * channels afterwards. Unlike the record-emitting methods this does not call
     * {@link #checkErroneous()}.
     *
     * @param abstractEvent the event to broadcast
     * @param z flag passed through unchanged to {@code ResultPartitionWriter.broadcastEvent}
     * @throws IOException if the target partition fails
     */
    public void broadcastEvent(AbstractEvent abstractEvent, boolean z) throws IOException {
        this.targetPartition.broadcastEvent(abstractEvent, z);
        if (this.flushAlways) {
            flushAll();
        }
    }

    /**
     * Serializes a record into the given serializer as a length-prefixed payload.
     *
     * <p>The first 4 bytes are skipped, the record writes itself after them, and the skipped
     * bytes are then back-filled with the payload length ({@code length() - 4}).
     *
     * @param dataOutputSerializer the buffer to serialize into; its previous content is
     *     overwritten from position 0
     * @param iOReadableWritable the record to serialize
     * @return the serializer's content wrapped as a {@link ByteBuffer}
     * @throws IOException if the record fails to write itself
     */
    @VisibleForTesting
    public static ByteBuffer serializeRecord(DataOutputSerializer dataOutputSerializer, IOReadableWritable iOReadableWritable) throws IOException {
        // Leave room for the 4-byte length header before the payload.
        dataOutputSerializer.setPositionUnsafe(4);
        iOReadableWritable.write(dataOutputSerializer);
        // Back-fill the header with the payload size (total length minus the header itself).
        dataOutputSerializer.writeIntUnsafe(dataOutputSerializer.length() - 4, 0);
        return dataOutputSerializer.wrapAsByteBuffer();
    }

    /** Flushes all channels of the target partition. Also called by the flusher thread. */
    public void flushAll() {
        this.targetPartition.flushAll();
    }

    /**
     * Passes the metric group on to the target partition.
     *
     * @param taskIOMetricGroup the metric group to register with the target partition
     */
    public void setMetricGroup(TaskIOMetricGroup taskIOMetricGroup) {
        this.targetPartition.setMetricGroup(taskIOMetricGroup);
    }

    /**
     * Returns the availability future of the target partition.
     *
     * @return the future obtained from {@code targetPartition.getAvailableFuture()}
     */
    @Override
    public CompletableFuture<?> getAvailableFuture() {
        return this.targetPartition.getAvailableFuture();
    }

    /**
     * Emits the record; the choice of target channel(s) is left to the subclass.
     *
     * @param t the record to emit
     * @throws IOException if emitting fails
     */
    public abstract void emit(T t) throws IOException;

    /**
     * Emits the record to a subpartition chosen uniformly by {@link #rng} from
     * {@code [0, numberOfChannels)}.
     *
     * @param t the record to emit
     * @throws IOException if a flusher failure was detected or emitting fails
     */
    public void randomEmit(T t) throws IOException {
        checkErroneous();
        int targetSubpartition = this.rng.nextInt(this.numberOfChannels);
        emit(t, targetSubpartition);
    }

    /**
     * Emits the record to all channels; the implementation is left to the subclass.
     *
     * @param t the record to broadcast
     * @throws IOException if emitting fails
     */
    public abstract void broadcastEmit(T t) throws IOException;

    /**
     * Stops the flusher thread, if one was started, and waits for it to finish. If the calling
     * thread is interrupted while waiting, the wait is abandoned and the interrupt flag is
     * restored.
     */
    public void close() {
        if (this.outputFlusher == null) {
            return;
        }
        this.outputFlusher.terminate();
        try {
            this.outputFlusher.join();
        } catch (InterruptedException e) {
            // Nothing to clean up here; restore the flag so later blocking calls exit quickly.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Records the first failure of the flusher thread and logs it; later failures are ignored.
     * Called from {@link OutputFlusher#run()}.
     *
     * <p>Declared {@code public} in this decompiled source; the decompiler notes that the
     * original modifier was private.
     *
     * @param th the failure that stopped the flusher thread
     */
    public void notifyFlusherException(Throwable th) {
        if (this.flusherException == null) {
            LOG.error("An exception happened while flushing the outputs", th);
            this.flusherException = th;
            this.volatileFlusherException = th;
        }
    }

    /**
     * Throws if the flusher thread has reported a failure.
     *
     * <p>To keep the per-record cost low, most calls only read the non-volatile
     * {@link #flusherException}. Every
     * {@value #VOLATILE_FLUSHER_EXCEPTION_MAX_CHECK_SKIP_COUNT}-th call additionally reads
     * {@link #volatileFlusherException}, which guarantees that a failure is eventually observed.
     *
     * <p>The volatile read happens on the same call that resets the skip counter. Testing the
     * counter against the threshold at the top of the method (as an earlier version did) can
     * never succeed, because the counter is reset to 0 the moment it reaches the threshold.
     *
     * <p>Declared {@code public} in this decompiled source; the decompiler notes that the
     * original modifier was protected.
     *
     * @throws IOException wrapping the flusher failure as its cause
     */
    public void checkErroneous() throws IOException {
        Throwable failure = this.flusherException;
        if (failure == null) {
            this.volatileFlusherExceptionCheckSkipCount++;
            if (this.volatileFlusherExceptionCheckSkipCount < VOLATILE_FLUSHER_EXCEPTION_MAX_CHECK_SKIP_COUNT) {
                return;
            }
            this.volatileFlusherExceptionCheckSkipCount = 0;
            failure = this.volatileFlusherException;
            if (failure == null) {
                return;
            }
        }
        // Use the throwable that was actually observed: the plain field is written before the
        // volatile one, so the volatile copy may still be null when the plain field is seen.
        throw new IOException("An exception happened while flushing the outputs", failure);
    }

    /**
     * Exposes the target partition for tests.
     *
     * @return the partition this writer emits to
     */
    @VisibleForTesting
    ResultPartitionWriter getTargetPartition() {
        return this.targetPartition;
    }
}
