/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.kafka010;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.ExecutionError;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.landoop.lenses.topology.client.Publisher;
import com.landoop.lenses.topology.client.kafka.metrics.KafkaMetricsBuilder;
import com.landoop.lenses.topology.client.kafka.metrics.KafkaPublisher;
import com.landoop.lenses.topology.client.metrics.MetricsBuilder;
import com.landoop.lenses.topology.client.metrics.MetricsPublishTask;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.spark.SparkEnv$;
import org.apache.spark.internal.Logging;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.MapLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.runtime.BoxedUnit;
import scala.util.control.NonFatal$;

public final class ShadowedCachedKafkaProducer$
implements Logging {
    /** Singleton instance; assigned by the private constructor. */
    public static final ShadowedCachedKafkaProducer$ MODULE$;
    /** Cache expiry in milliseconds; valid only once bit 0 of {@code bitmap$0} is set. */
    private long cacheExpireTimeout;
    /** Loader the Guava cache uses to create a producer for a key that is not cached yet. */
    private final CacheLoader<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>> cacheLoader;
    /** Removal callback handed to the Guava cache; the constructor stores a RemovalListener here. */
    private final Object removalListener;
    /** The producer cache; valid only once bit 1 of {@code bitmap$0} is set. */
    private LoadingCache<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>> guavaCache;
    /** Backing field for the logger accessor pair declared further down. */
    private transient Logger org$apache$spark$internal$Logging$$log_;
    /** Initialization flags for the lazy fields: bit 0 = cacheExpireTimeout, bit 1 = guavaCache. */
    private volatile byte bitmap$0;

    static {
        // Instantiating the class runs the private constructor, which assigns MODULE$.
        new ShadowedCachedKafkaProducer$();
    }

    /**
     * Slow path of the lazily initialized cache expiry.
     *
     * <p>Under this object's monitor, reads {@code spark.kafka.producer.cache.timeout}
     * (default {@code "10m"}) from the current SparkEnv configuration as milliseconds,
     * stores it, and sets bit 0 of {@code bitmap$0} so later callers skip this path.
     *
     * @return the cache expiry in milliseconds
     */
    private long cacheExpireTimeout$lzycompute() {
        synchronized (this) {
            boolean alreadyComputed = (byte)(this.bitmap$0 & 1) != 0;
            if (!alreadyComputed) {
                long timeoutMs = SparkEnv$.MODULE$.get().conf().getTimeAsMs("spark.kafka.producer.cache.timeout", "10m");
                this.cacheExpireTimeout = timeoutMs;
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
            return this.cacheExpireTimeout;
        }
    }

    /**
     * Slow path of the lazily initialized producer cache.
     *
     * <p>Under this object's monitor, builds the Guava cache once (expire-after-access
     * using {@link #cacheExpireTimeout()}, the removal listener and the cache loader),
     * stores it, and sets bit 1 of {@code bitmap$0} so later callers skip this path.
     *
     * <p>The decompiled version also assigned {@code null} to the {@code final} fields
     * {@code cacheLoader} and {@code removalListener} here. That is not legal Java and
     * is not needed for correctness, so those writes are removed; the cache holds its
     * own references to both objects.
     *
     * @return the shared producer cache
     */
    @SuppressWarnings("unchecked")
    private LoadingCache guavaCache$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 2) == 0) {
                // The field is typed Object, but the constructor always stores a
                // RemovalListener in it, so this cast cannot fail.
                RemovalListener<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>> listener =
                    (RemovalListener<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>>)this.removalListener();
                this.guavaCache = CacheBuilder.newBuilder()
                    .expireAfterAccess(this.cacheExpireTimeout(), TimeUnit.MILLISECONDS)
                    .removalListener(listener)
                    .build(this.cacheLoader());
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
            return this.guavaCache;
        }
    }

    // ---------------------------------------------------------------------
    // Logging trait members.
    //
    // The first two methods are the accessor pair for the logger backing
    // field. Every other method simply forwards to a same-named static
    // helper, passing this instance as the first argument.
    //
    // NOTE(review): `Logging.class.xxx(...)` is not valid Java; it is
    // presumably the decompiler's rendering of the Scala trait
    // implementation class for Logging. Confirm the real target before
    // attempting to compile this file.
    // ---------------------------------------------------------------------

    /** Returns the logger backing field (may not be initialized yet). */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Stores the logger backing field. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    /** Forwards to the Logging helper of the same name. */
    public String logName() {
        return Logging.class.logName((Logging)this);
    }

    /** Forwards to the Logging helper of the same name. */
    public Logger log() {
        return Logging.class.log((Logging)this);
    }

    // Message-only logging overloads; the message is passed as a Function0<String>.

    public void logInfo(Function0<String> msg) {
        Logging.class.logInfo((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.class.logDebug((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.class.logTrace((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.class.logWarning((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.class.logError((Logging)this, msg);
    }

    // Logging overloads that additionally take the Throwable to report.

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.class.logInfo((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.class.logDebug((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.class.logTrace((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.class.logWarning((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.class.logError((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards to the Logging helper of the same name. */
    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled((Logging)this);
    }

    /** Forwards to the Logging helper of the same name. */
    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.class.initializeLogIfNecessary((Logging)this, (boolean)isInterpreter);
    }

    /** Forwards to the Logging helper of the same name and returns its result. */
    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.class.initializeLogIfNecessary((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    /** Supplies the default for the second parameter of the two-argument initializeLogIfNecessary. */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.class.initializeLogIfNecessary$default$2((Logging)this);
    }

    /**
     * Returns the cache expiry in milliseconds, computing it on first use.
     *
     * <p>Bit 0 of {@code bitmap$0} records whether the value has been computed; when
     * it is clear the synchronized slow path is taken.
     */
    private long cacheExpireTimeout() {
        if ((byte)(this.bitmap$0 & 1) != 0) {
            return this.cacheExpireTimeout;
        }
        return this.cacheExpireTimeout$lzycompute();
    }

    /** Returns the loader used to create producers for keys missing from the cache. */
    private CacheLoader<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>> cacheLoader() {
        return this.cacheLoader;
    }

    /** Returns the removal callback object stored by the constructor. */
    private Object removalListener() {
        return this.removalListener;
    }

    /**
     * Returns the shared producer cache, building it on first use.
     *
     * <p>Bit 1 of {@code bitmap$0} records whether the cache has been built; when it
     * is clear the synchronized slow path is taken.
     */
    private LoadingCache<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>> guavaCache() {
        if ((byte)(this.bitmap$0 & 2) != 0) {
            return this.guavaCache;
        }
        return this.guavaCache$lzycompute();
    }

    /*
     * WARNING - void declaration
     */
    public KafkaProducer<byte[], byte[]> org$apache$spark$sql$kafka010$ShadowedCachedKafkaProducer$$createKafkaProducer(java.util.Map<String, Object> producerConfiguration) {
        Properties props = new Properties();
        props.putAll(producerConfiguration);
        KafkaProducer kafkaProducer = new KafkaProducer(producerConfiguration);
        String key = "lenses.topology.description";
        if (producerConfiguration.containsKey(key)) {
            void var3_3;
            String description = producerConfiguration.get(key).toString();
            String appName = (String)new StringOps(Predef$.MODULE$.augmentString(description)).takeWhile((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final boolean apply(char x$1) {
                    return x$1 != ':';
                }
            });
            String[] topics = new StringOps(Predef$.MODULE$.augmentString((String)new StringOps(Predef$.MODULE$.augmentString((String)new StringOps(Predef$.MODULE$.augmentString(description)).dropWhile((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final boolean apply(char x$2) {
                    return x$2 != ':';
                }
            }))).drop(1))).split(',');
            KafkaPublisher publisher = new KafkaPublisher(props);
            MetricsPublishTask task = new MetricsPublishTask((Publisher)publisher, Duration.ofSeconds(5L));
            Predef$.MODULE$.refArrayOps((Object[])topics).foreach((Function1)new Serializable(kafkaProducer, appName, task){
                public static final long serialVersionUID = 0L;
                private final KafkaProducer kafkaProducer$1;
                private final String appName$1;
                private final MetricsPublishTask task$1;

                public final void apply(String topic) {
                    this.task$1.register(this.appName$1, topic, (MetricsBuilder)new KafkaMetricsBuilder(this.kafkaProducer$1));
                }
                {
                    this.kafkaProducer$1 = kafkaProducer$1;
                    this.appName$1 = appName$1;
                    this.task$1 = task$1;
                }
            });
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.submit((Runnable)task);
            this.logDebug((Function0<String>)new Serializable(producerConfiguration){
                public static final long serialVersionUID = 0L;
                private final java.util.Map producerConfiguration$1;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Created a new instance of KafkaProducer for ", "."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.producerConfiguration$1}));
                }
                {
                    this.producerConfiguration$1 = producerConfiguration$1;
                }
            });
            return var3_3;
        }
        throw new RuntimeException("Must specify option 'kafka.lenses.topology.description' as appName:topic1,topic2,...,topicn");
    }

    /**
     * Returns the cached producer for the given Kafka parameters, creating it through
     * the cache loader when none is cached.
     *
     * <p>If the lookup fails with one of the cache's wrapper exceptions
     * (ExecutionException, UncheckedExecutionException or ExecutionError) and that
     * wrapper has a cause, the cause is thrown instead of the wrapper. Any other
     * failure is rethrown unchanged.
     *
     * @param kafkaParams producer configuration used as the cache key
     * @return the cached or newly created producer
     */
    public KafkaProducer<byte[], byte[]> getOrCreate(java.util.Map<String, Object> kafkaParams) {
        Seq<Tuple2<String, Object>> cacheKey = this.paramsToSeq(kafkaParams);
        try {
            return (KafkaProducer)this.guavaCache().get(cacheKey);
        }
        catch (Throwable failure) {
            boolean isCacheWrapper = failure instanceof ExecutionException
                || failure instanceof UncheckedExecutionException
                || failure instanceof ExecutionError;
            if (isCacheWrapper) {
                Throwable cause = failure.getCause();
                if (cause != null) {
                    throw cause;
                }
            }
            throw failure;
        }
    }

    /*
     * WARNING - void declaration
     */
    private Seq<Tuple2<String, Object>> paramsToSeq(java.util.Map<String, Object> kafkaParams) {
        void var2_2;
        Seq paramsSeq = (Seq)((MapLike)JavaConverters$.MODULE$.mapAsScalaMapConverter(kafkaParams).asScala()).toSeq().sortBy((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply(Tuple2<String, Object> x) {
                return (String)x._1();
            }
        }, (Ordering)Ordering.String$.MODULE$);
        return var2_2;
    }

    /**
     * Invalidates the cache entry for the given Kafka parameters.
     *
     * @param kafkaParams producer configuration identifying the cached producer
     */
    public void close(java.util.Map<String, Object> kafkaParams) {
        // Build the key before touching the cache, matching the original evaluation order.
        Seq<Tuple2<String, Object>> cacheKey = this.paramsToSeq(kafkaParams);
        LoadingCache<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>> cache = this.guavaCache();
        cache.invalidate(cacheKey);
    }

    /**
     * Logs and closes the given producer.
     *
     * <p>A failure that {@code NonFatal} matches is logged as a warning and suppressed;
     * any other failure is rethrown.
     *
     * @param paramsSeq the parameters the producer was created with (used for logging)
     * @param producer the producer to close
     */
    public void org$apache$spark$sql$kafka010$ShadowedCachedKafkaProducer$$close(Seq<Tuple2<String, Object>> paramsSeq, KafkaProducer<byte[], byte[]> producer) {
        try {
            this.logInfo((Function0<String>)new Serializable(paramsSeq){
                public static final long serialVersionUID = 0L;
                private final Seq paramsSeq$2;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Closing the KafkaProducer with params: ", "."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.paramsSeq$2.mkString("\n")}));
                }
                {
                    this.paramsSeq$2 = paramsSeq$2;
                }
            });
            producer.close();
        }
        catch (Throwable failure) {
            Option nonFatal = NonFatal$.MODULE$.unapply(failure);
            if (!nonFatal.isEmpty()) {
                // Best-effort close: report the problem and carry on.
                Throwable e = (Throwable)nonFatal.get();
                this.logWarning((Function0<String>)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final String apply() {
                        return "Error while closing kafka producer.";
                    }
                }, e);
                return;
            }
            throw failure;
        }
    }

    /** Logs a cleanup message and invalidates every entry in the producer cache. */
    private void clear() {
        Function0<String> cleanupMessage = (Function0<String>)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Cleaning up guava cache.";
            }
        };
        this.logInfo(cleanupMessage);
        this.guavaCache().invalidateAll();
    }

    /** Returns the map view of the producer cache, as provided by the cache's {@code asMap()}. */
    private ConcurrentMap<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>> getAsMap() {
        LoadingCache<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>> cache = this.guavaCache();
        return cache.asMap();
    }

    /**
     * Private constructor, invoked once from the static initializer.
     *
     * <p>Publishes the instance as {@code MODULE$}, runs the Logging initializer, and
     * creates the two collaborators the lazily built Guava cache needs: the loader that
     * creates producers and the listener that closes them on removal.
     *
     * <p>NOTE(review): the {@code new Serializable(...){...}} expressions cast to
     * Function0/Function1 are presumably the decompiler's rendering of Scala closures
     * and are not valid Java as written - confirm before compiling.
     */
    private ShadowedCachedKafkaProducer$() {
        // MODULE$ is assigned first; the anonymous classes below reach this instance through it.
        MODULE$ = this;
        Logging.class.$init$((Logging)this);
        this.cacheLoader = new CacheLoader<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>>(){

            // Rebuilds a java.util.Map from the sorted key/value pairs and creates a producer from it.
            public KafkaProducer<byte[], byte[]> load(Seq<Tuple2<String, Object>> config) {
                java.util.Map configMap = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)((TraversableOnce)config.map((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    // Re-pairs each entry as key -> value so the sequence can be turned into a map.
                    public final Tuple2<String, Object> apply(Tuple2<String, Object> x) {
                        return Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(x._1()), x._2());
                    }
                }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())).asJava();
                return ShadowedCachedKafkaProducer$.MODULE$.org$apache$spark$sql$kafka010$ShadowedCachedKafkaProducer$$createKafkaProducer(configMap);
            }
        };
        this.removalListener = new RemovalListener<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>>(){

            // Logs the removal (with its cause) at debug level, then closes the removed producer.
            public void onRemoval(RemovalNotification<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>> notification) {
                Seq paramsSeq = (Seq)notification.getKey();
                KafkaProducer producer = (KafkaProducer)notification.getValue();
                ShadowedCachedKafkaProducer$.MODULE$.logDebug((Function0<String>)new Serializable(this, notification, paramsSeq, producer){
                    public static final long serialVersionUID = 0L;
                    private final RemovalNotification notification$1;
                    private final Seq paramsSeq$1;
                    private final KafkaProducer producer$1;

                    public final String apply() {
                        return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Evicting kafka producer ", " params: ", ", due to ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.producer$1, this.paramsSeq$1, this.notification$1.getCause()}));
                    }
                    {
                        this.notification$1 = notification$1;
                        this.paramsSeq$1 = paramsSeq$1;
                        this.producer$1 = producer$1;
                    }
                });
                ShadowedCachedKafkaProducer$.MODULE$.org$apache$spark$sql$kafka010$ShadowedCachedKafkaProducer$$close((Seq<Tuple2<String, Object>>)paramsSeq, (KafkaProducer<byte[], byte[]>)producer);
            }
        };
    }
}

