/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.dstream;

import java.io.InputStream;
import java.io.Serializable;
import java.net.ConnectException;
import java.net.Socket;
import org.apache.spark.internal.Logging;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.dstream.SocketReceiver$;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.collection.Iterator;
import scala.reflect.ClassTag;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.control.NonFatal$;

@ScalaSignature(bytes="\u0006\u0001\u0005\u001dc!B\u0001\u0003\u0001\u0011a!AD*pG.,GOU3dK&4XM\u001d\u0006\u0003\u0007\u0011\tq\u0001Z:ue\u0016\fWN\u0003\u0002\u0006\r\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003\u000f!\tQa\u001d9be.T!!\u0003\u0006\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005Y\u0011aA8sOV\u0011QBF\n\u0004\u00019\u0019\u0003cA\b\u0013)5\t\u0001C\u0003\u0002\u0012\t\u0005A!/Z2fSZ,'/\u0003\u0002\u0014!\tA!+Z2fSZ,'\u000f\u0005\u0002\u0016-1\u0001A!B\f\u0001\u0005\u0004I\"!\u0001+\u0004\u0001E\u0011!\u0004\t\t\u00037yi\u0011\u0001\b\u0006\u0002;\u0005)1oY1mC&\u0011q\u0004\b\u0002\b\u001d>$\b.\u001b8h!\tY\u0012%\u0003\u0002#9\t\u0019\u0011I\\=\u0011\u0005\u0011:S\"A\u0013\u000b\u0005\u00192\u0011\u0001C5oi\u0016\u0014h.\u00197\n\u0005!*#a\u0002'pO\u001eLgn\u001a\u0005\tU\u0001\u0011\t\u0011)A\u0005W\u0005!\u0001n\\:u!\ta3G\u0004\u0002.cA\u0011a\u0006H\u0007\u0002_)\u0011\u0001\u0007G\u0001\u0007yI|w\u000e\u001e \n\u0005Ib\u0012A\u0002)sK\u0012,g-\u0003\u00025k\t11\u000b\u001e:j]\u001eT!A\r\u000f\t\u0011]\u0002!\u0011!Q\u0001\na\nA\u0001]8siB\u00111$O\u0005\u0003uq\u00111!\u00138u\u0011!a\u0004A!A!\u0002\u0013i\u0014A\u00042zi\u0016\u001cHk\\(cU\u0016\u001cGo\u001d\t\u00057y\u0002\u0005*\u0003\u0002@9\tIa)\u001e8di&|g.\r\t\u0003\u0003\u001ak\u0011A\u0011\u0006\u0003\u0007\u0012\u000b!![8\u000b\u0003\u0015\u000bAA[1wC&\u0011qI\u0011\u0002\f\u0013:\u0004X\u000f^*ue\u0016\fW\u000eE\u0002J\u001dRq!A\u0013'\u000f\u00059Z\u0015\"A\u000f\n\u00055c\u0012a\u00029bG.\fw-Z\u0005\u0003\u001fB\u0013\u0001\"\u0013;fe\u0006$xN\u001d\u0006\u0003\u001brA\u0011B\u0015\u0001\u0003\u0002\u0003\u0006IaU-\u0002\u0019M$xN]1hK2+g/\u001a7\u0011\u0005Q;V\"A+\u000b\u0005Y3\u0011aB:u_J\fw-Z\u0005\u00031V\u0013Ab\u0015;pe\u0006<W\rT3wK2L!A\u0015\n\t\u0011m\u0003!1!Q\u0001\fq\u000b!\"\u001a<jI\u0016t7-\u001a\u00133!\ri\u0006\rF\u0007\u0002=*\u0011q\fH\u0001\be\u00164G.Z2u\u0013\t\tgL\u0001\u0005DY\u0006\u001c8\u000fV1h\u0011\u0015\u0019\u0007\u0001\"\u0001e\u0003\u0019a\u0014N\\5u}Q)Q-\u001b6lYR\u0011a\r\u001b\t\u0004O\u0002!R\"\u0001\u0002\t\u000bm\u0013\u00079\u0001/\t\u000b)\u0012\u0007\u0019A\u0016\t\u000b]\u0012\u0007\u0019\u0001\u001d\t\u000bq\u0012\u0007\u0019A\u001f\t\u000bI\u0013\u0007\u0019A*\t\u00139\u0004\u0001\u0019!a\u0001\n\u0013y\u0017AB:pG.,G/F\u0001q!\t\tH/D\u0001s\u0015\t\u0019H)A\u0002oKRL!!\u001e:\u0003\rM{7m[3u\u0011%9\b\u00011AA\u0002\u0013%\u00010\u0001\u0006t_\u000e\\W\r^0%KF$\"!\u001f?\u0011\u0005mQ\u0018BA>\u001d\u0005\u0011)f.\u001b;\t\u000fu4\u0018\u0011!a\u0001a\u0006\u0019\u0001\u0010J\u0019\t\u0013}\u0004\u0001\u0019!A!B\u0013\u0001\u0018aB:pG.,G\u000f\t\u0005\b\u0003\u0007\u0001A\u0011AA\u0003\u0003\u001dygn\u0015;beR$\u0012!\u001f\u0005\b\u0003\u0013\u0001A\u0011AA\u0003\u0003\u0019ygn\u0015;pa\"9\u0011Q\u0002\u0001\u0005\u0002\u0005\u0015\u0011a\u0002:fG\u0016Lg/Z\u0004\t\u0003#\u0011\u0001\u0012\u0001\u0003\u0002\u0014\u0005q1k\\2lKR\u0014VmY3jm\u0016\u0014\bcA4\u0002\u0016\u00199\u0011A\u0001E\u0001\t\u0005]1CBA\u000b\u00033\ty\u0002E\u0002\u001c\u00037I1!!\b\u001d\u0005\u0019\te.\u001f*fMB\u00191$!\t\n\u0007\u0005\rBD\u0001\u0007TKJL\u0017\r\\5{C\ndW\rC\u0004d\u0003+!\t!a\n\u0015\u0005\u0005M\u0001\u0002CA\u0016\u0003+!\t!!\f\u0002\u0019\tLH/Z:U_2Kg.Z:\u0015\t\u0005=\u0012\u0011\u0007\t\u0004\u0013:[\u0003bBA\u001a\u0003S\u0001\r\u0001Q\u0001\fS:\u0004X\u000f^*ue\u0016\fW\u000e\u0003\u0006\u00028\u0005U\u0011\u0011!C\u0005\u0003s\t1B]3bIJ+7o\u001c7wKR\u0011\u00111\b\t\u0005\u0003{\t\u0019%\u0004\u0002\u0002@)\u0019\u0011\u0011\t#\u0002\t1\fgnZ\u0005\u0005\u0003\u000b\nyD\u0001\u0004PE*,7\r\u001e")
public class SocketReceiver<T>
extends Receiver<T>
implements Logging {
    private final String host;
    private final int port;
    private final Function1<InputStream, Iterator<T>> bytesToObjects;
    private Socket socket;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public static Iterator<String> bytesToLines(InputStream inputStream) {
        return SocketReceiver$.MODULE$.bytesToLines(inputStream);
    }

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    private Socket socket() {
        return this.socket;
    }

    private void socket_$eq(Socket x$1) {
        this.socket = x$1;
    }

    @Override
    public void onStart() {
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(15).append("Connecting to ").append($this.host).append(":").append($this.port).toString());
        try {
            this.socket_$eq(new Socket(this.host, this.port));
        }
        catch (ConnectException e) {
            this.restart(new StringBuilder(21).append("Error connecting to ").append(this.host).append(":").append(this.port).toString(), e);
            return;
        }
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(14).append("Connected to ").append($this.host).append(":").append($this.port).toString());
        new Thread(this){
            private final /* synthetic */ SocketReceiver $outer;

            public void run() {
                this.$outer.receive();
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                super("Socket Receiver");
                this.setDaemon(true);
            }
        }.start();
    }

    @Override
    public synchronized void onStop() {
        block0: {
            if (this.socket() == null) break block0;
            this.socket().close();
            this.socket_$eq(null);
            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(18).append("Closed socket to ").append($this.host).append(":").append($this.port).toString());
        }
    }

    public void receive() {
        block9: {
            try {
                try {
                    Iterator iterator = (Iterator)this.bytesToObjects.apply((Object)this.socket().getInputStream());
                    while (!this.isStopped() && iterator.hasNext()) {
                        this.store(iterator.next());
                    }
                    if (!this.isStopped()) {
                        this.restart("Socket data stream had no more data");
                    } else {
                        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Stopped receiving");
                    }
                }
                catch (Throwable throwable) {
                    Throwable throwable2 = throwable;
                    Option option = NonFatal$.MODULE$.unapply(throwable2);
                    if (!option.isEmpty()) {
                        Throwable e = (Throwable)option.get();
                        this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Error receiving data", e);
                        this.restart("Error receiving data", e);
                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                        break block9;
                    }
                    throw throwable;
                }
            }
            finally {
                this.onStop();
            }
        }
    }

    public SocketReceiver(String host, int port, Function1<InputStream, Iterator<T>> bytesToObjects, StorageLevel storageLevel, ClassTag<T> evidence$2) {
        this.host = host;
        this.port = port;
        this.bytesToObjects = bytesToObjects;
        super(storageLevel);
        Logging.$init$((Logging)this);
    }
}

