/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.testing.system.tests.jdbc.sink;

import io.debezium.connector.jdbc.util.DebeziumSinkRecordFactory;
import io.debezium.connector.jdbc.util.SinkRecordBuilder;
import io.debezium.testing.system.assertions.JdbcAssertions;
import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
import io.debezium.testing.system.tools.kafka.KafkaController;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.HashMap;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class JdbcSinkTests {
    protected final KafkaController kafkaController;
    protected final KafkaConnectController connectController;
    protected final JdbcAssertions assertions;
    protected ConnectorConfigBuilder connectorConfig;
    protected Producer<String, String> kafkaProducer;
    Logger LOGGER = LoggerFactory.getLogger(JdbcAssertions.class);

    public JdbcSinkTests(KafkaController kafkaController, KafkaConnectController connectController, JdbcAssertions assertions, ConnectorConfigBuilder connectorConfig) {
        this.kafkaController = kafkaController;
        this.connectController = connectController;
        this.assertions = assertions;
        this.connectorConfig = connectorConfig;
        this.kafkaProducer = new KafkaProducer(kafkaController.getDefaultProducerProperties());
    }

    private void produceRecordToTopic(String topic, String fieldName, String fieldValue) {
        String kafkaRecord = this.createRecord(fieldName, fieldValue);
        this.LOGGER.info("Producing record to topic {}", (Object)topic);
        this.LOGGER.debug(kafkaRecord);
        ProducerRecord producerRecord = new ProducerRecord(topic, (Object)kafkaRecord);
        this.kafkaProducer.send(producerRecord);
    }

    private String createRecord(String fieldName, String fieldValue) {
        byte[] recordInBytes;
        DebeziumSinkRecordFactory factory = new DebeziumSinkRecordFactory();
        SinkRecord record = SinkRecordBuilder.update().flat(false).name("jdbc-connector-test").recordSchema(SchemaBuilder.struct().field(fieldName, Schema.STRING_SCHEMA).build()).sourceSchema(factory.basicSourceSchema()).after(fieldName, (Object)fieldValue).before(fieldName, (Object)fieldValue).source("ts_ms", (Object)((int)Instant.now().getEpochSecond())).build();
        try (JsonConverter converter = new JsonConverter();){
            HashMap<String, String> config = new HashMap<String, String>();
            config.put("converter.type", "value");
            converter.configure(config);
            recordInBytes = converter.fromConnectData(null, record.valueSchema(), record.value());
        }
        return new String(recordInBytes, StandardCharsets.UTF_8);
    }

    @Test
    @Order(value=10)
    public void shouldHaveRegisteredConnector() {
        Request r = new Request.Builder().url(this.connectController.getApiURL().resolve("/connectors")).build();
        KafkaAssertions.awaitAssert(() -> {
            try (Response res = new OkHttpClient().newCall(r).execute();){
                Assertions.assertThat((String)res.body().string()).contains(new CharSequence[]{this.connectorConfig.getConnectorName()});
            }
        });
    }

    @Test
    @Order(value=20)
    public void shouldStreamChanges() {
        String topic = this.connectorConfig.getAsString("topics");
        this.produceRecordToTopic(topic, "name", "Jerry");
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRowsCount(1, topic));
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRowsContain(topic, "name", "Jerry"));
    }

    @Test
    @Order(value=30)
    public void shouldBeDown() throws Exception {
        String topic = this.connectorConfig.getAsString("topics");
        this.connectController.undeployConnector(this.connectorConfig.getConnectorName());
        this.produceRecordToTopic(topic, "name", "Nibbles");
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRowsCount(1, topic));
    }

    @Test
    @Order(value=40)
    public void shouldResumeStreamingAfterRedeployment() throws Exception {
        this.connectController.deployConnector(this.connectorConfig);
        String topic = this.connectorConfig.getAsString("topics");
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRowsCount(2, topic));
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRowsContain(topic, "name", "Nibbles"));
    }

    @Test
    @Order(value=50)
    public void shouldBeDownAfterCrash() {
        this.connectController.destroy();
        String topic = this.connectorConfig.getAsString("topics");
        this.produceRecordToTopic(topic, "name", "Larry");
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRowsCount(2, topic));
    }

    @Test
    @Order(value=60)
    public void shouldResumeStreamingAfterCrash() throws InterruptedException {
        this.connectController.restore();
        String topic = this.connectorConfig.getAsString("topics");
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRowsCount(3, topic));
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRowsContain(topic, "name", "Larry"));
    }
}

