/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.scheduler;

import java.io.Serializable;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.internal.Logging;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.scheduler.AllocatedBlocks;
import org.apache.spark.streaming.scheduler.BatchAllocationEvent;
import org.apache.spark.streaming.scheduler.BatchCleanupEvent;
import org.apache.spark.streaming.scheduler.BlockAdditionEvent;
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo;
import org.apache.spark.streaming.scheduler.ReceivedBlockTracker$;
import org.apache.spark.streaming.scheduler.ReceivedBlockTrackerLogEvent;
import org.apache.spark.streaming.util.WriteAheadLog;
import org.apache.spark.streaming.util.WriteAheadLogUtils$;
import org.apache.spark.util.Clock;
import org.apache.spark.util.Utils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.Queue;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction1;
import scala.util.control.NonFatal$;

@ScalaSignature(bytes="\u0006\u0001\u0005\u001dg!B\u0001\u0003\u0001\u0011a!\u0001\u0006*fG\u0016Lg/\u001a3CY>\u001c7\u000e\u0016:bG.,'O\u0003\u0002\u0004\t\u0005I1o\u00195fIVdWM\u001d\u0006\u0003\u000b\u0019\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005\u001dA\u0011!B:qCJ\\'BA\u0005\u000b\u0003\u0019\t\u0007/Y2iK*\t1\"A\u0002pe\u001e\u001c2\u0001A\u0007\u0014!\tq\u0011#D\u0001\u0010\u0015\u0005\u0001\u0012!B:dC2\f\u0017B\u0001\n\u0010\u0005\u0019\te.\u001f*fMB\u0011AcF\u0007\u0002+)\u0011aCB\u0001\tS:$XM\u001d8bY&\u0011\u0001$\u0006\u0002\b\u0019><w-\u001b8h\u0011!Q\u0002A!A!\u0002\u0013a\u0012\u0001B2p]\u001a\u001c\u0001\u0001\u0005\u0002\u001e=5\ta!\u0003\u0002 \r\tI1\u000b]1sW\u000e{gN\u001a\u0005\tC\u0001\u0011\t\u0011)A\u0005E\u0005Q\u0001.\u00193p_B\u001cuN\u001c4\u0011\u0005\r:S\"\u0001\u0013\u000b\u0005i)#B\u0001\u0014\t\u0003\u0019A\u0017\rZ8pa&\u0011\u0001\u0006\n\u0002\u000e\u0007>tg-[4ve\u0006$\u0018n\u001c8\t\u0011)\u0002!\u0011!Q\u0001\n-\n\u0011b\u001d;sK\u0006l\u0017\nZ:\u0011\u00071\"tG\u0004\u0002.e9\u0011a&M\u0007\u0002_)\u0011\u0001gG\u0001\u0007yI|w\u000e\u001e 
\n\u0003AI!aM\b\u0002\u000fA\f7m[1hK&\u0011QG\u000e\u0002\u0004'\u0016\f(BA\u001a\u0010!\tq\u0001(\u0003\u0002:\u001f\t\u0019\u0011J\u001c;\t\u0011m\u0002!\u0011!Q\u0001\nq\nQa\u00197pG.\u0004\"!\u0010!\u000e\u0003yR!a\u0010\u0004\u0002\tU$\u0018\u000e\\\u0005\u0003\u0003z\u0012Qa\u00117pG.D\u0001b\u0011\u0001\u0003\u0002\u0003\u0006I\u0001R\u0001\u0019e\u0016\u001cwN^3s\rJ|Wn\u0016:ji\u0016\f\u0005.Z1e\u0019><\u0007C\u0001\bF\u0013\t1uBA\u0004C_>dW-\u00198\t\u0011!\u0003!\u0011!Q\u0001\n%\u000b1c\u00195fG.\u0004x.\u001b8u\t&\u0014x\n\u001d;j_:\u00042A\u0004&M\u0013\tYuB\u0001\u0004PaRLwN\u001c\t\u0003\u001bFs!AT(\u0011\u00059z\u0011B\u0001)\u0010\u0003\u0019\u0001&/\u001a3fM&\u0011!k\u0015\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005A{\u0001\"B+\u0001\t\u00031\u0016A\u0002\u001fj]&$h\bF\u0004X3j[F,\u00180\u0011\u0005a\u0003Q\"\u0001\u0002\t\u000bi!\u0006\u0019\u0001\u000f\t\u000b\u0005\"\u0006\u0019\u0001\u0012\t\u000b)\"\u0006\u0019A\u0016\t\u000bm\"\u0006\u0019\u0001\u001f\t\u000b\r#\u0006\u0019\u0001#\t\u000b!#\u0006\u0019A%\u0006\t\u0001\u0004A!\u0019\u0002\u0013%\u0016\u001cW-\u001b<fI\ncwnY6Rk\u0016,X\rE\u0002cO&l\u0011a\u0019\u0006\u0003I\u0016\fq!\\;uC\ndWM\u0003\u0002g\u001f\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005!\u001c'!B)vKV,\u0007C\u0001-k\u0013\tY'AA\tSK\u000e,\u0017N^3e\u00052|7m[%oM>Dq!\u001c\u0001C\u0002\u0013%a.\u0001\u0011tiJ,\u0017-\\%e)>,f.\u00197m_\u000e\fG/\u001a3CY>\u001c7.U;fk\u0016\u001cX#A8\u0011\t\t\u0004xG]\u0005\u0003c\u000e\u0014q\u0001S1tQ6\u000b\u0007\u000f\u0005\u0002t?6\t\u0001\u0001\u0003\u0004v\u0001\u0001\u0006Ia\\\u0001\"gR\u0014X-Y7JIR{WK\\1mY>\u001c\u0017\r^3e\u00052|7m[)vKV,7\u000f\t\u0005\bo\u0002\u0011\r\u0011\"\u0003y\u0003U!\u0018.\\3U_\u0006cGn\\2bi\u0016$'\t\\8dWN,\u0012!\u001f\t\u0005EBTh\u0010\u0005\u0002|y6\tA!\u0003\u0002~\t\t!A+[7f!\tAv0C\u0002\u0002\u0002\t\u0011q\"\u00117m_\u000e\fG/\u001a3CY>\u001c7n\u001d\u0005\b\u0003\u000b\u0001\u0001\u0015!\u0003z\u0003Y!\u0018.\\3U_\u0006cGn\\2bi\u0016$'\t\\8dWN\u0004
\u0003\"CA\u0005\u0001\t\u0007I\u0011BA\u0006\u0003M9(/\u001b;f\u0003\",\u0017\r\u001a'pO>\u0003H/[8o+\t\ti\u0001\u0005\u0003\u000f\u0015\u0006=\u0001\u0003BA\t\u0003+i!!a\u0005\u000b\u0005}\"\u0011\u0002BA\f\u0003'\u0011Qb\u0016:ji\u0016\f\u0005.Z1e\u0019><\u0007\u0002CA\u000e\u0001\u0001\u0006I!!\u0004\u0002)]\u0014\u0018\u000e^3BQ\u0016\fG\rT8h\u001fB$\u0018n\u001c8!\u0011%\ty\u0002\u0001a\u0001\n\u0013\t\t#\u0001\fmCN$\u0018\t\u001c7pG\u0006$X\r\u001a\"bi\u000eDG+[7f+\u0005Q\b\"CA\u0013\u0001\u0001\u0007I\u0011BA\u0014\u0003ia\u0017m\u001d;BY2|7-\u0019;fI\n\u000bGo\u00195US6,w\fJ3r)\u0011\tI#a\f\u0011\u00079\tY#C\u0002\u0002.=\u0011A!\u00168ji\"I\u0011\u0011GA\u0012\u0003\u0003\u0005\rA_\u0001\u0004q\u0012\n\u0004bBA\u001b\u0001\u0001\u0006KA_\u0001\u0018Y\u0006\u001cH/\u00117m_\u000e\fG/\u001a3CCR\u001c\u0007\u000eV5nK\u0002Bq!!\u000f\u0001\t\u0003\tY$\u0001\u0005bI\u0012\u0014En\\2l)\r!\u0015Q\b\u0005\b\u0003\u007f\t9\u00041\u0001j\u0003E\u0011XmY3jm\u0016$'\t\\8dW&sgm\u001c\u0005\b\u0003\u0007\u0002A\u0011AA#\u0003U\tG\u000e\\8dCR,'\t\\8dWN$vNQ1uG\"$B!!\u000b\u0002H!9\u0011\u0011JA!\u0001\u0004Q\u0018!\u00032bi\u000eDG+[7f\u0011\u001d\ti\u0005\u0001C\u0001\u0003\u001f\n\u0001cZ3u\u00052|7m[:PM\n\u000bGo\u00195\u0015\t\u0005E\u0013\u0011\f\t\u0007\u001b\u0006Ms'a\u0016\n\u0007\u0005U3KA\u0002NCB\u00042\u0001\f\u001bj\u0011\u001d\tI%a\u0013A\u0002iDq!!\u0018\u0001\t\u0003\ty&A\rhKR\u0014En\\2lg>3')\u0019;dQ\u0006sGm\u0015;sK\u0006lGCBA,\u0003C\n\u0019\u0007C\u0004\u0002J\u0005m\u0003\u0019\u0001>\t\u000f\u0005\u0015\u00141\fa\u0001o\u0005A1\u000f\u001e:fC6LE\rC\u0004\u0002j\u0001!\t!a\u001b\u00029!\f7/\u00168bY2|7-\u0019;fIJ+7-Z5wK\u0012\u0014En\\2lgV\tA\tC\u0004\u0002p\u0001!\t!!\u001d\u0002)\u001d,G/\u00168bY2|7-\u0019;fI\ncwnY6t)\u0011\t9&a\u001d\t\u000f\u0005\u0015\u0014Q\u000ea\u0001o!9\u0011q\u000f\u0001\u0005\u0002\u0005e\u0014!E2mK\u0006tW\u000f](mI\n\u000bGo\u00195fgR1\u0011\u0011FA>\u0003\u007fBq!! 
\u0002v\u0001\u0007!0A\tdY\u0016\fg.\u001e9UQJ,7\u000f\u001b+j[\u0016Dq!!!\u0002v\u0001\u0007A)A\txC&$hi\u001c:D_6\u0004H.\u001a;j_:Dq!!\"\u0001\t\u0003\t9)\u0001\u0003ti>\u0004HCAA\u0015\u0011\u001d\tY\t\u0001C\u0005\u0003\u000f\u000b\u0011C]3d_Z,'\u000fU1ti\u00163XM\u001c;t\u0011!\ty\t\u0001C\u0001\t\u0005E\u0015AC<sSR,Gk\u001c'pOR\u0019A)a%\t\u0011\u0005U\u0015Q\u0012a\u0001\u0003/\u000baA]3d_J$\u0007c\u0001-\u0002\u001a&\u0019\u00111\u0014\u0002\u00039I+7-Z5wK\u0012\u0014En\\2l)J\f7m[3s\u0019><WI^3oi\"9\u0011q\u0014\u0001\u0005\n\u0005\u0005\u0016!F4fiJ+7-Z5wK\u0012\u0014En\\2l#V,W/\u001a\u000b\u0004e\u0006\r\u0006bBA3\u0003;\u0003\ra\u000e\u0005\b\u0003O\u0003A\u0011BAU\u0003M\u0019'/Z1uK^\u0013\u0018\u000e^3BQ\u0016\fG\rT8h)\t\ti\u0001\u0003\u0005\u0002.\u0002!\t\u0001BA6\u0003YI7o\u0016:ji\u0016\f\u0005.Z1e\u0019><WI\\1cY\u0016$w\u0001CAY\u0005!\u0005A!a-\u0002)I+7-Z5wK\u0012\u0014En\\2l)J\f7m[3s!\rA\u0016Q\u0017\u0004\b\u0003\tA\t\u0001BA\\'\r\t),\u0004\u0005\b+\u0006UF\u0011AA^)\t\t\u0019\f\u0003\u0005\u0002@\u0006UF\u0011AAa\u0003U\u0019\u0007.Z2la>Lg\u000e\u001e#jeR{Gj\\4ESJ$2\u0001TAb\u0011\u001d\t)-!0A\u00021\u000bQb\u00195fG.\u0004x.\u001b8u\t&\u0014\b")
public class ReceivedBlockTracker
implements Logging {
    // Spark configuration; handed to WriteAheadLogUtils when the driver log is created.
    private final SparkConf conf;
    // Hadoop configuration; handed to WriteAheadLogUtils when the driver log is created.
    private final Configuration hadoopConf;
    // Ids of the streams whose queues are snapshotted and cleared on each batch allocation.
    private final Seq<Object> streamIds;
    // Time source for write ahead log record timestamps and the cleanup-threshold check.
    private final Clock clock;
    // Checkpoint directory, if any; the write ahead log directory is derived from it.
    private final Option<String> checkpointDirOption;
    // Per-stream queues of received blocks that have not yet been allocated to a batch.
    private final HashMap<Object, Queue<ReceivedBlockInfo>> streamIdToUnallocatedBlockQueues;
    // Blocks allocated to each batch, keyed by batch time.
    private final HashMap<Time, AllocatedBlocks> timeToAllocatedBlocks;
    // The write ahead log; defined only when checkpointDirOption is defined.
    private final Option<WriteAheadLog> writeAheadLogOption;
    // Time of the most recently allocated batch; null until the first allocation.
    private Time lastAllocatedBatchTime;
    // Backing field for the Logging trait's logger getter/setter pair below.
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /**
     * Static forwarder to the companion object's {@code checkpointDirToLogDir}.
     *
     * @param string the checkpoint directory
     * @return whatever the companion object's method returns for that directory
     */
    public static String checkpointDirToLogDir(String string) {
        ReceivedBlockTracker$ companion = ReceivedBlockTracker$.MODULE$;
        return companion.checkpointDirToLogDir(string);
    }

    /** Forwards to the static {@code Logging.logName$} bridge with this instance as receiver. */
    public String logName() {
        return Logging.logName$(this);
    }

    /** Forwards to the static {@code Logging.log$} bridge with this instance as receiver. */
    public Logger log() {
        return Logging.log$(this);
    }

    /**
     * Forwards to the static {@code Logging.logInfo$} bridge.
     *
     * @param msg supplier of the message text
     */
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$(this, msg);
    }

    /**
     * Forwards to the static {@code Logging.logDebug$} bridge.
     *
     * @param msg supplier of the message text
     */
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$(this, msg);
    }

    /**
     * Forwards to the static {@code Logging.logTrace$} bridge.
     *
     * @param msg supplier of the message text
     */
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$(this, msg);
    }

    /**
     * Forwards to the static {@code Logging.logWarning$} bridge.
     *
     * @param msg supplier of the message text
     */
    public void logWarning(Function0<String> msg) {
        Logging.logWarning$(this, msg);
    }

    /**
     * Forwards to the static {@code Logging.logError$} bridge.
     *
     * @param msg supplier of the message text
     */
    public void logError(Function0<String> msg) {
        Logging.logError$(this, msg);
    }

    /**
     * Forwards to the three-argument static {@code Logging.logInfo$} bridge.
     *
     * @param msg       supplier of the message text
     * @param throwable the throwable passed along with the message
     */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$(this, msg, throwable);
    }

    /**
     * Forwards to the three-argument static {@code Logging.logDebug$} bridge.
     *
     * @param msg       supplier of the message text
     * @param throwable the throwable passed along with the message
     */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$(this, msg, throwable);
    }

    /**
     * Forwards to the three-argument static {@code Logging.logTrace$} bridge.
     *
     * @param msg       supplier of the message text
     * @param throwable the throwable passed along with the message
     */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$(this, msg, throwable);
    }

    /**
     * Forwards to the three-argument static {@code Logging.logWarning$} bridge.
     *
     * @param msg       supplier of the message text
     * @param throwable the throwable passed along with the message
     */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$(this, msg, throwable);
    }

    /**
     * Forwards to the three-argument static {@code Logging.logError$} bridge.
     *
     * @param msg       supplier of the message text
     * @param throwable the throwable passed along with the message
     */
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$(this, msg, throwable);
    }

    /** Forwards to the static {@code Logging.isTraceEnabled$} bridge and returns its result. */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /**
     * Forwards to the two-argument static {@code Logging.initializeLogIfNecessary$} bridge.
     *
     * @param isInterpreter passed through unchanged
     */
    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$(this, isInterpreter);
    }

    /**
     * Forwards to the three-argument static {@code Logging.initializeLogIfNecessary$} bridge.
     *
     * @param isInterpreter passed through unchanged
     * @param silent        passed through unchanged
     * @return the bridge's result
     */
    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$(this, isInterpreter, silent);
    }

    /** Forwards to the static {@code Logging.initializeLogIfNecessary$default$2$} bridge. */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    /** Getter for the logger backing field declared for the Logging trait. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return org$apache$spark$internal$Logging$$log_;
    }

    /**
     * Setter for the logger backing field declared for the Logging trait.
     *
     * @param x$1 the logger to store
     */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        org$apache$spark$internal$Logging$$log_ = x$1;
    }

    /** Accessor for the map from stream id to that stream's queue of unallocated blocks. */
    private HashMap<Object, Queue<ReceivedBlockInfo>> streamIdToUnallocatedBlockQueues() {
        return streamIdToUnallocatedBlockQueues;
    }

    /** Accessor for the map from batch time to the blocks allocated to that batch. */
    private HashMap<Time, AllocatedBlocks> timeToAllocatedBlocks() {
        return timeToAllocatedBlocks;
    }

    /** Accessor for the optional write ahead log set up by the constructor. */
    private Option<WriteAheadLog> writeAheadLogOption() {
        return writeAheadLogOption;
    }

    /** Returns the time of the most recently allocated batch, or {@code null} if there is none yet. */
    private Time lastAllocatedBatchTime() {
        return lastAllocatedBatchTime;
    }

    /**
     * Stores the time of the most recently allocated batch.
     *
     * @param x$1 the new value
     */
    private void lastAllocatedBatchTime_$eq(Time x$1) {
        lastAllocatedBatchTime = x$1;
    }

    /*
     * WARNING - void declaration
     */
    public boolean addBlock(ReceivedBlockInfo receivedBlockInfo) {
        boolean bl;
        try {
            void var3_2;
            boolean writeResult = this.writeToLog(new BlockAdditionEvent(receivedBlockInfo));
            if (writeResult) {
                ReceivedBlockTracker receivedBlockTracker = this;
                synchronized (receivedBlockTracker) {
                    Queue cfr_ignored_0 = (Queue)this.getReceivedBlockQueue(receivedBlockInfo.streamId()).$plus$eq((Object)receivedBlockInfo);
                }
                this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(17).append("Stream ").append(receivedBlockInfo.streamId()).append(" received ").append(new StringBuilder(6).append("block ").append(receivedBlockInfo.blockStoreResult().blockId()).toString()).toString());
            } else {
                this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(40).append("Failed to acknowledge stream ").append(receivedBlockInfo.streamId()).append(" receiving ").append(new StringBuilder(30).append("block ").append(receivedBlockInfo.blockStoreResult().blockId()).append(" in the Write Ahead Log.").toString()).toString());
            }
            bl = var3_2;
        }
        catch (Throwable throwable) {
            Throwable throwable2 = throwable;
            Option option = NonFatal$.MODULE$.unapply(throwable2);
            if (option.isEmpty()) {
                throw throwable;
            }
            Throwable e = (Throwable)option.get();
            this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(19).append("Error adding block ").append(receivedBlockInfo).toString(), e);
            boolean bl2 = false;
            bl = bl2;
        }
        return bl;
    }

    /**
     * Allocates every currently unallocated block to the given batch.
     *
     * <p>Nothing is allocated if a batch has already been allocated and {@code batchTime} is not
     * greater than that batch's time, or if the {@link BatchAllocationEvent} log write fails.
     * Both cases log the same informational message.
     *
     * @param batchTime time of the batch receiving the blocks
     */
    public synchronized void allocateBlocksToBatch(Time batchTime) {
        // Both skip paths log the same text, so the supplier is built once.
        Function0<String> reprocessNotice = (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(69).append("Possibly processed batch ").append(batchTime).append(" needs to be processed again in WAL recovery").toString();

        Time previous = this.lastAllocatedBatchTime();
        if (previous != null && !batchTime.$greater(previous)) {
            this.logInfo(reprocessNotice);
            return;
        }

        // Snapshot (clone) each stream's queue before attempting the log write.
        Map streamIdToBlocks = ((TraversableOnce)this.streamIds.map((Function1 & Serializable & scala.Serializable)streamId -> ReceivedBlockTracker.$anonfun$allocateBlocksToBatch$1(this, BoxesRunTime.unboxToInt((Object)streamId)), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        AllocatedBlocks allocatedBlocks = new AllocatedBlocks((Map<Object, Seq<ReceivedBlockInfo>>)streamIdToBlocks);

        boolean logged = this.writeToLog(new BatchAllocationEvent(batchTime, allocatedBlocks));
        if (!logged) {
            this.logInfo(reprocessNotice);
            return;
        }

        // The queues are emptied only after the allocation has been logged.
        this.streamIds.foreach((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)id -> this.getReceivedBlockQueue(id).clear());
        this.timeToAllocatedBlocks().put(batchTime, allocatedBlocks);
        this.lastAllocatedBatchTime_$eq(batchTime);
    }

    /**
     * Looks up the blocks allocated to a batch.
     *
     * @param batchTime the batch time to look up
     * @return the batch's stream-id-to-blocks map, or an empty map if the batch is not recorded
     */
    public synchronized Map<Object, Seq<ReceivedBlockInfo>> getBlocksOfBatch(Time batchTime) {
        Option<AllocatedBlocks> allocated = this.timeToAllocatedBlocks().get(batchTime);
        if (allocated.isDefined()) {
            return (Map)allocated.get().streamIdToAllocatedBlocks();
        }
        return (Map)Predef$.MODULE$.Map().empty();
    }

    /**
     * Looks up the blocks allocated to a batch for a single stream.
     *
     * @param batchTime the batch time to look up
     * @param streamId  the stream whose blocks are wanted
     * @return the result of {@code getBlocksOfStream(streamId)} on the batch's allocation, or an
     *         empty sequence if the batch is not recorded
     */
    public synchronized Seq<ReceivedBlockInfo> getBlocksOfBatchAndStream(Time batchTime, int streamId) {
        Option<AllocatedBlocks> allocated = this.timeToAllocatedBlocks().get(batchTime);
        if (allocated.isEmpty()) {
            return (Seq)Seq$.MODULE$.empty();
        }
        return (Seq)allocated.get().getBlocksOfStream(streamId);
    }

    /**
     * Reports whether any stream's queue of unallocated blocks is non-empty.
     *
     * @return {@code true} if at least one queue holds a block
     */
    public synchronized boolean hasUnallocatedReceivedBlocks() {
        return this.streamIdToUnallocatedBlockQueues().values().exists((Function1 & Serializable & scala.Serializable)queue -> BoxesRunTime.boxToBoolean(!((Queue)queue).isEmpty()));
    }

    /**
     * Returns the given stream's unallocated-block queue converted with {@code toSeq()}.
     *
     * @param streamId the stream to inspect
     */
    public synchronized Seq<ReceivedBlockInfo> getUnallocatedBlocks(int streamId) {
        Queue<ReceivedBlockInfo> pending = this.getReceivedBlockQueue(streamId);
        return pending.toSeq();
    }

    /**
     * Drops the allocations of all batches older than the threshold.
     *
     * <p>Requires the threshold to be earlier than the clock's current time. A
     * {@link BatchCleanupEvent} is logged first; the batches are removed and the write ahead log
     * (if present) is cleaned only when that log write succeeds.
     *
     * @param cleanupThreshTime batches strictly before this time are removed
     * @param waitForCompletion passed to the write ahead log's {@code clean} call
     */
    public synchronized void cleanupOldBatches(Time cleanupThreshTime, boolean waitForCompletion) {
        Predef$.MODULE$.require(cleanupThreshTime.milliseconds() < this.clock.getTimeMillis());

        Seq timesToCleanup = ((TraversableOnce)this.timeToAllocatedBlocks().keys().filter((Function1 & Serializable & scala.Serializable)t -> BoxesRunTime.boxToBoolean(((Time)t).$less(cleanupThreshTime)))).toSeq();
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(18).append("Deleting batches: ").append(timesToCleanup.mkString(" ")).toString());

        boolean logged = this.writeToLog(new BatchCleanupEvent((Seq<Time>)timesToCleanup));
        if (!logged) {
            this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Failed to acknowledge batch clean up in the Write Ahead Log.");
            return;
        }

        this.timeToAllocatedBlocks().$minus$minus$eq((TraversableOnce)timesToCleanup);
        Option<WriteAheadLog> wal = this.writeAheadLogOption();
        if (wal.isDefined()) {
            wal.get().clean(cleanupThreshTime.milliseconds(), waitForCompletion);
        }
    }

    /** Closes the write ahead log if one is present; otherwise does nothing. */
    public void stop() {
        Option<WriteAheadLog> wal = this.writeAheadLogOption();
        if (wal.isDefined()) {
            wal.get().close();
        }
    }

    /** Runs the log-recovery routine when a write ahead log is present; otherwise does nothing. */
    private synchronized void recoverPastEvents() {
        Option<WriteAheadLog> wal = this.writeAheadLogOption();
        if (wal.isDefined()) {
            ReceivedBlockTracker.$anonfun$recoverPastEvents$5(this, wal.get());
        }
    }

    /**
     * Writes an event to the write ahead log when the log is enabled.
     *
     * @param record the event to serialize and write
     * @return {@code true} if the log is disabled or the write completed; {@code false} if a
     *         non-fatal throwable was raised while writing (it is logged as a warning)
     */
    public boolean writeToLog(ReceivedBlockTrackerLogEvent record) {
        if (!this.isWriteAheadLogEnabled()) {
            // No log configured: treated as success.
            return true;
        }
        this.logTrace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(16).append("Writing record: ").append(record).toString());
        try {
            WriteAheadLog wal = (WriteAheadLog)this.writeAheadLogOption().get();
            wal.write(ByteBuffer.wrap(Utils$.MODULE$.serialize((Object)record)), this.clock.getTimeMillis());
            return true;
        }
        catch (Throwable throwable) {
            // Throwables that NonFatal does not match are rethrown as-is.
            Option nonFatal = NonFatal$.MODULE$.unapply(throwable);
            if (nonFatal.isEmpty()) {
                throw throwable;
            }
            Throwable e = (Throwable)nonFatal.get();
            this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(61).append("Exception thrown while writing record: ").append(record).append(" to the WriteAheadLog.").toString(), e);
            return false;
        }
    }

    /**
     * Returns the unallocated-block queue for a stream, creating and registering an empty one
     * if the stream has none yet.
     *
     * @param streamId the stream whose queue is wanted
     */
    private Queue<ReceivedBlockInfo> getReceivedBlockQueue(int streamId) {
        Object key = BoxesRunTime.boxToInteger(streamId);
        return (Queue)this.streamIdToUnallocatedBlockQueues().getOrElseUpdate(key, (Function0 & Serializable & scala.Serializable)() -> new Queue());
    }

    /**
     * Creates the driver-side write ahead log when a checkpoint directory is configured.
     *
     * @return an option holding the log created under the directory derived from the checkpoint
     *         directory, or an empty option when no checkpoint directory is set
     */
    private Option<WriteAheadLog> createWriteAheadLog() {
        // This is an instance method, so the lambda captures `this`; the decompiler's `$this`
        // name is not declared in this scope. The mapped value is the option's content, so it
        // is used directly rather than calling get() on the option again.
        return this.checkpointDirOption.map((Function1 & Serializable & scala.Serializable)checkpointDir -> {
            String logDir = ReceivedBlockTracker$.MODULE$.checkpointDirToLogDir((String)checkpointDir);
            return WriteAheadLogUtils$.MODULE$.createLogForDriver(this.conf, logDir, this.hadoopConf);
        });
    }

    /** Returns {@code true} when a write ahead log is present. */
    public boolean isWriteAheadLogEnabled() {
        return !this.writeAheadLogOption().isEmpty();
    }

    /**
     * Synthetic lambda body: pairs a stream id with a clone of that stream's unallocated queue.
     *
     * @param $this    the tracker whose queue is cloned
     * @param streamId the stream id
     * @return a tuple of the boxed stream id and the cloned queue
     */
    public static final /* synthetic */ Tuple2 $anonfun$allocateBlocksToBatch$1(ReceivedBlockTracker $this, int streamId) {
        Object boxedId = BoxesRunTime.boxToInteger(streamId);
        Object snapshot = $this.getReceivedBlockQueue(streamId).clone();
        return new Tuple2(boxedId, snapshot);
    }

    /**
     * Recovery helper: calls {@code setBlockIdInvalid()} on the block and appends it to its
     * stream's unallocated queue.
     *
     * @param receivedBlockInfo the block read back from the log
     */
    private final void insertAddedBlock$1(ReceivedBlockInfo receivedBlockInfo) {
        this.logTrace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(32).append("Recovery: Inserting added block ").append(receivedBlockInfo).toString());
        receivedBlockInfo.setBlockIdInvalid();
        Queue<ReceivedBlockInfo> pending = this.getReceivedBlockQueue(receivedBlockInfo.streamId());
        pending.$plus$eq((Object)receivedBlockInfo);
    }

    /**
     * Recovery helper: replays a batch allocation. The allocated blocks are dequeued from each
     * stream's unallocated queue, then the allocation is recorded and becomes the last allocated
     * batch.
     *
     * @param batchTime       time of the recovered batch
     * @param allocatedBlocks the blocks that were allocated to it
     */
    private final void insertAllocatedBatch$1(Time batchTime, AllocatedBlocks allocatedBlocks) {
        this.logTrace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(49).append("Recovery: Inserting allocated batch for time ").append(batchTime).append(" to ").append(String.valueOf(allocatedBlocks.streamIdToAllocatedBlocks())).toString());
        allocatedBlocks.streamIdToAllocatedBlocks().foreach((Function1 & Serializable & scala.Serializable)entry -> {
            if (entry == null) {
                throw new MatchError((Object)entry);
            }
            Tuple2 pair = (Tuple2)entry;
            int streamId = pair._1$mcI$sp();
            Seq blocksInStream = (Seq)pair._2();
            // The set of allocated blocks is used as the predicate selecting what to dequeue.
            return this.getReceivedBlockQueue(streamId).dequeueAll((Function1)blocksInStream.toSet());
        });
        this.timeToAllocatedBlocks().put(batchTime, allocatedBlocks);
        this.lastAllocatedBatchTime_$eq(batchTime);
    }

    /**
     * Recovery helper: removes the given batch times from the allocation map.
     *
     * @param batchTimes the batch times to remove
     */
    private final void cleanupBatches$1(Seq batchTimes) {
        this.logTrace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(30).append("Recovery: Cleaning up batches ").append(batchTimes).toString());
        HashMap<Time, AllocatedBlocks> allocations = this.timeToAllocatedBlocks();
        allocations.$minus$minus$eq((TraversableOnce)batchTimes);
    }

    /**
     * Synthetic lambda body: deserializes one log record and replays it on the tracker.
     *
     * <p>Block additions, batch allocations and batch cleanups are each dispatched to the
     * matching recovery helper; any other value raises a {@link MatchError}.
     *
     * @param $this      the tracker being recovered
     * @param byteBuffer the serialized record
     */
    public static final /* synthetic */ void $anonfun$recoverPastEvents$7(ReceivedBlockTracker $this, ByteBuffer byteBuffer) {
        $this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(18).append("Recovering record ").append(byteBuffer).toString());

        byte[] raw = JavaUtils.bufferToArray(byteBuffer);
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        ReceivedBlockTrackerLogEvent event = (ReceivedBlockTrackerLogEvent)Utils$.MODULE$.deserialize(raw, loader);

        if (event instanceof BlockAdditionEvent) {
            $this.insertAddedBlock$1(((BlockAdditionEvent)event).receivedBlockInfo());
            return;
        }
        if (event instanceof BatchAllocationEvent) {
            BatchAllocationEvent allocation = (BatchAllocationEvent)event;
            Time time = allocation.time();
            AllocatedBlocks allocatedBlocks = allocation.allocatedBlocks();
            $this.insertAllocatedBatch$1(time, allocatedBlocks);
            return;
        }
        if (event instanceof BatchCleanupEvent) {
            $this.cleanupBatches$1(((BatchCleanupEvent)event).times());
            return;
        }
        throw new MatchError((Object)event);
    }

    /**
     * Synthetic lambda body: reads every record from the write ahead log and replays each in
     * iteration order.
     *
     * @param $this         the tracker being recovered
     * @param writeAheadLog the log to read from
     */
    public static final /* synthetic */ void $anonfun$recoverPastEvents$5(ReceivedBlockTracker $this, WriteAheadLog writeAheadLog) {
        $this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(36).append("Recovering from write ahead logs in ").append($this.checkpointDirOption.get()).toString());
        Iterator records = (Iterator)JavaConverters$.MODULE$.asScalaIteratorConverter(writeAheadLog.readAll()).asScala();
        while (records.hasNext()) {
            ReceivedBlockTracker.$anonfun$recoverPastEvents$7($this, (ByteBuffer)records.next());
        }
    }

    /**
     * Builds a tracker with empty state, creates the write ahead log if a checkpoint directory
     * is given, and optionally replays past events from that log.
     *
     * @param conf                     Spark configuration
     * @param hadoopConf               Hadoop configuration
     * @param streamIds                ids of the streams to track
     * @param clock                    time source
     * @param recoverFromWriteAheadLog when {@code true}, past events are recovered at the end of
     *                                 construction
     * @param checkpointDirOption      optional checkpoint directory; the write ahead log exists
     *                                 only when this is defined
     */
    public ReceivedBlockTracker(SparkConf conf, Configuration hadoopConf, Seq<Object> streamIds, Clock clock, boolean recoverFromWriteAheadLog, Option<String> checkpointDirOption) {
        this.conf = conf;
        this.hadoopConf = hadoopConf;
        this.streamIds = streamIds;
        this.clock = clock;
        this.checkpointDirOption = checkpointDirOption;
        Logging.$init$(this);
        this.streamIdToUnallocatedBlockQueues = new HashMap<>();
        this.timeToAllocatedBlocks = new HashMap<>();
        // Reads conf, hadoopConf and checkpointDirOption, so it must follow their assignment.
        this.writeAheadLogOption = this.createWriteAheadLog();
        this.lastAllocatedBatchTime = null;
        if (recoverFromWriteAheadLog) {
            this.recoverPastEvents();
        }
    }
}

