package org.apache.flink.connector.base.sink.writer.strategy;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/connector/base/sink/writer/strategy/CongestionControlRateLimitingStrategyTest.class */
class CongestionControlRateLimitingStrategyTest {
    CongestionControlRateLimitingStrategyTest() {
    }

    @Test
    void testMaxInFlightRequestsRespected() {
        CongestionControlRateLimitingStrategy build = CongestionControlRateLimitingStrategy.builder().setMaxInFlightRequests(2).setInitialMaxInFlightMessages(10).setScalingStrategy(AIMDScalingStrategy.builder(10).build()).build();
        BasicRequestInfo basicRequestInfo = new BasicRequestInfo(0);
        BasicResultInfo basicResultInfo = new BasicResultInfo(0, 0);
        for (int i = 0; i < 2; i++) {
            Assertions.assertThat(build.shouldBlock(basicRequestInfo)).isFalse();
            build.registerInFlightRequest(basicRequestInfo);
        }
        Assertions.assertThat(build.shouldBlock(basicRequestInfo)).isTrue();
        build.registerCompletedRequest(basicResultInfo);
        Assertions.assertThat(build.shouldBlock(basicRequestInfo)).isFalse();
    }

    @Test
    void testMaxInFlightRequestsDoesNotGoBelowZero() {
        CongestionControlRateLimitingStrategy build = CongestionControlRateLimitingStrategy.builder().setMaxInFlightRequests(1).setInitialMaxInFlightMessages(10).setScalingStrategy(AIMDScalingStrategy.builder(10).build()).build();
        BasicRequestInfo basicRequestInfo = new BasicRequestInfo(0);
        BasicResultInfo basicResultInfo = new BasicResultInfo(0, 0);
        build.registerCompletedRequest(basicResultInfo);
        for (int i = 0; i < 1; i++) {
            Assertions.assertThat(build.shouldBlock(basicRequestInfo)).isFalse();
            build.registerInFlightRequest(basicRequestInfo);
        }
        Assertions.assertThat(build.shouldBlock(basicRequestInfo)).isTrue();
        build.registerCompletedRequest(basicResultInfo);
        Assertions.assertThat(build.shouldBlock(basicRequestInfo)).isFalse();
    }

    @Test
    void testInitialMaxInFlightMessagesRespected() {
        BasicRequestInfo basicRequestInfo = new BasicRequestInfo(2);
        BasicResultInfo basicResultInfo = new BasicResultInfo(0, 2);
        CongestionControlRateLimitingStrategy build = CongestionControlRateLimitingStrategy.builder().setMaxInFlightRequests(100).setInitialMaxInFlightMessages(4).setScalingStrategy(AIMDScalingStrategy.builder(4).setIncreaseRate(1).build()).build();
        build.registerInFlightRequest(basicRequestInfo);
        Assertions.assertThat(build.shouldBlock(basicRequestInfo)).isFalse();
        build.registerInFlightRequest(basicRequestInfo);
        Assertions.assertThat(build.shouldBlock(basicRequestInfo)).isTrue();
        build.registerCompletedRequest(basicResultInfo);
        Assertions.assertThat(build.shouldBlock(basicRequestInfo)).isFalse();
    }

    @Test
    void testAimdScalingStrategyScaleUpOnSuccess() {
        BasicRequestInfo basicRequestInfo = new BasicRequestInfo(0);
        BasicResultInfo basicResultInfo = new BasicResultInfo(0, 0);
        BasicRequestInfo basicRequestInfo2 = new BasicRequestInfo(2);
        CongestionControlRateLimitingStrategy build = CongestionControlRateLimitingStrategy.builder().setMaxInFlightRequests(100).setInitialMaxInFlightMessages(1).setScalingStrategy(AIMDScalingStrategy.builder(100).setIncreaseRate(10).setDecreaseFactor(0.5d).build()).build();
        Assertions.assertThat(build.shouldBlock(basicRequestInfo)).isFalse();
        Assertions.assertThat(build.shouldBlock(basicRequestInfo2)).isTrue();
        build.registerInFlightRequest(basicRequestInfo);
        build.registerCompletedRequest(basicResultInfo);
        Assertions.assertThat(build.shouldBlock(basicRequestInfo2)).isFalse();
    }

    @Test
    void testAimdScalingStrategyScaleDownOnFailure() {
        BasicRequestInfo basicRequestInfo = new BasicRequestInfo(1);
        BasicResultInfo basicResultInfo = new BasicResultInfo(1, 1);
        BasicRequestInfo basicRequestInfo2 = new BasicRequestInfo(2);
        CongestionControlRateLimitingStrategy build = CongestionControlRateLimitingStrategy.builder().setMaxInFlightRequests(100).setInitialMaxInFlightMessages(2).setScalingStrategy(AIMDScalingStrategy.builder(100).setDecreaseFactor(0.5d).build()).build();
        Assertions.assertThat(build.shouldBlock(basicRequestInfo)).isFalse();
        Assertions.assertThat(build.shouldBlock(basicRequestInfo2)).isFalse();
        build.registerInFlightRequest(basicRequestInfo);
        build.registerCompletedRequest(basicResultInfo);
        Assertions.assertThat(build.shouldBlock(basicRequestInfo2)).isTrue();
    }

    @Test
    void testInvalidMaxInFlightRequests() {
        Assertions.assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> {
            CongestionControlRateLimitingStrategy.builder().setMaxInFlightRequests(0).setInitialMaxInFlightMessages(10).setScalingStrategy(AIMDScalingStrategy.builder(10).build()).build();
        }).withMessageContaining("maxInFlightRequests must be a positive integer.");
    }

    @Test
    void testInvalidMaxInFlightMessages() {
        Assertions.assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> {
            CongestionControlRateLimitingStrategy.builder().setMaxInFlightRequests(10).setInitialMaxInFlightMessages(0).setScalingStrategy(AIMDScalingStrategy.builder(10).build()).build();
        }).withMessageContaining("initialMaxInFlightMessages must be a positive integer.");
    }

    @Test
    void testInvalidAimdStrategy() {
        Assertions.assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> {
            CongestionControlRateLimitingStrategy.builder().setMaxInFlightRequests(10).setInitialMaxInFlightMessages(10).build();
        }).withMessageContaining("scalingStrategy must be provided.");
    }
}
