/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io;

import java.util.Map;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.slf4j.Logger;

/**
 * Test sink that logs every incoming record and republishes a textual
 * rendering of it to a separate "log" topic.
 *
 * <p>The destination topic is read from the {@code "log-topic"} entry of the
 * sink configuration and defaults to {@code "log-topic"} when that entry is
 * absent. Each published message has the form
 * {@code <SCHEMA_TYPE> - <payload>} and carries the properties of the
 * original record.
 */
public class TestLoggingSink
implements Sink<GenericObject> {
    private Logger logger;
    private Producer<String> producer;

    /**
     * Captures the sink logger and creates the string producer used by
     * {@link #write(Record)}.
     *
     * @param config      sink configuration; only the {@code "log-topic"} key is read
     * @param sinkContext context supplying the logger and the Pulsar client
     * @throws Exception if the producer cannot be created
     */
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.logger = sinkContext.getLogger();
        String topic = (String) config.getOrDefault("log-topic", "log-topic");
        this.producer = sinkContext.getPulsarClient()
                .newProducer(Schema.STRING)
                .topic(topic)
                .create();
    }

    /**
     * Logs the record, publishes its textual rendering to the log topic and
     * then acknowledges it.
     *
     * <p>The record is acknowledged only after {@code send()} has returned; if
     * sending throws, the exception propagates and the record is not acked.
     *
     * @param record the record to log and republish
     * @throws Exception if publishing to the log topic fails
     */
    public void write(Record<GenericObject> record) throws Exception {
        Object nativeObject = record.getValue().getNativeObject();
        this.logger.info("Got message: {} with schema{}", nativeObject, record.getSchema());

        String schemaType = record.getSchema().getSchemaInfo().getType().name();
        this.producer.newMessage()
                .properties(record.getProperties())
                .value(schemaType + " - " + describe(nativeObject))
                .send();
        record.ack();
    }

    /**
     * Closes the producer created in {@link #open(Map, SinkContext)}, if any.
     *
     * <p>Safe to call when {@code open} never ran or failed part-way, and safe
     * to call more than once. A failure to close is logged rather than
     * rethrown so the method keeps its exception-free signature.
     */
    public void close() {
        if (this.producer == null) {
            return;
        }
        try {
            this.producer.close();
        } catch (Exception e) {
            // Best-effort shutdown: report the failure but do not propagate it.
            if (this.logger != null) {
                this.logger.warn("Failed to close log producer", e);
            }
        } finally {
            this.producer = null;
        }
    }

    /** Renders a record's native object; key/value pairs become {@code (key = k, value = v)}. */
    private static String describe(Object nativeObject) {
        if (nativeObject instanceof KeyValue) {
            KeyValue<?, ?> kv = (KeyValue<?, ?>) nativeObject;
            return "(key = " + unwrap(kv.getKey()) + ", value = " + unwrap(kv.getValue()) + ")";
        }
        return nativeObject.toString();
    }

    /** Returns the string form of {@code part}, looking through a {@link GenericObject} wrapper. */
    private static String unwrap(Object part) {
        if (part instanceof GenericObject) {
            return ((GenericObject) part).getNativeObject().toString();
        }
        return part.toString();
    }
}

