/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.relational.history;

import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.util.Collect;
import java.io.IOException;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;

/**
 * A database history implementation that appends each {@link HistoryRecord} to partition 0 of a
 * Kafka topic and recovers the history by reading that topic back from the earliest offset.
 *
 * <p>Consumer and producer settings can be overridden by supplying properties prefixed with
 * {@code database.history.consumer.} and {@code database.history.producer.} respectively; the
 * values set in {@link #configure(Configuration)} are only defaults.
 */
@NotThreadSafe
public class KafkaDatabaseHistory
extends AbstractDatabaseHistory {
    public static final Field TOPIC = Field.create("database.history.kafka.topic").withDescription("The name of the topic for the database schema history").withValidation(Field::isRequired);
    public static final Field BOOTSTRAP_SERVERS = Field.create("database.history.kafka.bootstrap.servers").withDescription("A list of host/port pairs that the connector will use for establishing the initial connection to the Kafka cluster for retrieving database schema history previously stored by the connector. This should point to the same Kafka cluster used by the Kafka Connect process.").withValidation(Field::isRequired);
    public static final Field RECOVERY_POLL_INTERVAL_MS = Field.create("database.history.kafka.recovery.poll.interval.ms").withDescription("The number of milliseconds to wait while polling for persisted data during recovery.").withDefault(100).withValidation(Field::isInteger);
    public static final Field RECOVERY_POLL_ATTEMPTS = Field.create("database.history.kafka.recovery.attempts").withDescription("The number of attempts in a row that no data are returned from Kafka before recover completes. The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms).").withDefault(4).withValidation(Field::isInteger);
    /** All configuration fields validated by {@link #configure(Configuration)}. */
    public static Collection<Field> ALL_FIELDS = Collect.arrayListOf(TOPIC, BOOTSTRAP_SERVERS, RECOVERY_POLL_INTERVAL_MS, RECOVERY_POLL_ATTEMPTS);
    private static final String CONSUMER_PREFIX = "database.history.consumer.";
    private static final String PRODUCER_PREFIX = "database.history.producer.";
    /** Key under which the broker list is stored in the prefix-stripped client configurations. */
    private static final String CLIENT_BOOTSTRAP_SERVERS = "bootstrap.servers";
    private final DocumentReader reader = DocumentReader.defaultReader();
    /** The single partition every history record is written to. */
    private final Integer partition = Integer.valueOf(0);
    private String topicName;
    private Configuration consumerConfig;
    private Configuration producerConfig;
    private KafkaProducer<String, String> producer;
    private int recoveryAttempts = -1;
    private int pollIntervalMs = -1;

    /**
     * Validates the supplied configuration and derives the Kafka consumer and producer
     * configurations used by this history.
     *
     * @param config the connector configuration; must pass validation of {@link #ALL_FIELDS}
     * @throws ConnectException if any of the fields in {@link #ALL_FIELDS} fails validation
     */
    @Override
    public void configure(Configuration config) {
        super.configure(config);
        if (!config.validate(ALL_FIELDS, problem -> this.logger.error(problem))) {
            throw new ConnectException("Error configuring an instance of " + this.getClass().getSimpleName() + "; check the logs for details");
        }
        this.topicName = config.getString(TOPIC);
        this.pollIntervalMs = config.getInteger(RECOVERY_POLL_INTERVAL_MS);
        this.recoveryAttempts = config.getInteger(RECOVERY_POLL_ATTEMPTS);
        String bootstrapServers = config.getString(BOOTSTRAP_SERVERS);

        // A random id is used for both client.id and group.id of the recovery consumer.
        String clientAndGroupId = UUID.randomUUID().toString();
        // Locale.ROOT keeps the lower-casing independent of the JVM default locale
        // (e.g. a Turkish locale would otherwise turn "EARLIEST" into "earlıest").
        String earliest = OffsetResetStrategy.EARLIEST.toString().toLowerCase(java.util.Locale.ROOT);
        this.consumerConfig = config.subset(CONSUMER_PREFIX, true).edit()
                .withDefault(CLIENT_BOOTSTRAP_SERVERS, bootstrapServers)
                .withDefault("client.id", clientAndGroupId)
                .withDefault("group.id", clientAndGroupId)
                .withDefault("fetch.min.bytes", 1)
                .withDefault("enable.auto.commit", false)
                .withDefault("session.timeout.ms", 30000)
                .withDefault("auto.offset.reset", earliest)
                .withDefault("key.deserializer", StringDeserializer.class)
                .withDefault("value.deserializer", StringDeserializer.class)
                .build();
        this.producerConfig = config.subset(PRODUCER_PREFIX, true).edit()
                .withDefault(CLIENT_BOOTSTRAP_SERVERS, bootstrapServers)
                .withDefault("client.id", UUID.randomUUID().toString())
                .withDefault("acks", 1)
                .withDefault("retries", 1)
                .withDefault("batch.size", 32768)
                .withDefault("linger.ms", 0)
                .withDefault("buffer.memory", 0x100000)
                .withDefault("key.serializer", StringSerializer.class)
                .withDefault("value.serializer", StringSerializer.class)
                .build();
        this.logger.info("KafkaDatabaseHistory Consumer config: {}", this.consumerConfig);
        this.logger.info("KafkaDatabaseHistory Producer config: {}", this.producerConfig);
    }

    /**
     * Starts this history by creating the Kafka producer from the producer configuration
     * built in {@link #configure(Configuration)}.
     */
    @Override
    public void start() {
        super.start();
        this.producer = new KafkaProducer<>(this.producerConfig.asProperties());
    }

    /**
     * Sends the record's string form to the history topic and blocks until the send completes.
     * Failures are logged rather than propagated.
     *
     * @param record the history record to store
     * @throws IllegalStateException if no producer is available because {@link #start()} has not
     *         been called (or {@link #stop()} already has)
     */
    @Override
    protected void storeRecord(HistoryRecord record) {
        if (this.producer == null) {
            throw new IllegalStateException("No producer is available. Ensure that 'start()' is called before storing database history records.");
        }
        this.logger.trace("Storing record into database history: {}", record);
        try {
            ProducerRecord<String, String> produced = new ProducerRecord<>(this.topicName, this.partition, null, record.toString());
            Future<RecordMetadata> future = this.producer.send(produced);
            // Flush so the record is pushed out immediately, then wait for the outcome of the send.
            this.producer.flush();
            RecordMetadata metadata = future.get();
            if (metadata != null) {
                this.logger.debug("Stored record in topic '{}' partition {} at offset {} ", metadata.topic(), metadata.partition(), metadata.offset());
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            this.logger.error("Interrupted while waiting for response to storing record into database history: {}", record, e);
        }
        catch (ExecutionException e) {
            this.logger.error("Error while storing database history record into Kafka: {}", record, e);
        }
    }

    /**
     * Reads the history topic with a short-lived consumer and hands every successfully parsed
     * record to {@code records}. Reading stops once the configured number of consecutive polls
     * return no data. Records that cannot be deserialized are logged and skipped.
     *
     * @param schema not used by this implementation
     * @param ddlParser not used by this implementation
     * @param records the callback that receives each recovered history record
     */
    @Override
    protected void recoverRecords(Tables schema, DdlParser ddlParser, Consumer<HistoryRecord> records) {
        try (KafkaConsumer<String, String> historyConsumer = new KafkaConsumer<>(this.consumerConfig.asProperties())) {
            // NOTE(review): the TopicPartition is only used for the log message; the consumer
            // subscribes to the topic by name rather than being assigned this partition.
            TopicPartition topicPartition = new TopicPartition(this.topicName, this.partition);
            this.logger.debug("Subscribing to database history topic '{}' partition {} at offset 0", topicPartition.topic(), topicPartition.partition());
            historyConsumer.subscribe(Collect.arrayListOf(this.topicName));

            int remainingEmptyPollResults = this.recoveryAttempts;
            while (remainingEmptyPollResults > 0) {
                ConsumerRecords<String, String> recoveredRecords = historyConsumer.poll((long)this.pollIntervalMs);
                this.logger.debug("Read {} records from database history", recoveredRecords.count());
                if (recoveredRecords.isEmpty()) {
                    --remainingEmptyPollResults;
                    continue;
                }
                // Data arrived, so start counting empty polls from scratch.
                remainingEmptyPollResults = this.recoveryAttempts;
                for (ConsumerRecord<String, String> consumerRecord : recoveredRecords) {
                    try {
                        HistoryRecord recordObj = new HistoryRecord(this.reader.read(consumerRecord.value()));
                        records.accept(recordObj);
                        this.logger.trace("Recovered database history: {}", recordObj);
                    }
                    catch (IOException e) {
                        this.logger.error("Error while deserializing history record", e);
                    }
                }
            }
        }
    }

    /**
     * Flushes and closes the producer, if one exists, and then stops the superclass. The producer
     * reference is cleared and {@code super.stop()} is invoked even if flushing or closing fails.
     */
    @Override
    public void stop() {
        try {
            if (this.producer != null) {
                try {
                    this.producer.flush();
                }
                finally {
                    this.producer.close();
                }
            }
        }
        finally {
            this.producer = null;
            super.stop();
        }
    }

    /**
     * Describes this history by its topic, partition and broker list once configured.
     *
     * @return a human-readable description; just {@code "Kafka topic"} before configuration
     */
    @Override
    public String toString() {
        if (this.topicName == null) {
            return "Kafka topic";
        }
        // The producer configuration has its prefix stripped, so the broker list is stored under
        // the plain Kafka client key rather than under the BOOTSTRAP_SERVERS field name.
        String brokers = this.producerConfig != null ? this.producerConfig.getString(CLIENT_BOOTSTRAP_SERVERS) : null;
        return "Kafka topic " + this.topicName + ":" + this.partition + " using brokers at " + brokers;
    }
}

