/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.connector.kafka.sink;

import com.netflix.spectator.api.Registry;
import io.mantisrx.connector.kafka.sink.MantisKafkaProducerConfig;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.Metadata;
import io.mantisrx.runtime.PortRequest;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.Parameters;
import io.mantisrx.runtime.parameter.type.StringParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import io.mantisrx.runtime.sink.SelfDocumentingSink;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

public class KafkaSink<T>
implements SelfDocumentingSink<T> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class);
    private final Func1<T, byte[]> encoder;
    private final Registry registry;
    private final AtomicReference<KafkaProducer<byte[], byte[]>> kafkaProducerAtomicRef = new AtomicReference<Object>(null);

    KafkaSink(Registry registry, Func1<T, byte[]> encoder) {
        this.encoder = encoder;
        this.registry = registry;
    }

    public void call(Context context, PortRequest ignore, Observable<T> dataO) {
        if (this.kafkaProducerAtomicRef.get() == null) {
            MantisKafkaProducerConfig mantisKafkaProducerConfig = new MantisKafkaProducerConfig(context);
            Map<String, Object> producerProperties = mantisKafkaProducerConfig.getProducerProperties();
            KafkaProducer kafkaProducer = new KafkaProducer(producerProperties);
            this.kafkaProducerAtomicRef.compareAndSet(null, (KafkaProducer<byte[], byte[]>)kafkaProducer);
            logger.info("Kafka Producer initialized");
        }
        KafkaProducer<byte[], byte[]> kafkaProducer = this.kafkaProducerAtomicRef.get();
        Parameters parameters = context.getParameters();
        String topic = (String)parameters.get("kafka.sink.producer.topic");
        dataO.map(arg_0 -> this.encoder.call(arg_0)).flatMap(dataBytes -> Observable.from((Future)kafkaProducer.send(new ProducerRecord(topic, dataBytes))).subscribeOn(Schedulers.io())).subscribe();
    }

    public List<ParameterDefinition<?>> getParameters() {
        ArrayList params = new ArrayList();
        params.add(new StringParameter().name("kafka.sink.producer.topic").description("Kafka topic to write to").validator(Validators.notNullOrEmpty()).required().build());
        params.addAll(MantisKafkaProducerConfig.getJobParameterDefinitions());
        return params;
    }

    public Metadata metadata() {
        StringBuilder description = new StringBuilder();
        description.append("Writes the output of the job into the configured Kafka topic");
        return new Metadata.Builder().name("Mantis Kafka Sink").description(description.toString()).build();
    }
}

