/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.net.http.textstream;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import org.noear.solon.core.util.RunUtil;
import org.noear.solon.net.http.textstream.ServerSentEvent;
import org.noear.solon.rx.SimpleSubscription;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * Helpers that expose a text {@link InputStream} to a Reactive Streams
 * {@link Subscriber}, either line by line or as parsed server-sent events.
 *
 * <p>Reading is pull-driven: nothing is read until the subscriber requests
 * items through the {@link Subscription} passed to {@code onSubscribe}, and
 * each request reads at most the requested number of items on the calling
 * thread.
 */
public class TextStreamUtil {
    /** Character buffer size used for the readers created by this class. */
    private static final int READER_BUFFER_SIZE = 1024;

    /** Prefix of the lines that carry the payload of a server-sent event. */
    private static final String DATA_PREFIX = "data:";

    /**
     * Publishes the stream line by line.
     *
     * @param inputStream source of the text
     * @param subscriber  receiver of the lines
     * @throws IOException declared for caller compatibility
     * @deprecated use {@link #parseLineStream(InputStream, Subscriber)} instead
     */
    @Deprecated
    public static void parseTextStream(InputStream inputStream, Subscriber<? super String> subscriber) throws IOException {
        TextStreamUtil.parseLineStream(inputStream, subscriber);
    }

    /**
     * Publishes every line of {@code inputStream} (decoded as UTF-8) to
     * {@code subscriber}, honouring the demand it signals.
     *
     * @param inputStream source of the text; closed on end of stream, on error
     *                    or when a cancellation is observed
     * @param subscriber  receiver of the lines
     * @throws IOException declared for caller compatibility
     */
    public static void parseLineStream(InputStream inputStream, Subscriber<? super String> subscriber) throws IOException {
        BufferedReader reader = newReader(inputStream);
        subscriber.onSubscribe((Subscription) new SimpleSubscription().onRequest((subscription, l) -> TextStreamUtil.onLineStreamRequestDo(reader, subscriber, subscription, l)));
    }

    /**
     * Serves one {@code request(l)} call of the line stream: emits up to
     * {@code l} lines, then returns and leaves the reader open for the next
     * request. The stream is completed only when the end of the input is reached.
     */
    private static void onLineStreamRequestDo(BufferedReader reader, Subscriber<? super String> subscriber, SimpleSubscription subscription, long l) {
        try {
            while (l > 0L) {
                if (subscription.isCancelled()) {
                    RunUtil.runAndTry(reader::close);
                    return;
                }
                String textLine = reader.readLine();
                if (textLine == null) {
                    // End of input is the only condition that completes the stream.
                    subscriber.onComplete();
                    // Best-effort close (RunUtil.runAndTry presumably suppresses failures, as on the
                    // cancel/error paths) so a close failure cannot raise onError after onComplete.
                    RunUtil.runAndTry(reader::close);
                    return;
                }
                subscriber.onNext(textLine);
                --l;
            }
            // Demand satisfied: keep the reader open, further lines are read on the next request.
        }
        catch (Throwable err) {
            RunUtil.runAndTry(reader::close);
            subscriber.onError(err);
        }
    }

    /**
     * Publishes the stream as server-sent events.
     *
     * @param inputStream source of the event stream
     * @param subscriber  receiver of the events
     * @throws IOException declared for caller compatibility
     * @deprecated use {@link #parseSseStream(InputStream, Subscriber)} instead
     */
    @Deprecated
    public static void parseEventStream(InputStream inputStream, Subscriber<? super ServerSentEvent> subscriber) throws IOException {
        TextStreamUtil.parseSseStream(inputStream, subscriber);
    }

    /**
     * Parses {@code inputStream} (decoded as UTF-8) as a server-sent event
     * stream and publishes each event to {@code subscriber}, honouring the
     * demand it signals.
     *
     * @param inputStream source of the event stream; closed on end of stream,
     *                    on error or when a cancellation is observed
     * @param subscriber  receiver of the events
     * @throws IOException declared for caller compatibility
     */
    public static void parseSseStream(InputStream inputStream, Subscriber<? super ServerSentEvent> subscriber) throws IOException {
        BufferedReader reader = newReader(inputStream);
        subscriber.onSubscribe((Subscription) new SimpleSubscription().onRequest((subscription, l) -> TextStreamUtil.onSseStreamRequestDo(reader, subscriber, subscription, l)));
    }

    /**
     * Serves one {@code request(l)} call of the event stream: emits up to
     * {@code l} events, then returns and leaves the reader open for the next
     * request. The stream is completed only when the end of the input is reached.
     *
     * <p>Line handling:
     * <ul>
     *   <li>an empty line emits the buffered event if any data was collected;</li>
     *   <li>a line starting with {@code data:} appends its trimmed remainder to
     *       the data buffer, joining multiple data lines with {@code \n};</li>
     *   <li>any other line with a colon after its first character is stored as
     *       a trimmed key/value pair in the event's meta map;</li>
     *   <li>all remaining lines (colon first, or no colon) are skipped.</li>
     * </ul>
     */
    private static void onSseStreamRequestDo(BufferedReader reader, Subscriber<? super ServerSentEvent> subscriber, SimpleSubscription subscription, long l) {
        try {
            // The loop is only left through exhausted demand right after an event was emitted,
            // so both buffers are always empty at that point and need not outlive this call.
            HashMap<String, String> meta = new HashMap<>();
            StringBuilder data = new StringBuilder();
            while (l > 0L) {
                if (subscription.isCancelled()) {
                    RunUtil.runAndTry(reader::close);
                    return;
                }
                String textLine = reader.readLine();
                if (textLine == null) {
                    // End of input: an event not terminated by an empty line is dropped.
                    subscriber.onComplete();
                    // Best-effort close (RunUtil.runAndTry presumably suppresses failures, as on the
                    // cancel/error paths) so a close failure cannot raise onError after onComplete.
                    RunUtil.runAndTry(reader::close);
                    return;
                }
                if (textLine.isEmpty()) {
                    if (data.length() > 0) {
                        subscriber.onNext(new ServerSentEvent(meta, data.toString()));
                        --l;
                        // Start the next event with a fresh map instead of clearing the one just
                        // handed over: whether ServerSentEvent copies it is not visible from here.
                        meta = new HashMap<>();
                        data.setLength(0);
                    }
                    continue;
                }
                if (textLine.startsWith(DATA_PREFIX)) {
                    if (data.length() > 0) {
                        data.append("\n");
                    }
                    data.append(textLine.substring(DATA_PREFIX.length()).trim());
                    continue;
                }
                int colonIdx = textLine.indexOf(':');
                if (colonIdx > 0) {
                    meta.put(textLine.substring(0, colonIdx).trim(), textLine.substring(colonIdx + 1).trim());
                }
            }
            // Demand satisfied: keep the reader open, further events are read on the next request.
        }
        catch (Throwable err) {
            RunUtil.runAndTry(reader::close);
            subscriber.onError(err);
        }
    }

    /**
     * Wraps the stream in a buffered reader that decodes UTF-8 explicitly
     * rather than relying on the platform default charset.
     */
    private static BufferedReader newReader(InputStream inputStream) {
        return new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8), READER_BUFFER_SIZE);
    }
}

