/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.redis;

import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.server.TestConfigSource;
import io.debezium.server.redis.RedisStreamTestProfile;
import io.debezium.server.redis.RedisTestResourceLifecycleManager;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import io.quarkus.test.junit.TestProfile;
import java.time.Duration;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;

@QuarkusIntegrationTest
@TestProfile(value=RedisStreamTestProfile.class)
@QuarkusTestResource(value=RedisTestResourceLifecycleManager.class)
public class RedisStreamIT {
    private PostgresConnection getPostgresConnection() {
        return new PostgresConnection((JdbcConfiguration)((JdbcConfiguration.Builder)((JdbcConfiguration.Builder)((JdbcConfiguration.Builder)((JdbcConfiguration.Builder)((JdbcConfiguration.Builder)JdbcConfiguration.create().with("user", "postgres")).with("password", "postgres")).with("dbname", "postgres")).with("hostname", "localhost")).with("port", (Object)PostgresTestResourceLifecycleManager.getContainer().getMappedPort(PostgresTestResourceLifecycleManager.POSTGRES_PORT.intValue()))).build(), "Debezium Redis Test");
    }

    private Long getStreamLength(Jedis jedis, String streamName, int expectedLength) {
        Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> jedis.xlen(streamName) == (long)expectedLength);
        return jedis.xlen(streamName);
    }

    @Test
    public void testRedisStream() throws Exception {
        Jedis jedis = new Jedis(HostAndPort.from((String)RedisTestResourceLifecycleManager.getRedisContainerAddress()));
        int MESSAGE_COUNT = 4;
        String STREAM_NAME = "testc.inventory.customers";
        Long streamLength = this.getStreamLength(jedis, "testc.inventory.customers", 4);
        Assert.assertTrue((String)"Redis Basic Stream Test Failed", (streamLength == 4L ? 1 : 0) != 0);
        jedis.close();
    }

    @Test
    @FixFor(value={"DBZ-4510"})
    public void testRedisConnectionRetry() throws Exception {
        Testing.Print.enable();
        int MESSAGE_COUNT = 5;
        String STREAM_NAME = "testc.inventory.redis_test";
        Jedis jedis = new Jedis(HostAndPort.from((String)RedisTestResourceLifecycleManager.getRedisContainerAddress()));
        Testing.print((Object)"Pausing container");
        RedisTestResourceLifecycleManager.pause();
        PostgresConnection connection = this.getPostgresConnection();
        Testing.print((Object)"Creating new redis_test table and inserting 5 records to it");
        connection.execute(new String[]{"CREATE TABLE inventory.redis_test (id INT PRIMARY KEY)", "INSERT INTO inventory.redis_test VALUES (1)", "INSERT INTO inventory.redis_test VALUES (2)", "INSERT INTO inventory.redis_test VALUES (3)", "INSERT INTO inventory.redis_test VALUES (4)", "INSERT INTO inventory.redis_test VALUES (5)"});
        connection.close();
        Testing.print((Object)"Sleeping for 3 seconds to simulate no connection errors");
        Thread.sleep(3000L);
        Testing.print((Object)"Unpausing container");
        RedisTestResourceLifecycleManager.unpause();
        Thread.sleep(2000L);
        Long streamLength = jedis.xlen("testc.inventory.redis_test");
        Testing.print((Object)("Entries in testc.inventory.redis_test:" + streamLength));
        jedis.close();
        Assert.assertTrue((String)"Redis Connection Test Failed", (streamLength == 5L ? 1 : 0) != 0);
    }

    @Test
    @FixFor(value={"DBZ-4510"})
    public void testRedisOOMRetry() throws Exception {
        Testing.Print.enable();
        Jedis jedis = new Jedis(HostAndPort.from((String)RedisTestResourceLifecycleManager.getRedisContainerAddress()));
        String STREAM_NAME = "testc.inventory.redis_test2";
        int TOTAL_RECORDS = 50;
        Testing.print((Object)"Setting Redis' maxmemory to 1M");
        jedis.configSet("maxmemory", "1M");
        PostgresConnection connection = this.getPostgresConnection();
        connection.execute(new String[]{"CREATE TABLE inventory.redis_test2 (id VARCHAR(100) PRIMARY KEY, first_name VARCHAR(100), last_name VARCHAR(100))"});
        connection.execute(new String[]{String.format("INSERT INTO inventory.redis_test2 (id,first_name,last_name) SELECT LEFT(i::text, 10), RANDOM()::text, RANDOM()::text FROM generate_series(1,%d) s(i)", 50)});
        connection.commit();
        Thread.sleep(1000L);
        Testing.print((Object)("Entries in testc.inventory.redis_test2:" + jedis.xlen("testc.inventory.redis_test2")));
        Assert.assertTrue((jedis.xlen("testc.inventory.redis_test2") < 50L ? 1 : 0) != 0);
        Thread.sleep(1000L);
        jedis.configSet("maxmemory", "0");
        Long streamLength = this.getStreamLength(jedis, "testc.inventory.redis_test2", 50);
        Assert.assertTrue((String)"Redis OOM Test Failed", (streamLength == 50L ? 1 : 0) != 0);
    }
}

