/*
 * Decompiled with CFR 0.152.
 */
package com.kfyty.loveqq.framework.web.core.request.support;

import com.kfyty.loveqq.framework.core.exception.ResolvableException;
import com.kfyty.loveqq.framework.core.utils.JsonUtil;
import com.kfyty.loveqq.framework.core.utils.NIOUtil;
import com.kfyty.loveqq.framework.web.core.exception.AsyncTimeoutException;
import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class ResponseBodyEmitter {
    public static final AsyncTimeoutException ASYNC_TIMEOUT_EXCEPTION = new AsyncTimeoutException("Async request timed out");
    public static Function<Object, ByteBuf> DEFAULT_CONVERTER = data -> {
        if (data instanceof ByteBuf) {
            return (ByteBuf)data;
        }
        if (data instanceof Boolean || data instanceof Number || data instanceof CharSequence) {
            return NIOUtil.from((CharSequence)data.toString());
        }
        if (data instanceof byte[]) {
            return NIOUtil.from((byte[])((byte[])data));
        }
        return NIOUtil.from((byte[])JsonUtil.toJSONString((Object)data).getBytes(StandardCharsets.UTF_8));
    };
    protected boolean done;
    protected boolean complete;
    protected boolean cancelled;
    protected long timeout;
    protected Throwable failure;
    protected List<Object> earlyData;
    protected Subscriber<? super ByteBuf> s;
    protected Function<Object, ByteBuf> converter;
    protected Runnable timeoutCallback;
    protected Consumer<Throwable> errorCallback;
    protected Runnable completionCallback;

    public ResponseBodyEmitter() {
        this.earlyData = new LinkedList<Object>();
    }

    public ResponseBodyEmitter(long timeout) {
        this.timeout = timeout;
        this.earlyData = new LinkedList<Object>();
    }

    public void setConverter(Function<Object, ByteBuf> converter) {
        this.converter = Objects.requireNonNull(converter);
    }

    public void send(Object data) {
        if (!this.done) {
            if (this.s == null) {
                this.earlyData.add(data);
            } else if (!this.cancelled) {
                Function<Object, ByteBuf> converter = this.converter != null ? this.converter : DEFAULT_CONVERTER;
                this.s.onNext((Object)converter.apply(data));
            }
        }
    }

    public synchronized void completeWithTimeout() {
        if (this.done || this.cancelled) {
            return;
        }
        this.complete = true;
        this.failure = ASYNC_TIMEOUT_EXCEPTION;
        if (this.s == null) {
            return;
        }
        this.done = true;
        if (this.timeoutCallback != null) {
            try {
                this.timeoutCallback.run();
            }
            catch (Throwable e) {
                this.s.onError((Throwable)new AsyncTimeoutException("Async request timed out, nested exception: " + e.getMessage(), e));
                return;
            }
        }
        this.s.onError((Throwable)new AsyncTimeoutException("Async request timed out"));
    }

    public synchronized void completeWithError(Throwable error) {
        if (this.done) {
            return;
        }
        this.complete = true;
        this.failure = error;
        if (this.s == null) {
            return;
        }
        this.done = true;
        if (this.errorCallback != null) {
            try {
                this.errorCallback.accept(error);
            }
            catch (Throwable ex) {
                this.s.onError((Throwable)new ResolvableException(error.getMessage() + ", nested exception: " + ex.getMessage(), ex));
                return;
            }
        }
        this.s.onError(error);
    }

    public synchronized void complete() {
        if (this.done) {
            return;
        }
        this.complete = true;
        if (this.s == null) {
            return;
        }
        this.done = true;
        if (this.completionCallback != null) {
            try {
                this.completionCallback.run();
            }
            catch (Throwable e) {
                this.s.onError(e);
                return;
            }
        }
        if (!this.cancelled) {
            this.s.onComplete();
        }
    }

    public ResponseBodyEmitter onTimeout(Runnable timeoutCallback) {
        this.timeoutCallback = timeoutCallback;
        return this;
    }

    public ResponseBodyEmitter onError(Consumer<Throwable> callback) {
        this.errorCallback = callback;
        return this;
    }

    public ResponseBodyEmitter onCompletion(Runnable callback) {
        this.completionCallback = callback;
        return this;
    }

    public Publisher<ByteBuf> toPublisher() {
        return new ResponseBodyEmitterPublisher();
    }

    protected synchronized void sendEarlyData() {
        if (!this.earlyData.isEmpty()) {
            try {
                for (Object data : this.earlyData) {
                    this.send(data);
                }
            }
            finally {
                this.earlyData.clear();
            }
        }
    }

    public boolean isDone() {
        return this.done;
    }

    public boolean isComplete() {
        return this.complete;
    }

    public boolean isCancelled() {
        return this.cancelled;
    }

    public long getTimeout() {
        return this.timeout;
    }

    private class ResponseBodyEmitterPublisher
    implements Publisher<ByteBuf> {
        public void subscribe(Subscriber<? super ByteBuf> s) {
            ResponseBodyEmitter.this.s = s;
            s.onSubscribe((Subscription)new ResponseBodyEmitterSubscription());
        }
    }

    private class ResponseBodyEmitterSubscription
    implements Subscription {
        public void request(long n) {
            ResponseBodyEmitter.this.sendEarlyData();
            if (ResponseBodyEmitter.this.complete) {
                if (ResponseBodyEmitter.this.failure == null) {
                    ResponseBodyEmitter.this.complete();
                } else if (ResponseBodyEmitter.this.failure == ASYNC_TIMEOUT_EXCEPTION) {
                    ResponseBodyEmitter.this.completeWithTimeout();
                } else {
                    ResponseBodyEmitter.this.completeWithError(ResponseBodyEmitter.this.failure);
                }
            }
        }

        public void cancel() {
            ResponseBodyEmitter.this.cancelled = true;
        }
    }
}

