/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server;

import io.debezium.server.DebeziumMetrics;
import io.debezium.server.DebeziumServer;
import io.debezium.server.TestConfigSource;
import io.debezium.server.TestConsumer;
import io.debezium.testing.system.tools.databases.SqlDatabaseClient;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.inject.Inject;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

@QuarkusTest
@QuarkusTestResource(value=PostgresTestResourceLifecycleManager.class)
@TestMethodOrder(value=MethodOrderer.OrderAnnotation.class)
public class DebeziumServerPostgresIT {
    private static final int MESSAGE_COUNT = 4;
    @Inject
    DebeziumServer server;
    @Inject
    DebeziumMetrics metrics;

    public DebeziumServerPostgresIT() {
        Testing.Files.delete((Path)TestConfigSource.OFFSET_STORE_PATH);
    }

    @Test
    @Order(value=1)
    public void shouldSnapshot() {
        Testing.Print.enable();
        TestConsumer testConsumer = (TestConsumer)this.server.getConsumer();
        this.waitSnapshotCompletion();
        Assertions.assertThat((int)testConsumer.getValues().size()).isEqualTo(4);
        List values = testConsumer.getValues().stream().map(Object::toString).collect(Collectors.toList());
        Assertions.assertThat((String)((String)values.get(0))).contains(new CharSequence[]{"\"after\":{\"id\":1001,\"first_name\":\"Sally\",\"last_name\":\"Thomas\",\"email\":\"sally.thomas@acme.com\"}"});
        Assertions.assertThat((String)((String)values.get(1))).contains(new CharSequence[]{"\"after\":{\"id\":1002,\"first_name\":\"George\",\"last_name\":\"Bailey\",\"email\":\"gbailey@foobar.com\"}"});
        Assertions.assertThat((String)((String)values.get(2))).contains(new CharSequence[]{"\"after\":{\"id\":1003,\"first_name\":\"Edward\",\"last_name\":\"Walker\",\"email\":\"ed@walker.com\"}"});
        Assertions.assertThat((String)((String)values.get(3))).contains(new CharSequence[]{"\"after\":{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"annek@noanswer.org\"}"});
    }

    @Test
    @Order(value=2)
    public void shouldStream() throws SQLException {
        Testing.Print.enable();
        TestConsumer testConsumer = (TestConsumer)this.server.getConsumer();
        this.waitSnapshotCompletion();
        DebeziumServerPostgresIT.insertNewRow();
        Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> testConsumer.getValues().size() >= 5);
        List values = testConsumer.getValues().stream().map(Object::toString).collect(Collectors.toList());
        Assertions.assertThat((String)((String)values.get(0))).contains(new CharSequence[]{"\"after\":{\"id\":1001,\"first_name\":\"Sally\",\"last_name\":\"Thomas\",\"email\":\"sally.thomas@acme.com\"}"});
        Assertions.assertThat((String)((String)values.get(1))).contains(new CharSequence[]{"\"after\":{\"id\":1002,\"first_name\":\"George\",\"last_name\":\"Bailey\",\"email\":\"gbailey@foobar.com\"}"});
        Assertions.assertThat((String)((String)values.get(2))).contains(new CharSequence[]{"\"after\":{\"id\":1003,\"first_name\":\"Edward\",\"last_name\":\"Walker\",\"email\":\"ed@walker.com\"}"});
        Assertions.assertThat((String)((String)values.get(3))).contains(new CharSequence[]{"\"after\":{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"annek@noanswer.org\"}"});
        Assertions.assertThat((String)((String)values.get(4))).contains(new CharSequence[]{"\"after\":{\"id\":1005,\"first_name\":\"Jon\",\"last_name\":\"Snow\",\"email\":\"jon_snow@gameofthrones.com\"}"});
    }

    private void waitSnapshotCompletion() {
        Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
            try {
                return this.metrics.snapshotCompleted() && this.metrics.streamingQueueCurrentSize() == 0 && this.metrics.maxQueueSize() == 8192;
            }
            catch (Exception e) {
                return false;
            }
        });
    }

    private static void insertNewRow() throws SQLException {
        SqlDatabaseClient sqlDatabaseClient = new SqlDatabaseClient(DebeziumServerPostgresIT.getJdbcUrl(), "postgres", "postgres");
        String sql = "INSERT INTO inventory.customers VALUES  (default, 'Jon', 'Snow', 'jon_snow@gameofthrones.com')";
        sqlDatabaseClient.execute("inventory", sql);
    }

    public static String getJdbcUrl() {
        return String.format("jdbc:postgresql://%s:%s/", "localhost", PostgresTestResourceLifecycleManager.getContainer().getMappedPort(PostgresTestResourceLifecycleManager.POSTGRES_PORT.intValue()).toString());
    }
}

