package org.apache.flink.state.benchmark;

import java.util.Iterator;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.state.benchmark.RescalingBenchmark;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/state/benchmark/RescalingBenchmarkTest.class */
public class RescalingBenchmarkTest extends TestLogger {

    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/state/benchmark/RescalingBenchmarkTest$IntegerRecordGenerator.class */
    public static class IntegerRecordGenerator implements RescalingBenchmark.StreamRecordGenerator<Integer> {
        private final int numberOfKeys = 1000;
        private int count;

        private IntegerRecordGenerator() {
            this.numberOfKeys = 1000;
            this.count = 0;
        }

        @Override // org.apache.flink.state.benchmark.RescalingBenchmark.StreamRecordGenerator
        public Iterator<StreamRecord<Integer>> generate() {
            return new Iterator<StreamRecord<Integer>>() { // from class: org.apache.flink.state.benchmark.RescalingBenchmarkTest.IntegerRecordGenerator.1
                @Override // java.util.Iterator
                public boolean hasNext() {
                    return IntegerRecordGenerator.this.count < 1000;
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.Iterator
                public StreamRecord<Integer> next() {
                    IntegerRecordGenerator.this.count++;
                    return new StreamRecord<>(Integer.valueOf(ThreadLocalRandom.current().nextInt()), 0L);
                }
            };
        }

        @Override // org.apache.flink.state.benchmark.RescalingBenchmark.StreamRecordGenerator
        public TypeInformation<Integer> getTypeInformation() {
            return BasicTypeInfo.INT_TYPE_INFO;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/state/benchmark/RescalingBenchmarkTest$TestKeyedFunction.class */
    public static class TestKeyedFunction extends KeyedProcessFunction<Integer, Integer, Void> {
        private static final long serialVersionUID = 1;
        private ValueState<Integer> randomState;

        private TestKeyedFunction() {
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            this.randomState = getRuntimeContext().getState(new ValueStateDescriptor("RandomState", Integer.class));
        }

        public void processElement(Integer num, KeyedProcessFunction<Integer, Integer, Void>.Context context, Collector<Void> collector) throws Exception {
            this.randomState.update(Integer.valueOf(ThreadLocalRandom.current().nextInt()));
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context, Collector collector) throws Exception {
            processElement((Integer) obj, (KeyedProcessFunction<Integer, Integer, Void>.Context) context, (Collector<Void>) collector);
        }
    }

    @Test
    public void testScalingOut() throws Exception {
        RescalingBenchmark build = new RescalingBenchmarkBuilder().setMaxParallelism(128).setParallelismBefore(1).setParallelismAfter(2).setManagedMemorySize(536870912).setCheckpointStorageAccess(new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI()).createCheckpointStorage(new JobID())).setStateBackend(new EmbeddedRocksDBStateBackend(true)).setStreamRecordGenerator(new IntegerRecordGenerator()).setStateProcessFunctionSupplier(() -> {
            return new TestKeyedFunction();
        }).build();
        build.setUp();
        build.prepareStateForOperator(0);
        build.rescale();
        build.closeOperator();
        build.tearDown();
    }

    @Test
    public void testScalingIn() throws Exception {
        RescalingBenchmark build = new RescalingBenchmarkBuilder().setMaxParallelism(128).setParallelismBefore(2).setParallelismAfter(1).setManagedMemorySize(536870912).setCheckpointStorageAccess(new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI()).createCheckpointStorage(new JobID())).setStateBackend(new EmbeddedRocksDBStateBackend(true)).setStreamRecordGenerator(new IntegerRecordGenerator()).setStateProcessFunctionSupplier(() -> {
            return new TestKeyedFunction();
        }).build();
        build.setUp();
        build.prepareStateForOperator(0);
        build.rescale();
        build.closeOperator();
        build.tearDown();
    }
}
