/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.jdbc.junit.jupiter.e2e.source;

import com.github.dockerjava.api.async.ResultCallback;
import com.github.dockerjava.api.command.LogContainerCmd;
import io.debezium.connector.jdbc.junit.jupiter.JdbcConnectionProvider;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.SourceConnectorOptions;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.SourceType;
import io.debezium.connector.jdbc.util.RandomTableNameGenerator;
import io.debezium.testing.testcontainers.Connector;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.DebeziumContainer;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.FrameConsumerResultCallback;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.WaitingConsumer;
import org.testcontainers.shaded.org.awaitility.Awaitility;

public class Source
extends JdbcConnectionProvider {
    private static final AtomicInteger sourceId = new AtomicInteger();
    private static final String ENABLE_TABLE_CDC = "IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0)\nEXEC sys.sp_cdc_enable_table @source_schema = N'%', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0";
    private final Integer id;
    private final SourceType type;
    private final KafkaContainer kafka;
    private final DebeziumContainer connect;
    private final SourceConnectorOptions options;
    private final RandomTableNameGenerator tableNameGenerator;

    public Source(SourceType type, JdbcDatabaseContainer<?> database, KafkaContainer kafka, DebeziumContainer connect, SourceConnectorOptions options, RandomTableNameGenerator tableGenerator) {
        super(database, new SourceConnectionInitializer(type));
        this.type = type;
        this.id = sourceId.getAndIncrement();
        this.kafka = kafka;
        this.connect = connect;
        this.options = options;
        this.tableNameGenerator = tableGenerator;
    }

    public SourceType getType() {
        return this.type;
    }

    public KafkaContainer getKafka() {
        return this.kafka;
    }

    public int getPort() {
        return this.getContainer().getFirstMappedPort();
    }

    public SourceConnectorOptions getOptions() {
        return this.options;
    }

    public String getSourceConnectorName() {
        return "jdbc-source-" + this.id;
    }

    public String randomTableName() {
        return this.randomObjectName();
    }

    public String randomObjectName() {
        return this.tableNameGenerator.randomName(12);
    }

    public void registerSourceConnector(ConnectorConfiguration config) {
        this.waitUntilStreamingStarted(() -> this.connect.registerConnector(this.getSourceConnectorName(), config));
    }

    protected void waitUntilStreamingStarted(Runnable callback) {
        this.waitUntil("Starting streaming", callback);
    }

    public void waitUntilDeleted() {
        Awaitility.await((String)"Source connector deleted").atMost(60L, TimeUnit.SECONDS).until(() -> {
            try {
                return this.connect.getConnectorState(this.getSourceConnectorName()).equals((Object)Connector.State.UNASSIGNED);
            }
            catch (IllegalStateException e) {
                if (e.getMessage().contains("No status found for connector jdbc-source")) {
                    return true;
                }
                throw e;
            }
        });
    }

    private void waitUntil(String message, Runnable doBeforeWait) {
        WaitingConsumer wait = new WaitingConsumer();
        try (FrameConsumerResultCallback callback = new FrameConsumerResultCallback();){
            callback.addConsumer(OutputFrame.OutputType.STDOUT, (Consumer)wait);
            try (LogContainerCmd command = this.connect.getDockerClient().logContainerCmd(this.connect.getContainerId());){
                command.withFollowStream(Boolean.valueOf(true)).withTail(Integer.valueOf(0)).withStdOut(Boolean.valueOf(true)).exec((ResultCallback)callback);
                if (doBeforeWait != null) {
                    try {
                        doBeforeWait.run();
                    }
                    catch (Exception e) {
                        throw new IllegalStateException("WaitUntil callback failed", e);
                    }
                }
                try {
                    wait.waitUntil(f -> f.getUtf8String().contains(message), 20, TimeUnit.SECONDS);
                }
                catch (TimeoutException e) {
                    throw new IllegalStateException("Failed to wait for '" + message + "'", e);
                }
            }
        }
        catch (IOException e) {
            throw new RuntimeException("Wait failed for message '" + message + "'", e);
        }
    }

    public void streamTable(String tableName) throws Exception {
        if (SourceType.SQLSERVER == this.type) {
            this.execute(ENABLE_TABLE_CDC.replace("#", tableName).replace("%", "dbo"));
        } else if (SourceType.ORACLE == this.type) {
            this.execute("ALTER TABLE " + tableName + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
        }
    }

    private static class SourceConnectionInitializer
    implements JdbcConnectionProvider.ConnectionInitializer {
        private final SourceType type;

        SourceConnectionInitializer(SourceType type) {
            this.type = type;
        }

        @Override
        public void initialize(Connection connection) throws SQLException {
            if (SourceType.SQLSERVER.is(this.type)) {
                try (Statement statement = connection.createStatement();){
                    statement.execute("USE testDB");
                }
            }
        }
    }
}

