/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.cassandra.CassandraConnectorConfig;
import io.debezium.connector.cassandra.CommitLogTransfer;
import io.debezium.connector.cassandra.Emitter;
import io.debezium.connector.cassandra.KeyspaceTable;
import io.debezium.connector.cassandra.OffsetWriter;
import io.debezium.connector.cassandra.Record;
import io.debezium.connector.cassandra.SourceInfo;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.spi.topic.TopicNamingStrategy;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaRecordEmitter
implements Emitter {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordEmitter.class);
    private static final int RECORD_LOG_COUNT = 10000;
    private final KafkaProducer<byte[], byte[]> producer;
    private final TopicNamingStrategy<KeyspaceTable> topicNamingStrategy;
    private final OffsetWriter offsetWriter;
    private final Set<String> erroneousCommitLogs;
    private final CommitLogTransfer commitLogTransfer;
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final AtomicLong emitCount = new AtomicLong();

    public KafkaRecordEmitter(CassandraConnectorConfig connectorConfig, KafkaProducer<byte[], byte[]> kafkaProducer, OffsetWriter offsetWriter, Converter keyConverter, Converter valueConverter, Set<String> erroneousCommitLogs, CommitLogTransfer commitLogTransfer) {
        this.producer = kafkaProducer;
        this.topicNamingStrategy = connectorConfig.getTopicNamingStrategy(CommonConnectorConfig.TOPIC_NAMING_STRATEGY);
        this.offsetWriter = offsetWriter;
        this.erroneousCommitLogs = erroneousCommitLogs;
        this.commitLogTransfer = commitLogTransfer;
        this.keyConverter = keyConverter;
        this.valueConverter = valueConverter;
    }

    @Override
    public void emit(Record record) {
        try {
            ProducerRecord<byte[], byte[]> producerRecord = this.toProducerRecord(record);
            this.producer.send(producerRecord, (metadata, exception) -> this.callback(record, exception));
            LOGGER.trace("Sent to topic {}: {}", (Object)producerRecord.topic(), (Object)record);
        }
        catch (Exception e) {
            if (record.getSource().snapshot || this.commitLogTransfer.getClass().getName().equals("io.debezium.connector.cassandra.BlackHoleCommitLogTransfer")) {
                throw new DebeziumException(String.format("Failed to send record %s", record), (Throwable)e);
            }
            LOGGER.error("Failed to send the record {}. Error: ", (Object)record, (Object)e);
            this.erroneousCommitLogs.add(record.getSource().offsetPosition.fileName);
        }
    }

    protected ProducerRecord<byte[], byte[]> toProducerRecord(Record record) {
        String topic = this.topicNamingStrategy.dataChangeTopic((DataCollectionId)record.getSource().keyspaceTable);
        byte[] serializedKey = this.keyConverter.fromConnectData(topic, record.getKeySchema(), (Object)record.buildKey());
        byte[] serializedValue = this.valueConverter.fromConnectData(topic, record.getValueSchema(), (Object)record.buildValue());
        return new ProducerRecord(topic, (Object)serializedKey, (Object)serializedValue);
    }

    private void callback(Record record, Exception exception) {
        if (exception != null) {
            LOGGER.error("Failed to emit record {}", (Object)record, (Object)exception);
            return;
        }
        long emitted = this.emitCount.incrementAndGet();
        if (emitted % 10000L == 0L) {
            LOGGER.debug("Emitted {} records to Kafka Broker", (Object)emitted);
            this.emitCount.addAndGet(-emitted);
        }
        if (this.hasOffset(record)) {
            this.markOffset(record);
        }
    }

    private boolean hasOffset(Record record) {
        if (record.getSource().snapshot || this.commitLogTransfer.getClass().getName().equals("io.debezium.connector.cassandra.BlackHoleCommitLogTransfer")) {
            return record.shouldMarkOffset();
        }
        return record.shouldMarkOffset() && !this.erroneousCommitLogs.contains(record.getSource().offsetPosition.fileName);
    }

    private void markOffset(Record record) {
        SourceInfo source = record.getSource();
        String sourceTable = source.keyspaceTable.name();
        String sourceOffset = source.offsetPosition.serialize();
        boolean isSnapshot = source.snapshot;
        this.offsetWriter.markOffset(sourceTable, sourceOffset, isSnapshot);
        if (isSnapshot) {
            LOGGER.debug("Mark snapshot offset for table '{}'", (Object)sourceTable);
        }
    }

    @Override
    public void close() throws Exception {
        this.producer.close();
    }
}

