/*
 * Decompiled with CFR 0.152.
 */
package org.darkphoenixs.pool.kafka;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.darkphoenixs.pool.ConnectionPool;

/**
 * A {@link ConnectionPool} that hands out one shared Kafka {@link Producer}
 * instead of pooling several of them.
 *
 * <p>At most one instance is registered at a time in a static
 * {@link AtomicReference}; it is created lazily by {@code getInstance} and
 * deregistered by {@link #close()} or by invalidating the shared producer.
 * The {@code getInstance} factories are synchronized on the class; the
 * instance methods are not synchronized.
 */
public class KafkaSharedConnPool
implements ConnectionPool<Producer<byte[], byte[]>> {
    /** The currently registered shared pool, or {@code null} when none exists. */
    private static final AtomicReference<KafkaSharedConnPool> pool = new AtomicReference<>();

    /** The single producer returned by every {@link #getConnection()} call. */
    private final Producer<byte[], byte[]> producer;

    /**
     * Creates the pool and its producer.
     *
     * @param properties configuration passed as-is to the {@link KafkaProducer} constructor
     */
    private KafkaSharedConnPool(Properties properties) {
        this.producer = new KafkaProducer<>(properties);
    }

    /**
     * Builds a {@link Properties} object from the given settings and delegates to
     * {@link #getInstance(Properties)}.
     *
     * @param brokers value stored under {@code bootstrap.servers}
     * @param codec   value stored under {@code compression.codec}
     * @param keySer  value stored under {@code key.serializer}
     * @param valSer  value stored under {@code value.serializer}
     * @return the registered shared pool
     */
    public static synchronized KafkaSharedConnPool getInstance(String brokers, String codec, String keySer, String valSer) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", brokers);
        // NOTE(review): the Java producer client documents "compression.type" as its
        // compression setting; confirm "compression.codec" is honoured by the client
        // version in use. The key is left unchanged to avoid altering caller behaviour.
        properties.setProperty("compression.codec", codec);
        properties.setProperty("key.serializer", keySer);
        properties.setProperty("value.serializer", valSer);
        return KafkaSharedConnPool.getInstance(properties);
    }

    /**
     * Returns the registered shared pool, creating it first if none is registered.
     *
     * <p>{@code properties} is only used when a new pool has to be created; when a
     * pool is already registered it is returned unchanged and the argument is ignored.
     *
     * @param properties producer configuration for a newly created pool
     * @return the registered shared pool, never {@code null}
     */
    public static synchronized KafkaSharedConnPool getInstance(Properties properties) {
        KafkaSharedConnPool current = pool.get();
        if (current == null) {
            current = new KafkaSharedConnPool(properties);
            pool.set(current);
        }
        // Return the local reference rather than re-reading the AtomicReference:
        // close() is not synchronized and could clear it in between.
        return current;
    }

    /**
     * Returns the shared producer. Every call returns the same object.
     *
     * @return the producer owned by this pool
     */
    @Override
    public Producer<byte[], byte[]> getConnection() {
        return this.producer;
    }

    /**
     * "Returns" a producer by flushing it; the producer stays open.
     *
     * @param conn the producer to flush; {@code null} is ignored
     */
    @Override
    public void returnConnection(Producer<byte[], byte[]> conn) {
        if (conn != null) {
            conn.flush();
        }
    }

    /**
     * Closes the given producer.
     *
     * <p>If it is this pool's own shared producer, this pool is also deregistered so
     * that the next {@code getInstance} call builds a fresh pool instead of handing
     * out one whose producer is already closed.
     *
     * @param conn the producer to close; {@code null} is ignored
     */
    @Override
    public void invalidateConnection(Producer<byte[], byte[]> conn) {
        if (conn == null) {
            return;
        }
        if (conn == this.producer) {
            // Only clears the registration if it still points at this instance.
            pool.compareAndSet(this, null);
        }
        conn.close();
    }

    /**
     * Flushes and closes the shared producer and deregisters this pool.
     *
     * <p>The registration is only cleared when it still refers to this instance, so
     * closing a stale pool does not discard a newer one. The producer is closed even
     * if the flush throws.
     */
    public void close() {
        // Deregister first so new getInstance callers do not receive a closing pool.
        pool.compareAndSet(this, null);
        try {
            this.producer.flush();
        } finally {
            this.producer.close();
        }
    }
}

