/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.scheduler;

import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.internal.Logging;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.scheduler.AllocatedBlocks;
import org.apache.spark.streaming.scheduler.BatchAllocationEvent;
import org.apache.spark.streaming.scheduler.BatchCleanupEvent;
import org.apache.spark.streaming.scheduler.BlockAdditionEvent;
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo;
import org.apache.spark.streaming.scheduler.ReceivedBlockTracker$;
import org.apache.spark.streaming.scheduler.ReceivedBlockTrackerLogEvent;
import org.apache.spark.streaming.util.WriteAheadLog;
import org.apache.spark.streaming.util.WriteAheadLogUtils$;
import org.apache.spark.util.Clock;
import org.apache.spark.util.Utils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.Queue;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.util.control.NonFatal$;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0015g!B\u0001\u0003\u0001\u0011a!\u0001\u0006*fG\u0016Lg/\u001a3CY>\u001c7\u000e\u0016:bG.,'O\u0003\u0002\u0004\t\u0005I1o\u00195fIVdWM\u001d\u0006\u0003\u000b\u0019\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005\u001dA\u0011!B:qCJ\\'BA\u0005\u000b\u0003\u0019\t\u0007/Y2iK*\t1\"A\u0002pe\u001e\u001c2\u0001A\u0007\u0014!\tq\u0011#D\u0001\u0010\u0015\u0005\u0001\u0012!B:dC2\f\u0017B\u0001\n\u0010\u0005\u0019\te.\u001f*fMB\u0011AcF\u0007\u0002+)\u0011aCB\u0001\tS:$XM\u001d8bY&\u0011\u0001$\u0006\u0002\b\u0019><w-\u001b8h\u0011!Q\u0002A!A!\u0002\u0013a\u0012\u0001B2p]\u001a\u001c\u0001\u0001\u0005\u0002\u001e=5\ta!\u0003\u0002 \r\tI1\u000b]1sW\u000e{gN\u001a\u0005\tC\u0001\u0011\t\u0011)A\u0005E\u0005Q\u0001.\u00193p_B\u001cuN\u001c4\u0011\u0005\r:S\"\u0001\u0013\u000b\u0005i)#B\u0001\u0014\t\u0003\u0019A\u0017\rZ8pa&\u0011\u0001\u0006\n\u0002\u000e\u0007>tg-[4ve\u0006$\u0018n\u001c8\t\u0011)\u0002!\u0011!Q\u0001\n-\n\u0011b\u001d;sK\u0006l\u0017\nZ:\u0011\u00071\"tG\u0004\u0002.e9\u0011a&M\u0007\u0002_)\u0011\u0001gG\u0001\u0007yI|w\u000e\u001e \n\u0003AI!aM\b\u0002\u000fA\f7m[1hK&\u0011QG\u000e\u0002\u0004'\u0016\f(BA\u001a\u0010!\tq\u0001(\u0003\u0002:\u001f\t\u0019\u0011J\u001c;\t\u0011m\u0002!\u0011!Q\u0001\nq\nQa\u00197pG.\u0004\"!\u0010!\u000e\u0003yR!a\u0010\u0004\u0002\tU$\u0018\u000e\\\u0005\u0003\u0003z\u0012Qa\u00117pG.D\u0001b\u0011\u0001\u0003\u0002\u0003\u0006I\u0001R\u0001\u0019e\u0016\u001cwN^3s\rJ|Wn\u0016:ji\u0016\f\u0005.Z1e\u0019><\u0007C\u0001\bF\u0013\t1uBA\u0004C_>dW-\u00198\t\u0011!\u0003!\u0011!Q\u0001\n%\u000b1c\u00195fG.\u0004x.\u001b8u\t&\u0014x\n\u001d;j_:\u00042A\u0004&M\u0013\tYuB\u0001\u0004PaRLwN\u001c\t\u0003\u001bBs!A\u0004(\n\u0005={\u0011A\u0002)sK\u0012,g-\u0003\u0002R%\n11\u000b\u001e:j]\u001eT!aT\b\t\u000bQ\u0003A\u0011A+\u0002\rqJg.\u001b;?)\u001d1\u0006,\u0017.\\9v\u0003\"a\u0016\u0001\u000e\u0003\tAQAG*A\u0002qAQ!I*A\u0002\tBQAK*A\u0002-BQaO*A\u0002qBQaQ*A\u0002\u0011CQ\u0001S*A\u0002%+Aa\u0018\u0001\u0005A\n\u0011\"+Z2fSZ,GM\u00117pG.\fV/Z;f!\r\tg\r[\u0007\u0002E*\u00111\rZ\u0001\b[V$\u0018M\u00197f\u0015\t)w\"\u0001\u0006d_2dWm\u0019;j_:L!a\u001a2\u0003\u000bE+X-^3\u0011\u0005]K\u0017B\u00016\u0003\u0005E\u0011VmY3jm\u0016$'\t\\8dW&sgm\u001c\u0005\bY\u0002\u0011\r\u0011\"\u0003n\u0003\u0001\u001aHO]3b[&#Gk\\+oC2dwnY1uK\u0012\u0014En\\2l#V,W/Z:\u0016\u00039\u0004B!Y88c&\u0011\u0001O\u0019\u0002\b\u0011\u0006\u001c\b.T1q!\t\u0011h,D\u0001\u0001\u0011\u0019!\b\u0001)A\u0005]\u0006\t3\u000f\u001e:fC6LE\rV8V]\u0006dGn\\2bi\u0016$'\t\\8dWF+X-^3tA!9a\u000f\u0001b\u0001\n\u00139\u0018!\u0006;j[\u0016$v.\u00117m_\u000e\fG/\u001a3CY>\u001c7n]\u000b\u0002qB!\u0011m\\=~!\tQ80D\u0001\u0005\u0013\taHA\u0001\u0003US6,\u0007CA,\u007f\u0013\ty(AA\bBY2|7-\u0019;fI\ncwnY6t\u0011\u001d\t\u0019\u0001\u0001Q\u0001\na\fa\u0003^5nKR{\u0017\t\u001c7pG\u0006$X\r\u001a\"m_\u000e\\7\u000f\t\u0005\n\u0003\u000f\u0001!\u0019!C\u0005\u0003\u0013\t1c\u001e:ji\u0016\f\u0005.Z1e\u0019><w\n\u001d;j_:,\"!a\u0003\u0011\t9Q\u0015Q\u0002\t\u0005\u0003\u001f\t\u0019\"\u0004\u0002\u0002\u0012)\u0011q\bB\u0005\u0005\u0003+\t\tBA\u0007Xe&$X-\u00115fC\u0012dun\u001a\u0005\t\u00033\u0001\u0001\u0015!\u0003\u0002\f\u0005!rO]5uK\u0006CW-\u00193M_\u001e|\u0005\u000f^5p]\u0002B\u0011\"!\b\u0001\u0001\u0004%I!a\b\u0002-1\f7\u000f^!mY>\u001c\u0017\r^3e\u0005\u0006$8\r\u001b+j[\u0016,\u0012!\u001f\u0005\n\u0003G\u0001\u0001\u0019!C\u0005\u0003K\t!\u0004\\1ti\u0006cGn\\2bi\u0016$')\u0019;dQRKW.Z0%KF$B!a\n\u0002.A\u0019a\"!\u000b\n\u0007\u0005-rB\u0001\u0003V]&$\b\"CA\u0018\u0003C\t\t\u00111\u0001z\u0003\rAH%\r\u0005\b\u0003g\u0001\u0001\u0015)\u0003z\u0003]a\u0017m\u001d;BY2|7-\u0019;fI\n\u000bGo\u00195US6,\u0007\u0005C\u0004\u00028\u0001!\t!!\u000f\u0002\u0011\u0005$GM\u00117pG.$2\u0001RA\u001e\u0011\u001d\ti$!\u000eA\u0002!\f\u0011C]3dK&4X\r\u001a\"m_\u000e\\\u0017J\u001c4p\u0011\u001d\t\t\u0005\u0001C\u0001\u0003\u0007\nQ#\u00197m_\u000e\fG/\u001a\"m_\u000e\\7\u000fV8CCR\u001c\u0007\u000e\u0006\u0003\u0002(\u0005\u0015\u0003bBA$\u0003\u007f\u0001\r!_\u0001\nE\u0006$8\r\u001b+j[\u0016Dq!a\u0013\u0001\t\u0003\ti%\u0001\thKR\u0014En\\2lg>3')\u0019;dQR!\u0011qJA,!\u0019i\u0015\u0011K\u001c\u0002V%\u0019\u00111\u000b*\u0003\u00075\u000b\u0007\u000fE\u0002-i!Dq!a\u0012\u0002J\u0001\u0007\u0011\u0010C\u0004\u0002\\\u0001!\t!!\u0018\u00023\u001d,GO\u00117pG.\u001cxJ\u001a\"bi\u000eD\u0017I\u001c3TiJ,\u0017-\u001c\u000b\u0007\u0003+\ny&!\u0019\t\u000f\u0005\u001d\u0013\u0011\fa\u0001s\"9\u00111MA-\u0001\u00049\u0014\u0001C:ue\u0016\fW.\u00133\t\u000f\u0005\u001d\u0004\u0001\"\u0001\u0002j\u0005a\u0002.Y:V]\u0006dGn\\2bi\u0016$'+Z2fSZ,GM\u00117pG.\u001cX#\u0001#\t\u000f\u00055\u0004\u0001\"\u0001\u0002p\u0005!r-\u001a;V]\u0006dGn\\2bi\u0016$'\t\\8dWN$B!!\u0016\u0002r!9\u00111MA6\u0001\u00049\u0004bBA;\u0001\u0011\u0005\u0011qO\u0001\u0012G2,\u0017M\\;q\u001f2$')\u0019;dQ\u0016\u001cHCBA\u0014\u0003s\ni\bC\u0004\u0002|\u0005M\u0004\u0019A=\u0002#\rdW-\u00198vaRC'/Z:i)&lW\rC\u0004\u0002\u0000\u0005M\u0004\u0019\u0001#\u0002#]\f\u0017\u000e\u001e$pe\u000e{W\u000e\u001d7fi&|g\u000eC\u0004\u0002\u0004\u0002!\t!!\"\u0002\tM$x\u000e\u001d\u000b\u0003\u0003OAq!!#\u0001\t\u0013\t))A\tsK\u000e|g/\u001a:QCN$XI^3oiND\u0001\"!$\u0001\t\u0003!\u0011qR\u0001\u000boJLG/\u001a+p\u0019><Gc\u0001#\u0002\u0012\"A\u00111SAF\u0001\u0004\t)*\u0001\u0004sK\u000e|'\u000f\u001a\t\u0004/\u0006]\u0015bAAM\u0005\ta\"+Z2fSZ,GM\u00117pG.$&/Y2lKJdunZ#wK:$\bbBAO\u0001\u0011%\u0011qT\u0001\u0016O\u0016$(+Z2fSZ,GM\u00117pG.\fV/Z;f)\r\t\u0018\u0011\u0015\u0005\b\u0003G\nY\n1\u00018\u0011\u001d\t)\u000b\u0001C\u0005\u0003O\u000b1c\u0019:fCR,wK]5uK\u0006CW-\u00193M_\u001e$\"!a\u0003\t\u0011\u0005-\u0006\u0001\"\u0001\u0005\u0003S\na#[:Xe&$X-\u00115fC\u0012dunZ#oC\ndW\rZ\u0004\t\u0003_\u0013\u0001\u0012\u0001\u0003\u00022\u0006!\"+Z2fSZ,GM\u00117pG.$&/Y2lKJ\u00042aVAZ\r\u001d\t!\u0001#\u0001\u0005\u0003k\u001b2!a-\u000e\u0011\u001d!\u00161\u0017C\u0001\u0003s#\"!!-\t\u0011\u0005u\u00161\u0017C\u0001\u0003\u007f\u000bQc\u00195fG.\u0004x.\u001b8u\t&\u0014Hk\u001c'pO\u0012K'\u000fF\u0002M\u0003\u0003Dq!a1\u0002<\u0002\u0007A*A\u0007dQ\u0016\u001c7\u000e]8j]R$\u0015N\u001d")
public class ReceivedBlockTracker
implements Logging {
    public final SparkConf org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$conf;
    public final Configuration org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$hadoopConf;
    private final Seq<Object> streamIds;
    private final Clock clock;
    public final Option<String> org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption;
    private final HashMap<Object, Queue<ReceivedBlockInfo>> streamIdToUnallocatedBlockQueues;
    private final HashMap<Time, AllocatedBlocks> timeToAllocatedBlocks;
    private final Option<WriteAheadLog> writeAheadLogOption;
    private Time lastAllocatedBatchTime;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public static String checkpointDirToLogDir(String string) {
        return ReceivedBlockTracker$.MODULE$.checkpointDirToLogDir(string);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    public String logName() {
        return Logging.class.logName((Logging)this);
    }

    public Logger log() {
        return Logging.class.log((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.class.logInfo((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.class.logDebug((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.class.logTrace((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.class.logWarning((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.class.logError((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.class.logInfo((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.class.logDebug((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.class.logTrace((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.class.logWarning((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.class.logError((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.class.initializeLogIfNecessary((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.class.initializeLogIfNecessary((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.class.initializeLogIfNecessary$default$2((Logging)this);
    }

    private HashMap<Object, Queue<ReceivedBlockInfo>> streamIdToUnallocatedBlockQueues() {
        return this.streamIdToUnallocatedBlockQueues;
    }

    private HashMap<Time, AllocatedBlocks> timeToAllocatedBlocks() {
        return this.timeToAllocatedBlocks;
    }

    private Option<WriteAheadLog> writeAheadLogOption() {
        return this.writeAheadLogOption;
    }

    private Time lastAllocatedBatchTime() {
        return this.lastAllocatedBatchTime;
    }

    private void lastAllocatedBatchTime_$eq(Time x$1) {
        this.lastAllocatedBatchTime = x$1;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public boolean addBlock(ReceivedBlockInfo receivedBlockInfo) {
        boolean bl;
        try {
            boolean writeResult = this.writeToLog(new BlockAdditionEvent(receivedBlockInfo));
            if (writeResult) {
                ReceivedBlockTracker receivedBlockTracker = this;
                synchronized (receivedBlockTracker) {
                    this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(receivedBlockInfo.streamId()).$plus$eq((Object)receivedBlockInfo);
                    // MONITOREXIT @DISABLED, blocks:[0, 1, 2, 7] lbl9 : MonitorExitStatement: MONITOREXIT : receivedBlockTracker
                    this.logDebug((Function0<String>)new Serializable(this, receivedBlockInfo){
                        public static final long serialVersionUID = 0L;
                        private final ReceivedBlockInfo receivedBlockInfo$1;

                        public final String apply() {
                            return new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Stream ", " received "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.receivedBlockInfo$1.streamId())}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"block ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.receivedBlockInfo$1.blockStoreResult().blockId()}))).toString();
                        }
                        {
                            this.receivedBlockInfo$1 = receivedBlockInfo$1;
                        }
                    });
                }
            } else {
                this.logDebug((Function0<String>)new Serializable(this, receivedBlockInfo){
                    public static final long serialVersionUID = 0L;
                    private final ReceivedBlockInfo receivedBlockInfo$1;

                    public final String apply() {
                        return new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Failed to acknowledge stream ", " receiving "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.receivedBlockInfo$1.streamId())}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"block ", " in the Write Ahead Log."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.receivedBlockInfo$1.blockStoreResult().blockId()}))).toString();
                    }
                    {
                        this.receivedBlockInfo$1 = receivedBlockInfo$1;
                    }
                });
            }
            bl = writeResult;
            return bl;
        }
        catch (Throwable throwable) {
            boolean bl2;
            Throwable throwable2 = throwable;
            Option option = NonFatal$.MODULE$.unapply(throwable2);
            if (option.isEmpty()) {
                throw throwable;
            }
            Throwable e = (Throwable)option.get();
            this.logError((Function0<String>)new Serializable(this, receivedBlockInfo){
                public static final long serialVersionUID = 0L;
                private final ReceivedBlockInfo receivedBlockInfo$1;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Error adding block ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.receivedBlockInfo$1}));
                }
                {
                    this.receivedBlockInfo$1 = receivedBlockInfo$1;
                }
            }, e);
            bl = bl2 = false;
        }
        return bl;
    }

    public synchronized void allocateBlocksToBatch(Time batchTime) {
        if (this.lastAllocatedBatchTime() == null || batchTime.$greater(this.lastAllocatedBatchTime())) {
            Map streamIdToBlocks = ((TraversableOnce)this.streamIds.map((Function1)new Serializable(this){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ ReceivedBlockTracker $outer;

                public final Tuple2<Object, scala.collection.mutable.Seq<ReceivedBlockInfo>> apply(int streamId) {
                    return new Tuple2((Object)BoxesRunTime.boxToInteger((int)streamId), (Object)this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(streamId).dequeueAll((Function1)new Serializable(this){
                        public static final long serialVersionUID = 0L;

                        public final boolean apply(ReceivedBlockInfo x) {
                            return true;
                        }
                    }));
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                }
            }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
            AllocatedBlocks allocatedBlocks = new AllocatedBlocks((Map<Object, Seq<ReceivedBlockInfo>>)streamIdToBlocks);
            if (this.writeToLog(new BatchAllocationEvent(batchTime, allocatedBlocks))) {
                this.timeToAllocatedBlocks().put((Object)batchTime, (Object)allocatedBlocks);
                this.lastAllocatedBatchTime_$eq(batchTime);
            } else {
                this.logInfo((Function0<String>)new Serializable(this, batchTime){
                    public static final long serialVersionUID = 0L;
                    private final Time batchTime$1;

                    public final String apply() {
                        return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Possibly processed batch ", " needs to be processed again in WAL recovery"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.batchTime$1}));
                    }
                    {
                        this.batchTime$1 = batchTime$1;
                    }
                });
            }
        } else {
            this.logInfo((Function0<String>)new Serializable(this, batchTime){
                public static final long serialVersionUID = 0L;
                private final Time batchTime$1;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Possibly processed batch ", " needs to be processed again in WAL recovery"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.batchTime$1}));
                }
                {
                    this.batchTime$1 = batchTime$1;
                }
            });
        }
    }

    public synchronized Map<Object, Seq<ReceivedBlockInfo>> getBlocksOfBatch(Time batchTime) {
        return (Map)this.timeToAllocatedBlocks().get((Object)batchTime).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Map<Object, Seq<ReceivedBlockInfo>> apply(AllocatedBlocks x$1) {
                return x$1.streamIdToAllocatedBlocks();
            }
        }).getOrElse((Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Map<Object, Nothing$> apply() {
                return Predef$.MODULE$.Map().empty();
            }
        });
    }

    public synchronized Seq<ReceivedBlockInfo> getBlocksOfBatchAndStream(Time batchTime, int streamId) {
        return (Seq)this.timeToAllocatedBlocks().get((Object)batchTime).map((Function1)new Serializable(this, streamId){
            public static final long serialVersionUID = 0L;
            private final int streamId$1;

            public final Seq<ReceivedBlockInfo> apply(AllocatedBlocks x$2) {
                return x$2.getBlocksOfStream(this.streamId$1);
            }
            {
                this.streamId$1 = streamId$1;
            }
        }).getOrElse((Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Seq<Nothing$> apply() {
                return (Seq)Seq$.MODULE$.empty();
            }
        });
    }

    public synchronized boolean hasUnallocatedReceivedBlocks() {
        return !this.streamIdToUnallocatedBlockQueues().values().forall((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final boolean apply(Queue<ReceivedBlockInfo> x$3) {
                return x$3.isEmpty();
            }
        });
    }

    public synchronized Seq<ReceivedBlockInfo> getUnallocatedBlocks(int streamId) {
        return this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(streamId).toSeq();
    }

    public synchronized void cleanupOldBatches(Time cleanupThreshTime, boolean waitForCompletion) {
        Predef$.MODULE$.require(cleanupThreshTime.milliseconds() < this.clock.getTimeMillis());
        Seq timesToCleanup = ((TraversableOnce)this.timeToAllocatedBlocks().keys().filter((Function1)new Serializable(this, cleanupThreshTime){
            public static final long serialVersionUID = 0L;
            private final Time cleanupThreshTime$1;

            public final boolean apply(Time x$4) {
                return x$4.$less(this.cleanupThreshTime$1);
            }
            {
                this.cleanupThreshTime$1 = cleanupThreshTime$1;
            }
        })).toSeq();
        this.logInfo((Function0<String>)new Serializable(this, timesToCleanup){
            public static final long serialVersionUID = 0L;
            private final Seq timesToCleanup$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Deleting batches: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.timesToCleanup$1.mkString(" ")}));
            }
            {
                this.timesToCleanup$1 = timesToCleanup$1;
            }
        });
        if (this.writeToLog(new BatchCleanupEvent((Seq<Time>)timesToCleanup))) {
            this.timeToAllocatedBlocks().$minus$minus$eq((TraversableOnce)timesToCleanup);
            this.writeAheadLogOption().foreach((Function1)new Serializable(this, cleanupThreshTime, waitForCompletion){
                public static final long serialVersionUID = 0L;
                private final Time cleanupThreshTime$1;
                private final boolean waitForCompletion$1;

                public final void apply(WriteAheadLog x$5) {
                    x$5.clean(this.cleanupThreshTime$1.milliseconds(), this.waitForCompletion$1);
                }
                {
                    this.cleanupThreshTime$1 = cleanupThreshTime$1;
                    this.waitForCompletion$1 = waitForCompletion$1;
                }
            });
        } else {
            this.logWarning((Function0<String>)new Serializable(this){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Failed to acknowledge batch clean up in the Write Ahead Log.";
                }
            });
        }
    }

    public void stop() {
        this.writeAheadLogOption().foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final void apply(WriteAheadLog x$6) {
                x$6.close();
            }
        });
    }

    private synchronized void recoverPastEvents() {
        this.writeAheadLogOption().foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ReceivedBlockTracker $outer;

            public final void apply(WriteAheadLog writeAheadLog) {
                this.$outer.logInfo((Function0<String>)new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$recoverPastEvents$1 $outer;

                    public final String apply() {
                        return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Recovering from write ahead logs in ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$anonfun$$$outer().org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption.get()}));
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                    }
                });
                ((Iterator)JavaConverters$.MODULE$.asScalaIteratorConverter(writeAheadLog.readAll()).asScala()).foreach((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$recoverPastEvents$1 $outer;

                    public final void apply(ByteBuffer byteBuffer) {
                        ReceivedBlockTrackerLogEvent receivedBlockTrackerLogEvent;
                        block5: {
                            block3: {
                                block4: {
                                    block2: {
                                        this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$anonfun$$$outer().logInfo((Function0<String>)new Serializable(this, byteBuffer){
                                            public static final long serialVersionUID = 0L;
                                            private final ByteBuffer byteBuffer$1;

                                            public final String apply() {
                                                return new StringBuilder().append((Object)"Recovering record ").append((Object)this.byteBuffer$1).toString();
                                            }
                                            {
                                                this.byteBuffer$1 = byteBuffer$1;
                                            }
                                        });
                                        receivedBlockTrackerLogEvent = (ReceivedBlockTrackerLogEvent)Utils$.MODULE$.deserialize(JavaUtils.bufferToArray((ByteBuffer)byteBuffer), Thread.currentThread().getContextClassLoader());
                                        if (!(receivedBlockTrackerLogEvent instanceof BlockAdditionEvent)) break block2;
                                        BlockAdditionEvent blockAdditionEvent = (BlockAdditionEvent)receivedBlockTrackerLogEvent;
                                        ReceivedBlockInfo receivedBlockInfo = blockAdditionEvent.receivedBlockInfo();
                                        this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$anonfun$$$outer().org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAddedBlock$1(receivedBlockInfo);
                                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                                        break block3;
                                    }
                                    if (!(receivedBlockTrackerLogEvent instanceof BatchAllocationEvent)) break block4;
                                    BatchAllocationEvent batchAllocationEvent = (BatchAllocationEvent)receivedBlockTrackerLogEvent;
                                    Time time = batchAllocationEvent.time();
                                    AllocatedBlocks allocatedBlocks = batchAllocationEvent.allocatedBlocks();
                                    this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$anonfun$$$outer().org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAllocatedBatch$1(time, allocatedBlocks);
                                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                                    break block3;
                                }
                                if (!(receivedBlockTrackerLogEvent instanceof BatchCleanupEvent)) break block5;
                                BatchCleanupEvent batchCleanupEvent = (BatchCleanupEvent)receivedBlockTrackerLogEvent;
                                Seq<Time> batchTimes = batchCleanupEvent.times();
                                this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$anonfun$$$outer().org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$cleanupBatches$1(batchTimes);
                                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                            }
                            return;
                        }
                        throw new MatchError((Object)receivedBlockTrackerLogEvent);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                    }
                });
            }

            public /* synthetic */ ReceivedBlockTracker org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$anonfun$$$outer() {
                return this.$outer;
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public boolean writeToLog(ReceivedBlockTrackerLogEvent record) {
        boolean bl;
        if (!this.isWriteAheadLogEnabled()) return true;
        this.logTrace((Function0<String>)new Serializable(this, record){
            public static final long serialVersionUID = 0L;
            private final ReceivedBlockTrackerLogEvent record$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Writing record: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.record$1}));
            }
            {
                this.record$1 = record$1;
            }
        });
        try {
            ((WriteAheadLog)this.writeAheadLogOption().get()).write(ByteBuffer.wrap(Utils$.MODULE$.serialize((Object)record)), this.clock.getTimeMillis());
            return true;
        }
        catch (Throwable throwable) {
            boolean bl2;
            Throwable throwable2 = throwable;
            Option option = NonFatal$.MODULE$.unapply(throwable2);
            if (option.isEmpty()) {
                throw throwable;
            }
            Throwable e = (Throwable)option.get();
            this.logWarning((Function0<String>)new Serializable(this, record){
                public static final long serialVersionUID = 0L;
                private final ReceivedBlockTrackerLogEvent record$1;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Exception thrown while writing record: ", " to the WriteAheadLog."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.record$1}));
                }
                {
                    this.record$1 = record$1;
                }
            }, e);
            bl = bl2 = false;
        }
        return bl;
    }

    public Queue<ReceivedBlockInfo> org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(int streamId) {
        return (Queue)this.streamIdToUnallocatedBlockQueues().getOrElseUpdate((Object)BoxesRunTime.boxToInteger((int)streamId), (Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Queue<ReceivedBlockInfo> apply() {
                return new Queue();
            }
        });
    }

    private Option<WriteAheadLog> createWriteAheadLog() {
        return this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption.map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ReceivedBlockTracker $outer;

            public final WriteAheadLog apply(String checkpointDir) {
                String logDir = ReceivedBlockTracker$.MODULE$.checkpointDirToLogDir((String)this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption.get());
                return WriteAheadLogUtils$.MODULE$.createLogForDriver(this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$conf, logDir, this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$hadoopConf);
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
    }

    public boolean isWriteAheadLogEnabled() {
        return this.writeAheadLogOption().nonEmpty();
    }

    public final void org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAddedBlock$1(ReceivedBlockInfo receivedBlockInfo) {
        this.logTrace((Function0<String>)new Serializable(this, receivedBlockInfo){
            public static final long serialVersionUID = 0L;
            private final ReceivedBlockInfo receivedBlockInfo$2;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Recovery: Inserting added block ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.receivedBlockInfo$2}));
            }
            {
                this.receivedBlockInfo$2 = receivedBlockInfo$2;
            }
        });
        receivedBlockInfo.setBlockIdInvalid();
        this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(receivedBlockInfo.streamId()).$plus$eq((Object)receivedBlockInfo);
    }

    public final void org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAllocatedBatch$1(Time batchTime, AllocatedBlocks allocatedBlocks) {
        this.logTrace((Function0<String>)new Serializable(this, batchTime, allocatedBlocks){
            public static final long serialVersionUID = 0L;
            private final Time batchTime$2;
            private final AllocatedBlocks allocatedBlocks$1;

            public final String apply() {
                return new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Recovery: Inserting allocated batch for time ", " to "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.batchTime$2}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.allocatedBlocks$1.streamIdToAllocatedBlocks()}))).toString();
            }
            {
                this.batchTime$2 = batchTime$2;
                this.allocatedBlocks$1 = allocatedBlocks$1;
            }
        });
        allocatedBlocks.streamIdToAllocatedBlocks().foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ReceivedBlockTracker $outer;

            public final scala.collection.mutable.Seq<ReceivedBlockInfo> apply(Tuple2<Object, Seq<ReceivedBlockInfo>> x0$1) {
                Tuple2<Object, Seq<ReceivedBlockInfo>> tuple2 = x0$1;
                if (tuple2 != null) {
                    int streamId = tuple2._1$mcI$sp();
                    Seq allocatedBlocksInStream = (Seq)tuple2._2();
                    scala.collection.mutable.Seq seq = this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(streamId).dequeueAll((Function1)allocatedBlocksInStream.toSet());
                    return seq;
                }
                throw new MatchError(tuple2);
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        this.timeToAllocatedBlocks().put((Object)batchTime, (Object)allocatedBlocks);
        this.lastAllocatedBatchTime_$eq(batchTime);
    }

    public final void org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$cleanupBatches$1(Seq batchTimes) {
        this.logTrace((Function0<String>)new Serializable(this, batchTimes){
            public static final long serialVersionUID = 0L;
            private final Seq batchTimes$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Recovery: Cleaning up batches ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.batchTimes$1}));
            }
            {
                this.batchTimes$1 = batchTimes$1;
            }
        });
        this.timeToAllocatedBlocks().$minus$minus$eq((TraversableOnce)batchTimes);
    }

    public ReceivedBlockTracker(SparkConf conf, Configuration hadoopConf, Seq<Object> streamIds2, Clock clock, boolean recoverFromWriteAheadLog, Option<String> checkpointDirOption) {
        this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$conf = conf;
        this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$hadoopConf = hadoopConf;
        this.streamIds = streamIds2;
        this.clock = clock;
        this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption = checkpointDirOption;
        Logging.class.$init$((Logging)this);
        this.streamIdToUnallocatedBlockQueues = new HashMap();
        this.timeToAllocatedBlocks = new HashMap();
        this.writeAheadLogOption = this.createWriteAheadLog();
        this.lastAllocatedBatchTime = null;
        if (recoverFromWriteAheadLog) {
            this.recoverPastEvents();
        }
    }
}

