package com.zendesk.maxwell.producer;

import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.producer.AbstractAsyncProducer;
import com.zendesk.maxwell.producer.partitioners.MaxwellKafkaPartitioner;
import com.zendesk.maxwell.row.FieldNames;
import com.zendesk.maxwell.row.RowMap;
import com.zendesk.maxwell.schema.ddl.DDLMap;
import com.zendesk.maxwell.util.StoppableTask;
import com.zendesk.maxwell.util.StoppableTaskState;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* compiled from: MaxwellKafkaProducer.java */
/* loaded from: input_file:com/zendesk/maxwell/producer/MaxwellKafkaProducerWorker.class */
class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask {
    static final Logger LOGGER = LoggerFactory.getLogger(MaxwellKafkaProducer.class);
    private final KafkaProducer<String, String> kafka;
    private String topic;
    private final String ddlTopic;
    private final MaxwellKafkaPartitioner partitioner;
    private final MaxwellKafkaPartitioner ddlPartitioner;
    private final RowMap.KeyFormat keyFormat;
    private final boolean interpolateTopic;
    private final ArrayBlockingQueue<RowMap> queue;
    private Thread thread;
    private StoppableTaskState taskState;

    public static MaxwellKafkaPartitioner makeDDLPartitioner(String str, String str2) {
        return str2.equals(FieldNames.TABLE) ? new MaxwellKafkaPartitioner(str, FieldNames.TABLE, null, FieldNames.DATABASE) : new MaxwellKafkaPartitioner(str, FieldNames.DATABASE, null, null);
    }

    public MaxwellKafkaProducerWorker(MaxwellContext maxwellContext, Properties properties, String str, ArrayBlockingQueue<RowMap> arrayBlockingQueue) {
        super(maxwellContext);
        this.topic = str;
        if (this.topic == null) {
            this.topic = "maxwell";
        }
        this.interpolateTopic = this.topic.contains("%{");
        this.kafka = new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer());
        String str2 = maxwellContext.getConfig().kafkaPartitionHash;
        String str3 = maxwellContext.getConfig().producerPartitionKey;
        this.partitioner = new MaxwellKafkaPartitioner(str2, str3, maxwellContext.getConfig().producerPartitionColumns, maxwellContext.getConfig().producerPartitionFallback);
        this.ddlPartitioner = makeDDLPartitioner(str2, str3);
        this.ddlTopic = maxwellContext.getConfig().ddlKafkaTopic;
        if (maxwellContext.getConfig().kafkaKeyFormat.equals("hash")) {
            this.keyFormat = RowMap.KeyFormat.HASH;
        } else {
            this.keyFormat = RowMap.KeyFormat.ARRAY;
        }
        this.queue = arrayBlockingQueue;
        this.taskState = new StoppableTaskState("MaxwellKafkaProducerWorker");
    }

    @Override // java.lang.Runnable
    public void run() {
        this.thread = Thread.currentThread();
        while (true) {
            try {
                RowMap take = this.queue.take();
                if (!this.taskState.isRunning()) {
                    this.taskState.stopped();
                    return;
                }
                push(take);
            } catch (Exception e) {
                this.taskState.stopped();
                this.context.terminate(e);
                return;
            }
        }
    }

    private Integer getNumPartitions(String str) {
        try {
            return Integer.valueOf(this.kafka.partitionsFor(str).size());
        } catch (KafkaException e) {
            LOGGER.error("Topic '" + str + "' name does not exist. Exception: " + e.getLocalizedMessage());
            throw e;
        }
    }

    private String generateTopic(String str, RowMap rowMap) {
        return this.interpolateTopic ? str.replaceAll("%\\{database\\}", rowMap.getDatabase()).replaceAll("%\\{table\\}", rowMap.getTable()) : str;
    }

    @Override // com.zendesk.maxwell.producer.AbstractAsyncProducer
    public void sendAsync(RowMap rowMap, AbstractAsyncProducer.CallbackCompleter callbackCompleter) throws Exception {
        ProducerRecord<String, String> makeProducerRecord = makeProducerRecord(rowMap);
        sendAsync(makeProducerRecord, new KafkaCallback(callbackCompleter, rowMap.getPosition(), (String) makeProducerRecord.key(), KafkaCallback.LOGGER.isDebugEnabled() ? (String) makeProducerRecord.value() : null, this.succeededMessageCount, this.failedMessageCount, this.succeededMessageMeter, this.failedMessageMeter, this.context));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void sendAsync(ProducerRecord<String, String> producerRecord, Callback callback) throws Exception {
        this.kafka.send(producerRecord, callback);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ProducerRecord<String, String> makeProducerRecord(RowMap rowMap) throws Exception {
        ProducerRecord<String, String> producerRecord;
        String pkToJson = rowMap.pkToJson(this.keyFormat);
        String json = rowMap.toJSON(this.outputConfig);
        if (rowMap instanceof DDLMap) {
            producerRecord = new ProducerRecord<>(this.ddlTopic, Integer.valueOf(this.ddlPartitioner.kafkaPartition(rowMap, getNumPartitions(this.ddlTopic).intValue())), pkToJson, json);
        } else {
            String generateTopic = generateTopic(this.topic, rowMap);
            producerRecord = new ProducerRecord<>(generateTopic, Integer.valueOf(this.partitioner.kafkaPartition(rowMap, getNumPartitions(generateTopic).intValue())), pkToJson, json);
        }
        return producerRecord;
    }

    @Override // com.zendesk.maxwell.util.StoppableTask
    public void requestStop() {
        this.taskState.requestStop();
        this.kafka.close();
    }

    @Override // com.zendesk.maxwell.util.StoppableTask
    public void awaitStop(Long l) throws TimeoutException {
        this.taskState.awaitStop(this.thread, l.longValue());
    }

    @Override // com.zendesk.maxwell.producer.AbstractProducer
    public StoppableTask getStoppableTask() {
        return this;
    }
}
