package org.apache.tez.runtime.library.output;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.tez.common.EnvironmentUpdateUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.library.api.KeyValuesWriter;
import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/tez/runtime/library/output/TestOnFileSortedOutput.class */
public class TestOnFileSortedOutput {
    private static final Random rnd = new Random();
    private static final String UniqueID = "UUID";
    private static final String HOST = "localhost";
    private static final int PORT = 80;
    private FileSystem fs;
    private int partitions;
    private int sorterThreads;
    private KeyValuesWriter writer;
    private OrderedPartitionedKVOutput sortedOutput;
    private boolean sendEmptyPartitionViaEvent;
    private int emptyPartitionIdx;
    private Configuration conf = new Configuration();
    private Path workingDir = new Path(".", getClass().getName());

    public TestOnFileSortedOutput(boolean z, int i, int i2) throws IOException {
        this.sendEmptyPartitionViaEvent = z;
        this.emptyPartitionIdx = i2;
        this.sorterThreads = i;
        this.conf.setStrings("tez.runtime.framework.local.dirs", new String[]{this.workingDir.toString()});
        this.fs = FileSystem.getLocal(this.conf);
    }

    @Before
    public void setup() throws Exception {
        this.conf.setInt("tez.runtime.sort.threads", this.sorterThreads);
        this.conf.setInt("tez.runtime.io.sort.mb", 5);
        this.conf.set("tez.runtime.key.class", Text.class.getName());
        this.conf.set("tez.runtime.value.class", Text.class.getName());
        this.conf.set("tez.runtime.partitioner.class", HashPartitioner.class.getName());
        this.conf.setBoolean("tez.runtime.empty.partitions.info-via-events.enabled", this.sendEmptyPartitionViaEvent);
        EnvironmentUpdateUtils.put(ApplicationConstants.Environment.NM_HOST.toString(), "localhost");
        this.fs.mkdirs(this.workingDir);
        this.partitions = Math.max(1, rnd.nextInt(10));
    }

    @After
    public void cleanup() throws IOException {
        this.fs.delete(this.workingDir, true);
    }

    @Parameterized.Parameters(name = "test[{0}, {1}, {2}]")
    public static Collection<Object[]> getParameters() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Object[]{false, 1, -1});
        arrayList.add(new Object[]{false, 1, 0});
        arrayList.add(new Object[]{true, 1, -1});
        arrayList.add(new Object[]{true, 1, 0});
        arrayList.add(new Object[]{false, 2, -1});
        arrayList.add(new Object[]{false, 2, 0});
        arrayList.add(new Object[]{true, 2, -1});
        arrayList.add(new Object[]{true, 2, 0});
        return arrayList;
    }

    private void startSortedOutput(int i) throws Exception {
        this.sortedOutput = new OrderedPartitionedKVOutput(createTezOutputContext(), i);
        this.sortedOutput.initialize();
        this.sortedOutput.start();
        this.writer = this.sortedOutput.getWriter();
    }

    @Test(timeout = 5000)
    public void testSortBufferSize() throws Exception {
        OutputContext createTezOutputContext = createTezOutputContext();
        this.conf.setInt("tez.runtime.io.sort.mb", 2048);
        ((OutputContext) Mockito.doReturn(TezUtils.createUserPayloadFromConf(this.conf)).when(createTezOutputContext)).getUserPayload();
        this.sortedOutput = new OrderedPartitionedKVOutput(createTezOutputContext, this.partitions);
        try {
            this.sortedOutput.initialize();
            Assert.fail();
        } catch (IllegalArgumentException e) {
            Assert.assertTrue(e.getMessage().contains("tez.runtime.io.sort.mb"));
        }
        this.conf.setInt("tez.runtime.io.sort.mb", 0);
        ((OutputContext) Mockito.doReturn(TezUtils.createUserPayloadFromConf(this.conf)).when(createTezOutputContext)).getUserPayload();
        this.sortedOutput = new OrderedPartitionedKVOutput(createTezOutputContext, this.partitions);
        try {
            this.sortedOutput.initialize();
            Assert.fail();
        } catch (IllegalArgumentException e2) {
            Assert.assertTrue(e2.getMessage().contains("tez.runtime.io.sort.mb"));
        }
    }

    @Test(timeout = 5000)
    public void baseTest() throws Exception {
        startSortedOutput(this.partitions);
        for (int i = 0; i < Math.max(1, rnd.nextInt(50)); i++) {
            Text text = new Text(new BigInteger(256, rnd).toString());
            LinkedList linkedList = new LinkedList();
            for (int i2 = 0; i2 < Math.max(2, rnd.nextInt(10)); i2++) {
                linkedList.add(new Text(new BigInteger(256, rnd).toString()));
            }
            this.writer.write(text, linkedList);
        }
        List close = this.sortedOutput.close();
        Assert.assertTrue(close != null && close.size() == 2);
        ShuffleUserPayloads.DataMovementEventPayloadProto parseFrom = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(((CompositeDataMovementEvent) close.get(1)).getUserPayload()));
        Assert.assertEquals("localhost", parseFrom.getHost());
        Assert.assertEquals(80L, parseFrom.getPort());
        Assert.assertEquals(UniqueID, parseFrom.getPathComponent());
    }

    @Test(timeout = 5000)
    public void testWithSomeEmptyPartition() throws Exception {
        this.partitions = Math.max(2, this.partitions);
        startSortedOutput(this.partitions);
        for (int i = 0; i < 2 * this.partitions; i++) {
            Text text = new Text(new BigInteger(256, rnd).toString());
            Text text2 = new Text(new BigInteger(256, rnd).toString());
            if (i % this.partitions != this.emptyPartitionIdx) {
                this.writer.write(text, text2);
            }
        }
        List close = this.sortedOutput.close();
        Assert.assertTrue(close != null && close.size() == 2);
        ShuffleUserPayloads.DataMovementEventPayloadProto parseFrom = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(((CompositeDataMovementEvent) close.get(1)).getUserPayload()));
        Assert.assertEquals("localhost", parseFrom.getHost());
        Assert.assertEquals(80L, parseFrom.getPort());
        Assert.assertEquals(UniqueID, parseFrom.getPathComponent());
    }

    @Test(timeout = 5000)
    public void testAllEmptyPartition() throws Exception {
        startSortedOutput(this.partitions);
        List close = this.sortedOutput.close();
        Assert.assertTrue(close != null && close.size() == 2);
        ShuffleUserPayloads.DataMovementEventPayloadProto parseFrom = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(((CompositeDataMovementEvent) close.get(1)).getUserPayload()));
        if (this.sendEmptyPartitionViaEvent) {
            Assert.assertEquals("", parseFrom.getHost());
            Assert.assertEquals(0L, parseFrom.getPort());
            Assert.assertEquals("", parseFrom.getPathComponent());
        } else {
            Assert.assertEquals("localhost", parseFrom.getHost());
            Assert.assertEquals(80L, parseFrom.getPort());
            Assert.assertEquals(UniqueID, parseFrom.getPathComponent());
        }
    }

    private OutputContext createTezOutputContext() throws IOException {
        String[] strArr = {this.workingDir.toString()};
        UserPayload createUserPayloadFromConf = TezUtils.createUserPayloadFromConf(this.conf);
        DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
        dataOutputBuffer.writeInt(PORT);
        TezCounters tezCounters = new TezCounters();
        OutputContext outputContext = (OutputContext) Mockito.mock(OutputContext.class);
        ((OutputContext) Mockito.doReturn(tezCounters).when(outputContext)).getCounters();
        ((OutputContext) Mockito.doReturn(strArr).when(outputContext)).getWorkDirs();
        ((OutputContext) Mockito.doReturn(createUserPayloadFromConf).when(outputContext)).getUserPayload();
        ((OutputContext) Mockito.doReturn(5242880L).when(outputContext)).getTotalMemoryAvailableToTask();
        ((OutputContext) Mockito.doReturn(UniqueID).when(outputContext)).getUniqueIdentifier();
        ((OutputContext) Mockito.doReturn("v1").when(outputContext)).getDestinationVertexName();
        ((OutputContext) Mockito.doReturn(ByteBuffer.wrap(dataOutputBuffer.getData())).when(outputContext)).getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
        ((OutputContext) Mockito.doAnswer(new Answer() { // from class: org.apache.tez.runtime.library.output.TestOnFileSortedOutput.1
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                ((MemoryUpdateCallbackHandler) invocationOnMock.getArguments()[1]).memoryAssigned(((Long) invocationOnMock.getArguments()[0]).longValue());
                return null;
            }
        }).when(outputContext)).requestInitialMemory(Matchers.anyLong(), (MemoryUpdateCallback) Matchers.any(MemoryUpdateCallback.class));
        return outputContext;
    }
}
