/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.service.kafka;

import eu.europeana.cloud.service.dps.DpsRecord;
import eu.europeana.cloud.service.dps.RecordExecutionSubmitService;
import eu.europeana.cloud.service.dps.service.kafka.TaskKafkaSubmitService;
import eu.europeana.cloud.service.dps.service.kafka.util.DpsRecordSerializer;
import java.lang.invoke.CallSite;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link RecordExecutionSubmitService} implementation that publishes
 * {@link DpsRecord} messages to a Kafka topic.
 *
 * <p>Messages are keyed with a {@code String} (serialized by
 * {@link StringSerializer}) and carry the record itself as the value
 * (serialized by {@link DpsRecordSerializer}).
 */
public class RecordKafkaSubmitService implements RecordExecutionSubmitService {
    private static final Logger LOGGER = LoggerFactory.getLogger(RecordKafkaSubmitService.class);

    /** Producer created once in the constructor and reused for every submitted record. */
    private final Producer<String, DpsRecord> producer;

    /**
     * Creates the service together with its underlying Kafka producer.
     *
     * @param kafkaBroker value used for the producer's {@code bootstrap.servers} setting
     */
    public RecordKafkaSubmitService(String kafkaBroker) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaBroker);
        properties.put("value.serializer", DpsRecordSerializer.class.getName());
        properties.put("key.serializer", StringSerializer.class.getName());
        // "acks" = "1" is passed through unchanged from the original configuration.
        properties.put("acks", "1");
        this.producer = new KafkaProducer<String, DpsRecord>(properties);
    }

    /**
     * Sends the given record to the given topic.
     *
     * <p>The message key is {@code <taskId>_<recordId>}. The send is asynchronous:
     * this method does not wait for the broker's acknowledgement. A failure reported
     * by the producer after the call has returned is logged rather than thrown.
     *
     * @param record record to publish; used both to build the key and as the message value
     * @param topic  name of the Kafka topic to publish to
     */
    @Override
    public void submitRecord(DpsRecord record, String topic) {
        // The key must be a String to match the configured StringSerializer and the
        // Producer<String, DpsRecord> type.
        String key = record.getTaskId() + "_" + record.getRecordId();
        ProducerRecord<String, DpsRecord> data = new ProducerRecord<String, DpsRecord>(topic, key, record);
        this.producer.send(data, (metadata, exception) -> {
            // The callback receives a non-null exception only when the send failed.
            if (exception != null) {
                LOGGER.error("Failed to send record with key {} to topic {}", key, topic, exception);
            }
        });
    }
}

