/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TableMetadata;
import io.debezium.connector.cassandra.CassandraConnectorConfig;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.CommitLogUtil;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.security.GeneralSecurityException;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.cassandraunit.utils.CqlOperations;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.junit.AfterClass;
import org.junit.BeforeClass;

public abstract class EmbeddedCassandraConnectorTestBase {
    public static final String TEST_CONNECTOR_NAME = "cassandra-01";
    public static final String TEST_KEYSPACE = "test_keyspace";
    public static final long STARTUP_TIMEOUT_IN_SECONDS = 10L;
    public static final String TEST_CASSANDRA_YAML_CONFIG = "cassandra-unit.yaml";
    public static final String TEST_CASSANDRA_HOSTS = "127.0.0.1";
    public static final int TEST_CASSANDRA_PORT = 9042;
    public static final String TEST_KAFKA_SERVERS = "localhost:9092";
    public static final String TEST_SCHEMA_REGISTRY_URL = "http://localhost:8081";
    public static final String TEST_KAFKA_TOPIC_PREFIX = "test_topic";

    @BeforeClass
    public static void setUpClass() throws Exception {
        EmbeddedCassandraConnectorTestBase.startEmbeddedCassandra();
        EmbeddedCassandraConnectorTestBase.createTestKeyspace();
    }

    @AfterClass
    public static void tearDownClass() {
        EmbeddedCassandraConnectorTestBase.destroyTestKeyspace();
        EmbeddedCassandraConnectorTestBase.stopEmbeddedCassandra();
    }

    protected static void truncateTestKeyspaceTableData() {
        EmbeddedCassandraServerHelper.cleanDataEmbeddedCassandra((String)TEST_KEYSPACE, (String[])new String[0]);
    }

    protected static void deleteTestKeyspaceTables() {
        Session session = EmbeddedCassandraServerHelper.getSession();
        Cluster cluster = EmbeddedCassandraServerHelper.getCluster();
        for (TableMetadata tm : cluster.getMetadata().getKeyspace(TEST_KEYSPACE).getTables()) {
            session.execute("DROP TABLE IF EXISTS " + EmbeddedCassandraConnectorTestBase.keyspaceTable(tm.getName()));
        }
    }

    protected static CassandraConnectorContext generateTaskContext() throws GeneralSecurityException, IOException {
        Properties defaults = EmbeddedCassandraConnectorTestBase.generateDefaultConfigMap();
        return new CassandraConnectorContext(new CassandraConnectorConfig(defaults));
    }

    protected static CassandraConnectorContext generateTaskContext(Map<String, Object> configs) throws GeneralSecurityException, IOException {
        Properties defaults = EmbeddedCassandraConnectorTestBase.generateDefaultConfigMap();
        defaults.putAll(configs);
        return new CassandraConnectorContext(new CassandraConnectorConfig(defaults));
    }

    protected static void deleteTestOffsets(CassandraConnectorContext context) throws IOException {
        File[] files;
        String offsetDirPath = context.getCassandraConnectorConfig().offsetBackingStoreDir();
        File offsetDir = new File(offsetDirPath);
        if (offsetDir.isDirectory() && (files = offsetDir.listFiles()) != null) {
            for (File f : files) {
                Files.delete(f.toPath());
            }
        }
    }

    protected static String keyspaceTable(String tableName) {
        return "test_keyspace." + tableName;
    }

    protected static File generateCommitLogFile() {
        File cdcDir = new File(DatabaseDescriptor.getCDCLogLocation());
        long ts = System.currentTimeMillis();
        return Paths.get(cdcDir.getAbsolutePath(), "CommitLog-6-" + ts + ".log").toFile();
    }

    protected static void populateFakeCommitLogsForDirectory(int numOfFiles, File directory) throws IOException {
        if (directory.exists() && !directory.isDirectory()) {
            throw new IOException(directory + " is not a directory");
        }
        if (!directory.exists() && !directory.mkdir()) {
            throw new IOException("Cannot create directory " + directory);
        }
        EmbeddedCassandraConnectorTestBase.clearCommitLogFromDirectory(directory, true);
        long prefix = System.currentTimeMillis();
        for (int i = 0; i < numOfFiles; ++i) {
            long ts = prefix + (long)i;
            Path path = Paths.get(directory.getAbsolutePath(), "CommitLog-6-" + ts + ".log");
            boolean success = path.toFile().createNewFile();
            if (success) continue;
            throw new IOException("Failed to create new commit log for testing");
        }
    }

    protected static void clearCommitLogFromDirectory(File directory, boolean recursive) throws IOException {
        File[] directories;
        File[] commitLogs;
        if (!directory.exists() || !directory.isDirectory()) {
            throw new IOException(directory + " is not a valid directory");
        }
        for (File commitLog : commitLogs = CommitLogUtil.getCommitLogs((File)directory)) {
            CommitLogUtil.deleteCommitLog((File)commitLog);
        }
        if (recursive && (directories = directory.listFiles(File::isDirectory)) != null) {
            for (File dir : directories) {
                EmbeddedCassandraConnectorTestBase.clearCommitLogFromDirectory(dir, true);
            }
        }
    }

    protected static Properties generateDefaultConfigMap() throws IOException {
        Properties props = new Properties();
        props.put("connector.name", TEST_CONNECTOR_NAME);
        props.put("cassandra.config", TEST_CASSANDRA_YAML_CONFIG);
        props.put("kafka.topic.prefix", TEST_KAFKA_TOPIC_PREFIX);
        props.put("cassandra.hosts", TEST_CASSANDRA_HOSTS);
        props.put("cassandra.port", String.valueOf(9042));
        props.put("offset.backing.store.dir", Files.createTempDirectory("offset", new FileAttribute[0]).toString());
        props.put("kafka.producer.bootstrap.servers", TEST_KAFKA_SERVERS);
        props.put("commit.log.relocation.dir", Files.createTempDirectory("cdc_raw_relocation", new FileAttribute[0]).toString());
        return props;
    }

    private static void startEmbeddedCassandra() throws Exception {
        EmbeddedCassandraServerHelper.startEmbeddedCassandra((String)TEST_CASSANDRA_YAML_CONFIG, (long)Duration.ofSeconds(10L).toMillis());
    }

    private static void stopEmbeddedCassandra() {
        EmbeddedCassandraServerHelper.cleanEmbeddedCassandra();
    }

    private static void createTestKeyspace() {
        Session session = EmbeddedCassandraServerHelper.getSession();
        CqlOperations.createKeyspace((Session)session).accept(TEST_KEYSPACE);
    }

    private static void destroyTestKeyspace() {
        Session session = EmbeddedCassandraServerHelper.getSession();
        CqlOperations.dropKeyspace((Session)session).accept(TEST_KEYSPACE);
    }
}

