package com.yotpo.metorikku.input.readers.kafka;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.scala.DefaultScalaModule$;
import com.yotpo.metorikku.exceptions.MetorikkuException;
import com.yotpo.metorikku.exceptions.MetorikkuException$;
import java.util.HashMap;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.streaming.SourceProgress;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: KafkaLagWriter.scala */
@ScalaSignature(bytes = "\u0006\u0001a4AAC\u0006\u00011!Aq\u0005\u0001B\u0001B\u0003%\u0001\u0006\u0003\u0005?\u0001\t\u0005\t\u0015!\u00032\u0011\u0015y\u0004\u0001\"\u0001A\u0011\u001dY\u0003A1A\u0005\n\u0015CaA\u0012\u0001!\u0002\u0013A\u0003\u0002C$\u0001\u0011\u000b\u0007I\u0011\u0001%\t\u000bQ\u0003A\u0011A+\t\u000b1\u0004A\u0011A7\t\u000bI\u0004A\u0011A:\u0003\u001d-\u000bgm[1MC\u001e<&/\u001b;fe*\u0011A\"D\u0001\u0006W\u000647.\u0019\u0006\u0003\u001d=\tqA]3bI\u0016\u00148O\u0003\u0002\u0011#\u0005)\u0011N\u001c9vi*\u0011!cE\u0001\n[\u0016$xN]5lWVT!\u0001F\u000b\u0002\u000be|G\u000f]8\u000b\u0003Y\t1aY8n\u0007\u0001\u0019\"\u0001A\r\u0011\u0005i)S\"A\u000e\u000b\u0005qi\u0012!C:ue\u0016\fW.\u001b8h\u0015\tqr$A\u0002tc2T!\u0001I\u0011\u0002\u000bM\u0004\u0018M]6\u000b\u0005\t\u001a\u0013AB1qC\u000eDWMC\u0001%\u0003\ry'oZ\u0005\u0003Mm\u0011ac\u0015;sK\u0006l\u0017N\\4Rk\u0016\u0014\u0018\u0010T5ti\u0016tWM]\u0001\u000eW\u000647.Y\"p]N,X.\u001a:\u0011\t%z\u0013'M\u0007\u0002U)\u00111\u0006L\u0001\tG>t7/^7fe*\u0011QFL\u0001\bG2LWM\u001c;t\u0015\ta\u0011%\u0003\u00021U\ti1*\u00194lC\u000e{gn];nKJ\u0004\"AM\u001e\u000f\u0005MJ\u0004C\u0001\u001b8\u001b\u0005)$B\u0001\u001c\u0018\u0003\u0019a$o\\8u})\t\u0001(A\u0003tG\u0006d\u0017-\u0003\u0002;o\u00051\u0001K]3eK\u001aL!\u0001P\u001f\u0003\rM#(/\u001b8h\u0015\tQt'A\u0003u_BL7-\u0001\u0004=S:LGO\u0010\u000b\u0004\u0003\u000e#\u0005C\u0001\"\u0001\u001b\u0005Y\u0001\"B\u0014\u0004\u0001\u0004A\u0003\"\u0002 \u0004\u0001\u0004\tT#\u0001\u0015\u0002\u0013\r|gn];nKJ\u0004\u0013a\u00017pOV\t\u0011\n\u0005\u0002K\u001b6\t1J\u0003\u0002MC\u0005)An\\45U&\u0011aj\u0013\u0002\u0007\u0019><w-\u001a:)\u0005\u0019\u0001\u0006CA)S\u001b\u00059\u0014BA*8\u0005%!(/\u00198tS\u0016tG/\u0001\bp]F+XM]=Ti\u0006\u0014H/\u001a3\u0015\u0005YK\u0006CA)X\u0013\tAvG\u0001\u0003V]&$\b\"\u0002.\b\u0001\u0004Y\u0016!B3wK:$\bC\u0001/j\u001d\tivM\u0004\u0002_M:\u0011q,\u001a\b\u0003A\u0012t!!Y2\u000f\u0005Q\u0012\u0017\"\u0001\u0013\n\u0005\t\u001a\u0013B\u0001\u0011\"\u0013\tqr$\u0003\u0002\u001d;%\u0011\u0001nG\u0001\u0017'R\u0014X-Y7j]\u001e\fV/\u001a:z\u0019&\u001cH/\u001a8fe&\u0011!n\u001b\u0002\u0012#V,'/_*uCJ$X\rZ#wK:$(B\u00015\u001c\u0003Eyg.U;fef$VM]7j]\u0006$X\r\u001a\u000b\u0003-:DQA\u0017\u0005A\u0002=\u0004\"\u0001\u00189\n\u0005E\\'\u0001F)vKJLH+\u001a:nS:\fG/\u001a3Fm\u0016tG/A\bp]F+XM]=Qe><'/Z:t)\t1F\u000fC\u0003[\u0013\u0001\u0007Q\u000f\u0005\u0002]m&\u0011qo\u001b\u0002\u0013#V,'/\u001f)s_\u001e\u0014Xm]:Fm\u0016tG\u000f")
/* loaded from: input_file:com/yotpo/metorikku/input/readers/kafka/KafkaLagWriter.class */
public class KafkaLagWriter extends StreamingQueryListener {
    private transient Logger log;
    private final String topic;
    private final KafkaConsumer<String, String> consumer;
    private volatile transient boolean bitmap$trans$0;

    private KafkaConsumer<String, String> consumer() {
        return this.consumer;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [com.yotpo.metorikku.input.readers.kafka.KafkaLagWriter] */
    private Logger log$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                this.log = LogManager.getLogger(getClass());
                r0 = this;
                r0.bitmap$trans$0 = true;
            }
        }
        return this.log;
    }

    public Logger log() {
        return !this.bitmap$trans$0 ? log$lzycompute() : this.log;
    }

    public void onQueryStarted(StreamingQueryListener.QueryStartedEvent queryStartedEvent) {
    }

    public void onQueryTerminated(StreamingQueryListener.QueryTerminatedEvent queryTerminatedEvent) {
    }

    public void onQueryProgress(StreamingQueryListener.QueryProgressEvent queryProgressEvent) {
        log().info(new StringBuilder(49).append("using consumer group to commit offsets for topic ").append(this.topic).toString());
        ObjectMapper configure = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).configure(DeserializationFeature.USE_LONG_FOR_INTS, true);
        configure.registerModule(DefaultScalaModule$.MODULE$);
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(queryProgressEvent.progress().sources())).foreach(sourceProgress -> {
            $anonfun$onQueryProgress$1(this, configure, sourceProgress);
            return BoxedUnit.UNIT;
        });
    }

    /* JADX WARN: Type inference failed for: r0v15, types: [scala.collection.Iterable] */
    public static final /* synthetic */ void $anonfun$onQueryProgress$2(KafkaLagWriter kafkaLagWriter, Map map, String str) {
        kafkaLagWriter.log().debug(new StringBuilder(29).append("committing offsets for topic ").append(str).toString());
        HashMap hashMap = new HashMap();
        Serializable serializable = map.get(str);
        if (!(serializable instanceof Some)) {
            throw new MetorikkuException("could not fetch topic offsets", MetorikkuException$.MODULE$.apply$default$2());
        }
        Map map2 = (Map) ((Some) serializable).value();
        map2.keys().foreach(str2 -> {
            return (OffsetAndMetadata) hashMap.put(new TopicPartition(str, new StringOps(Predef$.MODULE$.augmentString(str2)).toInt()), new OffsetAndMetadata(BoxesRunTime.unboxToLong(map2.mo2289apply((Map) str2))));
        });
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
        kafkaLagWriter.consumer().commitSync(hashMap);
    }

    /* JADX WARN: Type inference failed for: r0v4, types: [scala.collection.Iterable] */
    public static final /* synthetic */ void $anonfun$onQueryProgress$1(KafkaLagWriter kafkaLagWriter, ObjectMapper objectMapper, SourceProgress sourceProgress) {
        Map map = (Map) objectMapper.readValue(sourceProgress.endOffset(), Map.class);
        map.keys().foreach(str -> {
            $anonfun$onQueryProgress$2(kafkaLagWriter, map, str);
            return BoxedUnit.UNIT;
        });
    }

    public KafkaLagWriter(KafkaConsumer<String, String> kafkaConsumer, String str) {
        this.topic = str;
        this.consumer = kafkaConsumer;
    }
}
