package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.class */
public class TestMergeManager {

    /* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager$StubbedMergeManager.class */
    /**
     * {@link MergeManager} for {@code Text}/{@code Text} records whose in-memory merger is a
     * {@link TestMergeThread}, so that a test can synchronise with each merge through a pair of
     * barriers and count how many merges were performed.
     */
    private static class StubbedMergeManager extends MergeManager<Text, Text> {
        // Intentionally neither final nor given an initializer: the field is assigned in
        // createInMemoryMerger() and is dereferenced by the constructor immediately after super(...).
        // NOTE(review): this presumes MergeManager's constructor invokes createInMemoryMerger() -
        // confirm against MergeManager.
        private TestMergeThread mergeThread;

        /**
         * Builds a manager backed by mocked file-system collaborators and wires the two barriers
         * into the stub merge thread.
         *
         * @param jobConf           job configuration forwarded to the superclass
         * @param exceptionReporter reporter forwarded to the superclass
         * @param cyclicBarrier     barrier the merge thread awaits first in each merge ("merge start")
         * @param cyclicBarrier2    barrier the merge thread awaits second in each merge ("merge complete")
         */
        public StubbedMergeManager(JobConf jobConf, ExceptionReporter exceptionReporter, CyclicBarrier cyclicBarrier, CyclicBarrier cyclicBarrier2) {
            super(
                    null,
                    jobConf,
                    (FileSystem) Mockito.mock(LocalFileSystem.class),
                    null, null, null, null, null, null, null, null,
                    exceptionReporter,
                    null,
                    (MapOutputFile) Mockito.mock(MapOutputFile.class));
            mergeThread.setSyncBarriers(cyclicBarrier, cyclicBarrier2);
        }

        /**
         * Creates the stub merge thread, remembers it in {@link #mergeThread} and returns it.
         */
        @Override // org.apache.hadoop.mapreduce.task.reduce.MergeManager
        protected MergeThread<MapOutput<Text, Text>, Text, Text> createInMemoryMerger() {
            TestMergeThread stubMerger = new TestMergeThread(this, getExceptionReporter());
            mergeThread = stubMerger;
            return stubMerger;
        }

        /**
         * @return the number of merges counted so far by the stub merge thread
         */
        public int getNumMerges() {
            return mergeThread.getNumMerges();
        }
    }

    /* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager$TestExceptionReporter.class */
    /**
     * {@link ExceptionReporter} that keeps every reported throwable so a test can assert on how
     * many were reported.
     *
     * <p>Both methods are synchronized: the reporter is handed to the stub merge thread while the
     * test thread reads the count, so reports may arrive from a different thread than the reader.
     */
    private static class TestExceptionReporter implements ExceptionReporter {
        // Explicit type argument instead of the raw ArrayList to avoid an unchecked assignment.
        private final List<Throwable> exceptions = new ArrayList<Throwable>();

        /**
         * Records the throwable and prints its stack trace so the failure is visible in test output.
         *
         * @param th the throwable being reported
         */
        @Override // org.apache.hadoop.mapreduce.task.reduce.ExceptionReporter
        public synchronized void reportException(Throwable th) {
            this.exceptions.add(th);
            th.printStackTrace();
        }

        /**
         * @return how many throwables have been reported so far
         */
        public synchronized int getNumExceptions() {
            return this.exceptions.size();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager$TestMergeThread.class */
    /**
     * Stub {@link MergeThread} that performs no real merge. Each call to {@link #merge(List)}
     * bumps a counter, unreserves the size of every input on the manager and then rendezvouses
     * with the test on two barriers: first "merge start", then "merge complete".
     */
    public static class TestMergeThread extends MergeThread<MapOutput<Text, Text>, Text, Text> {
        private final AtomicInteger numMerges = new AtomicInteger(0);
        // Both barriers are written and read while holding this object's monitor.
        private CyclicBarrier mergeStart;
        private CyclicBarrier mergeComplete;

        /**
         * @param mergeManager      manager forwarded to the superclass
         * @param exceptionReporter reporter forwarded to the superclass
         */
        public TestMergeThread(MergeManager<Text, Text> mergeManager, ExceptionReporter exceptionReporter) {
            super(mergeManager, Integer.MAX_VALUE, exceptionReporter);
        }

        /**
         * Installs the barriers used by {@link #merge(List)}.
         *
         * @param cyclicBarrier  barrier awaited first in each merge ("merge start")
         * @param cyclicBarrier2 barrier awaited second in each merge ("merge complete")
         */
        public synchronized void setSyncBarriers(CyclicBarrier cyclicBarrier, CyclicBarrier cyclicBarrier2) {
            this.mergeStart = cyclicBarrier;
            this.mergeComplete = cyclicBarrier2;
        }

        /**
         * @return the number of times {@link #merge(List)} has been entered
         */
        public int getNumMerges() {
            return this.numMerges.get();
        }

        /**
         * Counts the merge, unreserves every input's size on the manager, then waits on the
         * start barrier followed by the complete barrier.
         *
         * @param list the outputs handed over for merging
         * @throws IOException if one of the barriers is broken while this thread is waiting on it
         */
        @Override // org.apache.hadoop.mapreduce.task.reduce.MergeThread
        public void merge(List<MapOutput<Text, Text>> list) throws IOException {
            final CyclicBarrier start;
            final CyclicBarrier complete;
            synchronized (this) {
                this.numMerges.incrementAndGet();
                for (MapOutput<Text, Text> output : list) {
                    this.manager.unreserve(output.getSize());
                }
                // Snapshot the barriers under the same lock setSyncBarriers() writes them with.
                start = this.mergeStart;
                complete = this.mergeComplete;
            }
            try {
                start.await();
                complete.await();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of silently discarding it.
                Thread.currentThread().interrupt();
            } catch (BrokenBarrierException e) {
                // A broken barrier means the hand-shake with the test failed; surface it.
                throw new IOException("Merge synchronization barrier was broken", e);
            }
        }
    }

    /**
     * Drives two in-memory merge rounds through a {@link StubbedMergeManager}.
     *
     * <p>In each round two reservations must come back as {@code MEMORY}, a third must come back
     * as {@code WAIT}, and committing the first two must lead to exactly one more merge being
     * counted. The two barriers are used to rendezvous with the stub merge thread at the start
     * and at the end of each merge. At the end no exception may have been reported.
     */
    @Test(timeout = Task.DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS)
    public void testMemoryMerge() throws Exception {
        // NOTE(review): Task.DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS is used both as the test
        // timeout and as the total reduce memory; by its name it looks unrelated to either -
        // presumably a literal was folded into this constant. Confirm and replace with a
        // dedicated value.
        final long outputSize = 7950L;

        JobConf conf = new JobConf();
        conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 1.0f);
        conf.setLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, Task.DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS);
        conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.8f);
        conf.setFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 0.9f);

        TestExceptionReporter reporter = new TestExceptionReporter();
        CyclicBarrier mergeStart = new CyclicBarrier(2);
        CyclicBarrier mergeComplete = new CyclicBarrier(2);
        StubbedMergeManager manager = new StubbedMergeManager(conf, reporter, mergeStart, mergeComplete);

        // Round 1: two reservations are served from memory, the third has to wait.
        MapOutput<Text, Text> firstOutput = manager.reserve(null, outputSize, 0);
        Assert.assertEquals("Should be a memory merge", MapOutput.Type.MEMORY, firstOutput.getType());
        fillOutput(firstOutput);
        MapOutput<Text, Text> secondOutput = manager.reserve(null, outputSize, 0);
        Assert.assertEquals("Should be a memory merge", MapOutput.Type.MEMORY, secondOutput.getType());
        fillOutput(secondOutput);
        MapOutput<Text, Text> firstOverflow = manager.reserve(null, outputSize, 0);
        Assert.assertEquals("Should be told to wait", MapOutput.Type.WAIT, firstOverflow.getType());

        // Commit both outputs, then meet the merge thread at the start barrier.
        firstOutput.commit();
        secondOutput.commit();
        mergeStart.await();
        Assert.assertEquals(1L, manager.getNumMerges());

        // Round 2: the stub merge has already unreserved the first two outputs, and the merge
        // thread is still parked on the complete barrier while these reservations are made.
        MapOutput<Text, Text> thirdOutput = manager.reserve(null, outputSize, 0);
        Assert.assertEquals("Should be a memory merge", MapOutput.Type.MEMORY, thirdOutput.getType());
        fillOutput(thirdOutput);
        MapOutput<Text, Text> fourthOutput = manager.reserve(null, outputSize, 0);
        Assert.assertEquals("Should be a memory merge", MapOutput.Type.MEMORY, fourthOutput.getType());
        fillOutput(fourthOutput);
        MapOutput<Text, Text> secondOverflow = manager.reserve(null, outputSize, 0);
        Assert.assertEquals("Should be told to wait", MapOutput.Type.WAIT, secondOverflow.getType());

        // Commit, let the first merge finish, then meet the second merge at the start barrier.
        thirdOutput.commit();
        fourthOutput.commit();
        mergeComplete.await();
        mergeStart.await();
        Assert.assertEquals(2L, manager.getNumMerges());

        // Let the second merge finish; no further merge may have been counted.
        mergeComplete.await();
        Assert.assertEquals(2L, manager.getNumMerges());
        Assert.assertEquals("exception reporter invoked", 0L, reporter.getNumExceptions());
    }

    /**
     * Writes to the output's array stream until its limit is reached, passing the running
     * counter (0, 1, 2, ...) as the value of each {@code write} call.
     *
     * @param mapOutput the output whose array stream is filled
     * @throws IOException if writing to the stream fails
     */
    private void fillOutput(MapOutput<Text, Text> mapOutput) throws IOException {
        BoundedByteArrayOutputStream sink = mapOutput.getArrayStream();
        // Read the limit once up front, before any write happens.
        final int capacity = sink.getLimit();
        int next = 0;
        while (next < capacity) {
            sink.write(next);
            next++;
        }
    }
}
