/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.redis;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.doc.FixFor;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.server.redis.RedisTestConfigSource;
import io.debezium.server.redis.RedisTestResourceLifecycleManager;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import java.nio.file.Path;
import java.time.Duration;
import javax.enterprise.event.Observes;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;

@QuarkusTest
@QuarkusTestResource.List(value={@QuarkusTestResource(value=PostgresTestResourceLifecycleManager.class), @QuarkusTestResource(value=RedisTestResourceLifecycleManager.class)})
public class RedisStreamIT {
    @ConfigProperty(name="debezium.source.database.hostname")
    String dbHostname;
    @ConfigProperty(name="debezium.source.database.port")
    String dbPort;
    @ConfigProperty(name="debezium.source.database.user")
    String dbUser;
    @ConfigProperty(name="debezium.source.database.password")
    String dbPassword;
    @ConfigProperty(name="debezium.source.database.dbname")
    String dbName;
    protected static Jedis jedis;

    public RedisStreamIT() {
        Testing.Files.delete((Path)RedisTestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile((Path)RedisTestConfigSource.OFFSET_STORE_PATH);
    }

    void setupDependencies(@Observes ConnectorStartedEvent event) {
        Testing.Print.enable();
        jedis = new Jedis(HostAndPort.from((String)RedisTestResourceLifecycleManager.getRedisContainerAddress()));
    }

    void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
        if (!event.isSuccess()) {
            throw (Exception)event.getError().get();
        }
    }

    private PostgresConnection getPostgresConnection() {
        return new PostgresConnection(Configuration.create().with("hostname", this.dbHostname).with("port", this.dbPort).with("user", this.dbUser).with("password", this.dbPassword).with("dbname", this.dbName).build());
    }

    private Long getStreamLength(String streamName, int expectedLength) {
        Awaitility.await().atMost(Duration.ofSeconds(RedisTestConfigSource.waitForSeconds())).until(() -> jedis.xlen(streamName) == (long)expectedLength);
        return jedis.xlen(streamName);
    }

    @Test
    public void testRedisStream() throws Exception {
        int MESSAGE_COUNT = 4;
        String STREAM_NAME = "testc.inventory.customers";
        Long streamLength = this.getStreamLength("testc.inventory.customers", 4);
        Assert.assertTrue((String)"Redis Basic Stream Test Failed", (streamLength == 4L ? 1 : 0) != 0);
    }

    @Test
    @FixFor(value={"DBZ-4510"})
    public void testRedisConnectionRetry() throws Exception {
        int MESSAGE_COUNT = 5;
        String STREAM_NAME = "testc.inventory.redis_test";
        Testing.print((Object)"Pausing container");
        RedisTestResourceLifecycleManager.pause();
        PostgresConnection connection = this.getPostgresConnection();
        Testing.print((Object)"Creating new redis_test table and inserting 5 records to it");
        connection.execute(new String[]{"CREATE TABLE inventory.redis_test (id INT PRIMARY KEY)", "INSERT INTO inventory.redis_test VALUES (1)", "INSERT INTO inventory.redis_test VALUES (2)", "INSERT INTO inventory.redis_test VALUES (3)", "INSERT INTO inventory.redis_test VALUES (4)", "INSERT INTO inventory.redis_test VALUES (5)"});
        connection.close();
        Testing.print((Object)"Sleeping for 5 seconds to simulate no connection errors");
        Thread.sleep(5000L);
        Testing.print((Object)"Unpausing container");
        RedisTestResourceLifecycleManager.unpause();
        Long streamLength = this.getStreamLength("testc.inventory.redis_test", 5);
        Testing.print((Object)("Entries in testc.inventory.redis_test:" + streamLength));
        Assert.assertTrue((String)"Redis Connection Test Failed", (streamLength == 5L ? 1 : 0) != 0);
    }

    @Test
    @FixFor(value={"DBZ-4510"})
    public void testRedisOOMRetry() throws Exception {
        String STREAM_NAME = "testc.inventory.redis_test2";
        int FIRST_BATCH_SIZE = 10;
        int EXPECTED_STREAM_LENGTH_AFTER_DELETION = 30;
        int SECOND_BATCH_SIZE = 22;
        String INSERT_SQL = "INSERT INTO inventory.redis_test2 (id,first_name,last_name) SELECT LEFT(i::text, 10), RANDOM()::text, RANDOM()::text FROM generate_series(1,%d) s(i)";
        Testing.print((Object)"Setting Redis' maxmemory to 2M");
        jedis.configSet("maxmemory", "1M");
        PostgresConnection connection = this.getPostgresConnection();
        connection.execute(new String[]{"CREATE TABLE inventory.redis_test2 (id VARCHAR(100) PRIMARY KEY, first_name VARCHAR(100), last_name VARCHAR(100))", String.format("INSERT INTO inventory.redis_test2 (id,first_name,last_name) SELECT LEFT(i::text, 10), RANDOM()::text, RANDOM()::text FROM generate_series(1,%d) s(i)", 10)});
        Long streamLengthAfterInserts = this.getStreamLength("testc.inventory.redis_test2", 10);
        Testing.print((Object)("Entries in testc.inventory.redis_test2:" + streamLengthAfterInserts));
        connection.execute(new String[]{"DELETE FROM inventory.redis_test2"});
        Long streamLengthAfterDeletion = this.getStreamLength("testc.inventory.redis_test2", 30);
        Testing.print((Object)("Entries in testc.inventory.redis_test2:" + streamLengthAfterDeletion));
        connection.execute(new String[]{String.format("INSERT INTO inventory.redis_test2 (id,first_name,last_name) SELECT LEFT(i::text, 10), RANDOM()::text, RANDOM()::text FROM generate_series(1,%d) s(i)", 22)});
        connection.close();
        Thread.sleep(1000L);
        Testing.print((Object)"Deleting stream in order to free memory");
        jedis.del("testc.inventory.redis_test2");
        Long streamLength = this.getStreamLength("testc.inventory.redis_test2", 22);
        Testing.print((Object)("Entries in testc.inventory.redis_test2:" + streamLength));
        jedis.configSet("maxmemory", "0");
        Assert.assertTrue((String)"Redis OOM Test Failed", (streamLength == 22L ? 1 : 0) != 0);
    }
}

