/*
 * Decompiled with CFR 0.152.
 */
package com.lambdaworks.redis;

import com.lambdaworks.redis.BackpressureUtils;
import com.lambdaworks.redis.RedisCommandExecutionException;
import com.lambdaworks.redis.api.StatefulConnection;
import com.lambdaworks.redis.api.StatefulRedisConnection;
import com.lambdaworks.redis.internal.LettuceAssert;
import com.lambdaworks.redis.output.StreamingOutput;
import com.lambdaworks.redis.protocol.CommandWrapper;
import com.lambdaworks.redis.protocol.RedisCommand;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Operators;

class RedisPublisher<K, V, T>
implements Publisher<T> {
    private static final InternalLogger LOG = InternalLoggerFactory.getInstance(RedisPublisher.class);
    private final boolean traceEnabled = LOG.isTraceEnabled();
    private final Supplier<? extends RedisCommand<K, V, T>> commandSupplier;
    private final AtomicReference<RedisCommand<K, V, T>> ref;
    private final StatefulConnection<K, V> connection;
    private final boolean dissolve;

    public RedisPublisher(RedisCommand<K, V, T> staticCommand, StatefulConnection<K, V> connection, boolean dissolve) {
        this(() -> staticCommand, connection, dissolve);
    }

    public RedisPublisher(Supplier<RedisCommand<K, V, T>> commandSupplier, StatefulConnection<K, V> connection, boolean dissolve) {
        LettuceAssert.notNull(commandSupplier, "CommandSupplier must not be null");
        LettuceAssert.notNull(connection, "StatefulConnection must not be null");
        this.commandSupplier = commandSupplier;
        this.connection = connection;
        this.dissolve = dissolve;
        this.ref = new AtomicReference<RedisCommand<K, RedisCommand<K, V, T>, T>>(commandSupplier.get());
    }

    public void subscribe(Subscriber<? super T> subscriber) {
        RedisCommand<K, V, T> command;
        if (this.traceEnabled) {
            LOG.trace("subscribe: {}@{}", (Object)subscriber.getClass().getName(), (Object)Objects.hashCode(subscriber));
        }
        if ((command = this.ref.get()) != null) {
            if (!this.ref.compareAndSet(command, null)) {
                command = this.commandSupplier.get();
            }
        } else {
            command = this.commandSupplier.get();
        }
        RedisSubscription<T> redisSubscription = new RedisSubscription<T>(this.connection, command, this.dissolve);
        redisSubscription.subscribe(subscriber);
    }

    private static class CompositeSubscriber<T>
    implements StreamingOutput.Subscriber<T> {
        private final Collection<StreamingOutput.Subscriber<T>> subscribers;

        CompositeSubscriber(Collection<StreamingOutput.Subscriber<T>> subscribers) {
            this.subscribers = subscribers;
        }

        @Override
        public void onNext(T t) {
            this.subscribers.forEach(subscriber -> subscriber.onNext(t));
        }
    }

    private static class SubscriptionCommand<K, V, T>
    extends CommandWrapper<K, V, T> {
        private final boolean dissolve;
        private final RedisSubscription<T> subscription;
        private boolean completed = false;

        public SubscriptionCommand(RedisCommand<K, V, T> command, RedisSubscription<T> subscription, boolean dissolve) {
            super(command);
            this.subscription = subscription;
            this.dissolve = dissolve;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void complete() {
            if (this.completed) {
                return;
            }
            try {
                super.complete();
                if (this.getOutput() != null) {
                    Object result = this.getOutput().get();
                    if (this.getOutput().hasError()) {
                        this.onError(new RedisCommandExecutionException(this.getOutput().getError()));
                        this.completed = true;
                        return;
                    }
                    if (!(this.getOutput() instanceof StreamingOutput) && result != null) {
                        if (this.dissolve && result instanceof Collection) {
                            Collection collection = (Collection)result;
                            for (Object t : collection) {
                                if (t == null) continue;
                                this.subscription.onNext(t);
                            }
                        } else {
                            this.subscription.onNext(result);
                        }
                    }
                }
                this.subscription.onAllDataRead();
            }
            finally {
                this.completed = true;
            }
        }

        @Override
        public void cancel() {
            if (this.completed) {
                return;
            }
            super.cancel();
            this.completed = true;
        }

        @Override
        public boolean completeExceptionally(Throwable throwable) {
            if (this.completed) {
                return false;
            }
            boolean b = super.completeExceptionally(throwable);
            this.onError(throwable);
            this.completed = true;
            return b;
        }

        private void onError(Throwable throwable) {
            this.subscription.onError(throwable);
        }
    }

    private static enum State {
        UNSUBSCRIBED{

            @Override
            void subscribe(RedisSubscription<?> subscription, Subscriber<?> subscriber) {
                LettuceAssert.notNull(subscriber, "Subscriber must not be null");
                if (!((RedisSubscription)subscription).changeState(this, 1.NO_DEMAND)) {
                    throw new IllegalStateException(this.toString());
                }
                ((RedisSubscription)subscription).subscriber = subscriber;
                subscriber.onSubscribe(subscription);
            }
        }
        ,
        NO_DEMAND{

            @Override
            void request(RedisSubscription<?> subscription, long n) {
                if (Operators.checkRequest((long)n, (Subscriber)((RedisSubscription)subscription).subscriber)) {
                    Operators.addAndGet((AtomicLong)((RedisSubscription)subscription).demand, (long)n);
                    if (((RedisSubscription)subscription).changeState(this, 2.DEMAND)) {
                        try {
                            subscription.checkCommandDispatch();
                        }
                        catch (Exception ex) {
                            subscription.onError(ex);
                        }
                        subscription.checkOnDataAvailable();
                    }
                }
            }
        }
        ,
        DEMAND{

            @Override
            void onDataAvailable(RedisSubscription<?> subscription) {
                if (((RedisSubscription)subscription).changeState(this, 3.READING)) {
                    try {
                        boolean demandAvailable = ((RedisSubscription)subscription).readAndPublish();
                        if (demandAvailable) {
                            ((RedisSubscription)subscription).changeState(3.READING, 3.DEMAND);
                            subscription.checkOnDataAvailable();
                        } else if (((RedisSubscription)subscription).allDataRead && ((RedisSubscription)subscription).data.isEmpty()) {
                            subscription.onAllDataRead();
                        } else {
                            ((RedisSubscription)subscription).changeState(3.READING, 3.NO_DEMAND);
                        }
                    }
                    catch (IOException ex) {
                        subscription.onError(ex);
                    }
                }
            }

            @Override
            void request(RedisSubscription<?> subscription, long n) {
                if (Operators.checkRequest((long)n, (Subscriber)((RedisSubscription)subscription).subscriber)) {
                    Operators.addAndGet((AtomicLong)((RedisSubscription)subscription).demand, (long)n);
                }
            }
        }
        ,
        READING{

            @Override
            void request(RedisSubscription<?> subscription, long n) {
                if (Operators.checkRequest((long)n, (Subscriber)((RedisSubscription)subscription).subscriber)) {
                    Operators.addAndGet((AtomicLong)((RedisSubscription)subscription).demand, (long)n);
                }
            }
        }
        ,
        COMPLETED{

            @Override
            void request(RedisSubscription<?> subscription, long n) {
            }

            @Override
            void cancel(RedisSubscription<?> subscription) {
            }

            @Override
            void onAllDataRead(RedisSubscription<?> subscription) {
            }

            @Override
            void onError(RedisSubscription<?> subscription, Throwable t) {
            }
        };


        void subscribe(RedisSubscription<?> subscription, Subscriber<?> subscriber) {
            throw new IllegalStateException(this.toString());
        }

        void request(RedisSubscription<?> subscription, long n) {
            throw new IllegalStateException(this.toString());
        }

        void cancel(RedisSubscription<?> subscription) {
            ((RedisSubscription)subscription).command.cancel();
            ((RedisSubscription)subscription).changeState(this, State.COMPLETED);
        }

        void onDataAvailable(RedisSubscription<?> subscription) {
        }

        void onAllDataRead(RedisSubscription<?> subscription) {
            ((RedisSubscription)subscription).allDataRead = true;
            if (((RedisSubscription)subscription).data.isEmpty() && ((RedisSubscription)subscription).changeState(this, State.COMPLETED) && ((RedisSubscription)subscription).subscriber != null) {
                ((RedisSubscription)subscription).subscriber.onComplete();
            }
        }

        void onError(RedisSubscription<?> subscription, Throwable t) {
            if (((RedisSubscription)subscription).changeState(this, State.COMPLETED) && ((RedisSubscription)subscription).subscriber != null) {
                ((RedisSubscription)subscription).subscriber.onError(t);
            }
        }
    }

    private static class RedisSubscription<T>
    implements Subscription,
    StreamingOutput.Subscriber<T> {
        private static final InternalLogger LOG = InternalLoggerFactory.getInstance(RedisPublisher.class);
        private final boolean traceEnabled = LOG.isTraceEnabled();
        private final AtomicLong demand = new AtomicLong();
        private final Queue<T> data = new ConcurrentLinkedQueue<T>();
        private final AtomicBoolean dispatched = new AtomicBoolean();
        private volatile boolean allDataRead = false;
        private final StatefulConnection<?, ?> connection;
        private final RedisCommand<?, ?, T> command;
        private final boolean dissolve;
        private Subscriber<? super T> subscriber;
        private final AtomicReference<State> state = new AtomicReference<State>(State.UNSUBSCRIBED);

        RedisSubscription(StatefulConnection<?, ?> connection, RedisCommand<?, ?, T> command, boolean dissolve) {
            LettuceAssert.notNull(connection, "Connection must not be null");
            LettuceAssert.notNull(command, "RedisCommand must not be null");
            this.connection = connection;
            this.command = command;
            this.dissolve = dissolve;
        }

        void subscribe(Subscriber<? super T> subscriber) {
            if (subscriber == null) {
                throw new NullPointerException("Subscriber must not be null");
            }
            if (this.traceEnabled) {
                LOG.trace("{} subscribe: {}@{}", new Object[]{this.state(), subscriber.getClass().getName(), Objects.hashCode(subscriber)});
            }
            this.state().subscribe(this, subscriber);
        }

        public final void request(long n) {
            if (this.traceEnabled) {
                LOG.trace("{} request: {}", (Object)this.state(), (Object)n);
            }
            this.state().request(this, n);
        }

        public final void cancel() {
            if (this.traceEnabled) {
                LOG.trace("{} cancel", (Object)this.state());
            }
            this.state().cancel(this);
        }

        private State state() {
            return this.state.get();
        }

        @Override
        public void onNext(T t) {
            LettuceAssert.notNull(t, "Data must not be null");
            this.data.add(t);
            this.onDataAvailable();
        }

        final void onDataAvailable() {
            if (this.traceEnabled) {
                LOG.trace("{} onDataAvailable()", (Object)this.state());
            }
            this.state.get().onDataAvailable(this);
        }

        final void onAllDataRead() {
            if (this.traceEnabled) {
                LOG.trace("{} onAllDataRead()", (Object)this.state());
            }
            this.allDataRead = true;
            this.state.get().onAllDataRead(this);
        }

        final void onError(Throwable t) {
            if (LOG.isErrorEnabled()) {
                LOG.trace("{} onError(): {}", new Object[]{this.state(), t.toString(), t});
            }
            this.state.get().onError(this, t);
        }

        private boolean readAndPublish() throws IOException {
            while (this.hasDemand()) {
                T data = this.read();
                if (data != null) {
                    BackpressureUtils.getAndSub(this.demand, 1L);
                    this.subscriber.onNext(data);
                    continue;
                }
                return true;
            }
            return false;
        }

        protected T read() {
            return this.data.poll();
        }

        private boolean hasDemand() {
            return this.demand.get() > 0L;
        }

        private boolean changeState(State oldState, State newState) {
            return this.state.compareAndSet(oldState, newState);
        }

        void checkCommandDispatch() {
            if (!this.dispatched.get() && this.dispatched.compareAndSet(false, true)) {
                this.dispatchCommand();
            }
        }

        private void dispatchCommand() {
            if (this.command.getOutput() instanceof StreamingOutput) {
                StreamingOutput streamingOutput = (StreamingOutput)((Object)this.command.getOutput());
                if (this.connection instanceof StatefulRedisConnection && ((StatefulRedisConnection)this.connection).isMulti()) {
                    streamingOutput.setSubscriber(new CompositeSubscriber(Arrays.asList(this, streamingOutput.getSubscriber())));
                } else {
                    streamingOutput.setSubscriber(this);
                }
            }
            this.connection.dispatch(new SubscriptionCommand(this.command, this, this.dissolve));
        }

        void checkOnDataAvailable() {
            if (!this.data.isEmpty()) {
                this.onDataAvailable();
            }
        }
    }
}

