/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.jdbc.e2e;

import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.JdbcSinkConnectorTask;
import io.debezium.connector.jdbc.JdbcSinkTaskTestContext;
import io.debezium.connector.jdbc.junit.TestHelper;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.Source;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.storage.ConverterType;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractJdbcSinkIT {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcSinkIT.class);
    private JsonConverter keyConverter;
    private JsonConverter valueConverter;
    private JdbcSinkConnectorTask sinkTask;
    private KafkaConsumer<byte[], byte[]> consumer;
    private final ConcurrentLinkedQueue<SinkRecord> consumerRecords = new ConcurrentLinkedQueue();
    private CountDownLatch stopLatch = new CountDownLatch(1);
    private ExecutorService sinkExecutor;
    private JdbcSinkConnectorConfig currentSinkConfig;
    private TimeZone currentSinkTimeZone;

    @AfterEach
    public void afterEach() throws Exception {
        this.stopSink();
    }

    protected JdbcSinkConnectorConfig getCurrentSinkConfig() {
        return this.currentSinkConfig;
    }

    protected TimeZone getCurrentSinkTimeZone() {
        if (this.currentSinkTimeZone == null) {
            this.currentSinkTimeZone = TimeZone.getTimeZone(this.currentSinkConfig.getDatabaseTimeZone());
        }
        return this.currentSinkTimeZone;
    }

    protected void startSink(final Source source, Properties sinkProperties, final String tableName) {
        this.keyConverter = new JsonConverter();
        this.keyConverter.configure(Map.of("converter.type", ConverterType.KEY.getName(), "schemas.enable", "true"));
        this.valueConverter = new JsonConverter();
        this.valueConverter.configure(Map.of("converter.type", ConverterType.VALUE.getName(), "schemas.enable", "true"));
        this.sinkTask = new JdbcSinkConnectorTask();
        HashMap<String, String> configMap = new HashMap<String, String>();
        sinkProperties.forEach((BiConsumer<? super Object, ? super Object>)((BiConsumer<Object, Object>)(k, v) -> configMap.put((String)k, (String)v)));
        this.currentSinkConfig = new JdbcSinkConnectorConfig(configMap);
        this.sinkTask.initialize((SinkTaskContext)new JdbcSinkTaskTestContext(configMap));
        this.sinkTask.start(configMap);
        Properties consumerProperties = new Properties();
        consumerProperties.put("bootstrap.servers", source.getKafka().getBootstrapServers());
        consumerProperties.put("auto.offset.reset", "earliest");
        consumerProperties.put("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerProperties.put("value.deserializer", ByteArrayDeserializer.class.getName());
        consumerProperties.put("allow.auto.create.topics", "false");
        consumerProperties.put("group.id", "jdbc-sink-consumer");
        this.consumer = new KafkaConsumer(consumerProperties);
        this.stopLatch = new CountDownLatch(1);
        this.sinkExecutor = Executors.newFixedThreadPool(1);
        this.sinkExecutor.submit(new Runnable(){

            @Override
            public void run() {
                String pattern = "^" + source.getType().getValue() + ".*" + tableName + "$";
                Pattern regex = Pattern.compile(pattern, 2);
                Awaitility.await((String)"Topic with pattern not created").atMost(60L, TimeUnit.SECONDS).until(() -> AbstractJdbcSinkIT.this.consumer.listTopics().keySet().stream().anyMatch(t -> regex.matcher((CharSequence)t).matches()));
                AbstractJdbcSinkIT.this.consumer.subscribe(regex);
                LOGGER.info("KafkaConsumer thread is now polling for records.");
                while (AbstractJdbcSinkIT.this.stopLatch.getCount() == 1L) {
                    ConsumerRecords records = AbstractJdbcSinkIT.this.consumer.poll(Duration.ofSeconds(1L));
                    LOGGER.info("Consumer poll returned {} records", (Object)records.count());
                    if (records.isEmpty()) continue;
                    records.forEach(r -> AbstractJdbcSinkIT.this.consumerRecords.add(AbstractJdbcSinkIT.this.getSinkRecordFromConsumerRecord((ConsumerRecord<byte[], byte[]>)r)));
                }
                LOGGER.info("Unsubscribing from KafkaConsumer and closing consumer.");
                AbstractJdbcSinkIT.this.consumer.unsubscribe();
                AbstractJdbcSinkIT.this.consumer.close();
            }
        });
    }

    protected void stopSink() throws Exception {
        if (this.sinkExecutor != null) {
            this.stopLatch.countDown();
            this.sinkExecutor.shutdown();
            this.sinkExecutor.awaitTermination(60L, TimeUnit.SECONDS);
            this.sinkExecutor = null;
            this.stopLatch = null;
        }
        if (this.sinkTask != null) {
            this.sinkTask.stop();
            this.sinkTask = null;
        }
    }

    protected SinkRecord consumeSinkRecord() {
        return this.consumeSinkRecords(1).get(0);
    }

    protected List<SinkRecord> consumeSinkRecords(int numRecords) {
        ArrayList<SinkRecord> records = new ArrayList<SinkRecord>();
        Awaitility.await((String)("Expected to receive " + numRecords + " from source connector")).atMost(Duration.ofMinutes(1L)).until(() -> {
            if (this.consumerRecords.size() >= numRecords) {
                for (int i = 0; i < numRecords; ++i) {
                    records.add(this.consumerRecords.poll());
                }
                return true;
            }
            return false;
        });
        this.sinkTask.put(records);
        return records;
    }

    protected SinkRecord getSinkRecordFromConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
        SchemaAndValue valueSchemaAndValue = this.valueConverter.toConnectData(record.topic(), (byte[])record.value());
        SchemaAndValue keySchemaAndValue = this.keyConverter.toConnectData(record.topic(), (byte[])record.key());
        return new SinkRecord(record.topic(), record.partition(), keySchemaAndValue.schema(), keySchemaAndValue.value(), valueSchemaAndValue.schema(), valueSchemaAndValue.value(), record.offset());
    }

    protected ConnectorConfiguration getSourceConnectorConfig(Source source, String tableName) {
        ConnectorConfiguration sourceConfig = ConnectorConfiguration.create();
        sourceConfig.with("tasks.max", Integer.valueOf(1));
        sourceConfig.with("key.converter", JsonConverter.class.getName());
        sourceConfig.with("value.converter", JsonConverter.class.getName());
        sourceConfig.with("topic.prefix", source.getType().getValue());
        sourceConfig.with("database.hostname", source.getType().getValue());
        sourceConfig.with("key.converter.schemas.enabled", "true");
        sourceConfig.with("value.converter.schemas.enabled", "true");
        sourceConfig.with("decimal.handling.mode", "double");
        sourceConfig.with("time.precision.mode", source.getOptions().getTemporalPrecisionMode().getValue());
        if (source.getOptions().isFlatten()) {
            sourceConfig.with("transforms", "flat");
            sourceConfig.with("transforms.flat.type", "io.debezium.transforms.ExtractNewRecordState");
            sourceConfig.with("transforms.flat.drop.tombstones", "false");
        }
        switch (source.getType()) {
            case MYSQL: {
                sourceConfig.with("connector.class", "io.debezium.connector.mysql.MySqlConnector");
                sourceConfig.with("database.password", source.getPassword());
                sourceConfig.with("database.user", "root");
                sourceConfig.with("database.server.id", Integer.valueOf(12345));
                sourceConfig.with("database.include.list", "test");
                sourceConfig.with("table.include.list", "test." + tableName);
                sourceConfig.with("schema.history.internal.kafka.bootstrap.servers", "kafka:9092");
                sourceConfig.with("schema.history.internal.kafka.topic", "schema-history-mysql");
                sourceConfig.with("schema.history.internal.store.only.captured.tables.ddl", "true");
                if (TestHelper.isConnectionTimeZoneUsed()) {
                    sourceConfig.with("driver.connectionTimeZone", TestHelper.getSourceTimeZone());
                    sourceConfig.with("driver.serverTimeZone", TestHelper.getSourceTimeZone());
                }
                if (!source.getOptions().isColumnTypePropagated()) break;
                sourceConfig.with("column.propagate.source.type", "test.*");
                break;
            }
            case POSTGRES: {
                sourceConfig.with("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
                sourceConfig.with("database.password", source.getPassword());
                sourceConfig.with("database.user", source.getUsername());
                sourceConfig.with("database.dbname", "test");
                sourceConfig.with("slot.drop.on.stop", "true");
                sourceConfig.with("schema.include.list", "public");
                sourceConfig.with("table.include.list", "public." + tableName);
                if (!source.getOptions().isColumnTypePropagated()) break;
                sourceConfig.with("column.propagate.source.type", "public.*");
                break;
            }
            case SQLSERVER: {
                sourceConfig.with("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector");
                sourceConfig.with("database.password", source.getPassword());
                sourceConfig.with("database.user", source.getUsername());
                sourceConfig.with("database.names", "testDB");
                sourceConfig.with("database.encrypt", "false");
                sourceConfig.with("schema.history.internal.kafka.bootstrap.servers", "kafka:9092");
                sourceConfig.with("schema.history.internal.kafka.topic", "schema-history-sqlserver");
                sourceConfig.with("schema.history.internal.store.only.captured.tables.ddl", "true");
                sourceConfig.with("table.include.list", "dbo." + tableName);
                if (!source.getOptions().isColumnTypePropagated()) break;
                sourceConfig.with("column.propagate.source.type", ".*");
                break;
            }
            case ORACLE: {
                sourceConfig.with("connector.class", "io.debezium.connector.oracle.OracleConnector");
                sourceConfig.with("database.dbname", "ORCLCDB");
                sourceConfig.with("database.pdb.name", "ORCLPDB1");
                sourceConfig.with("database.port", "1521");
                sourceConfig.with("database.password", "dbz");
                sourceConfig.with("database.user", "c##dbzuser");
                sourceConfig.with("table.include.list", "debezium." + tableName);
                sourceConfig.with("log.mining.strategy", "online_catalog");
                sourceConfig.with("schema.history.internal.kafka.bootstrap.servers", "kafka:9092");
                sourceConfig.with("schema.history.internal.kafka.topic", "schema-history-oracle");
                sourceConfig.with("schema.history.internal.store.only.captured.tables.ddl", "true");
                if (!source.getOptions().isColumnTypePropagated()) break;
                sourceConfig.with("column.propagate.source.type", "debezium.*");
                break;
            }
            default: {
                throw new IllegalStateException("Unsupported source type: " + source.getType());
            }
        }
        return sourceConfig;
    }
}

