/*
 * Decompiled with CFR 0.152.
 */
package com.lambdaworks.redis.protocol;

import com.lambdaworks.redis.ConnectionEvents;
import com.lambdaworks.redis.internal.LettuceAssert;
import com.lambdaworks.redis.internal.LettuceFactories;
import com.lambdaworks.redis.internal.LettuceSets;
import com.lambdaworks.redis.output.CommandOutput;
import com.lambdaworks.redis.protocol.ChannelLogDescriptor;
import com.lambdaworks.redis.protocol.CommandWrapper;
import com.lambdaworks.redis.protocol.Endpoint;
import com.lambdaworks.redis.protocol.HasQueuedCommands;
import com.lambdaworks.redis.protocol.ProtocolKeyword;
import com.lambdaworks.redis.protocol.RedisCommand;
import com.lambdaworks.redis.protocol.RedisStateMachine;
import com.lambdaworks.redis.protocol.WithLatency;
import com.lambdaworks.redis.resource.ClientResources;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.local.LocalAddress;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

public class CommandHandler
extends ChannelDuplexHandler
implements HasQueuedCommands {
    /** Messages of {@link IOException}s that {@code exceptionCaught} logs at DEBUG instead of INFO. */
    static final Set<String> SUPPRESS_IO_EXCEPTION_MESSAGES = LettuceSets.unmodifiableSet("Connection reset by peer", "Broken pipe", "Connection timed out");
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(CommandHandler.class);
    /** Source of the per-instance {@code commandHandlerId}. */
    private static final AtomicLong COMMAND_HANDLER_COUNTER = new AtomicLong();
    /** Id of this handler instance; rendered in hex as {@code chid=0x..} by {@code logPrefix()}. */
    private final long commandHandlerId = COMMAND_HANDLER_COUNTER.incrementAndGet();
    /** Commands added by {@code queueCommand} and removed by {@code decode}, {@code exceptionCaught} and {@code reset}. */
    private final Queue<RedisCommand<?, ?, ?>> queue = LettuceFactories.newConcurrentQueue();
    /** State machine used by the {@code decode} methods to parse responses out of the buffer. */
    private final RedisStateMachine rsm = new RedisStateMachine();
    // Log levels are sampled once, when the handler is constructed.
    private final boolean traceEnabled = logger.isTraceEnabled();
    private final boolean debugEnabled = logger.isDebugEnabled();
    private final ClientResources clientResources;
    private final Endpoint endpoint;
    /** Channel this handler is bound to; set in {@code channelRegistered}, cleared in {@code channelUnregistered}. */
    private Channel channel;
    /** Accumulates inbound bytes; allocated in {@code channelRegistered}, released in {@code channelUnregistered}. */
    private ByteBuf buffer;
    private LifecycleState lifecycleState = LifecycleState.NOT_CONNECTED;
    /** Cached result of {@code logPrefix()}; {@code null} means it is rebuilt on the next call. */
    private String logPrefix;

    /**
     * Creates a handler that uses the given client resources and reports to the given endpoint.
     * Both arguments are checked with {@code LettuceAssert.notNull}.
     *
     * @param clientResources client resources, must not be {@code null}
     * @param endpoint endpoint that receives lifecycle and error notifications, must not be {@code null}
     */
    public CommandHandler(ClientResources clientResources, Endpoint endpoint) {
        LettuceAssert.notNull(clientResources, "ClientResources must not be null");
        this.clientResources = clientResources;

        LettuceAssert.notNull(endpoint, "RedisEndpoint must not be null");
        this.endpoint = endpoint;
    }

    /**
     * Binds this handler to the channel being registered: stores the channel, requests the
     * {@link LifecycleState#REGISTERED} state, registers the command queue with the endpoint and
     * allocates the 64 KiB direct buffer used to accumulate inbound bytes, then propagates the event.
     * <p>
     * If the handler is already closed this is only logged; {@code setState} ignores the state change
     * in that case, the remaining steps still run.
     *
     * @param ctx context of the channel being registered
     * @throws Exception declared by the overridden handler method
     */
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        if (this.isClosed()) {
            logger.debug("{} Dropping register for a closed channel", this.logPrefix());
        }

        channel = ctx.channel();
        // Any cached prefix was built from the previous value of 'channel'. Invalidate it regardless of
        // the log level, because warn-level messages use the prefix even when debug logging is off.
        logPrefix = null;

        if (debugEnabled) {
            logger.debug("{} channelRegistered()", this.logPrefix());
        }

        this.setState(LifecycleState.REGISTERED);
        endpoint.registerQueue(this);
        buffer = ctx.alloc().directBuffer(65536);
        ctx.fireChannelRegistered();
    }

    /**
     * Tears the handler down when its own channel is unregistered: clears the channel reference,
     * releases the buffer, unregisters the queue from the endpoint, cancels queued commands via
     * {@code reset()}, moves to {@link LifecycleState#CLOSED} and closes the state machine.
     * An event for a different channel is only logged. The event is propagated in both cases.
     *
     * @param ctx context of the channel being unregistered
     * @throws Exception declared by the overridden handler method
     */
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        if (debugEnabled) {
            logger.debug("{} channelUnregistered()", logPrefix());
        }

        boolean foreignChannel = channel != null && ctx.channel() != channel;
        if (foreignChannel) {
            logger.debug("{} My channel and ctx.channel mismatch. Propagating event to other listeners", logPrefix());
        } else {
            channel = null;
            buffer.release();
            endpoint.unregisterQueue(this);
            reset();
            setState(LifecycleState.CLOSED);
            rsm.close();
        }

        ctx.fireChannelUnregistered();
    }

    /**
     * Resets this handler when a {@code ConnectionEvents.Reset} event arrives; every event is then
     * handed to the superclass implementation.
     *
     * @param ctx channel context
     * @param evt the user event
     * @throws Exception declared by the overridden handler method
     */
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        boolean resetRequested = evt instanceof ConnectionEvents.Reset;
        if (resetRequested) {
            reset();
        }

        super.userEventTriggered(ctx, evt);
    }

    /**
     * Routes an exception to whoever can use it and logs it.
     * <ul>
     * <li>If a command is queued, the head of the queue is removed and completed exceptionally.</li>
     * <li>If there is no channel, the channel is not active, or the handler is not in a connected state,
     * the endpoint is notified.</li>
     * </ul>
     * The log level starts at WARN and drops to DEBUG when either of the above consumed the exception.
     * An {@link IOException} that would still be logged at WARN is logged at INFO instead, or at DEBUG
     * when its message is listed in {@code SUPPRESS_IO_EXCEPTION_MESSAGES}.
     *
     * @param ctx channel context
     * @param cause the exception that was raised
     * @throws Exception declared by the overridden handler method
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        InternalLogLevel logLevel = InternalLogLevel.WARN;

        // A single poll() replaces isEmpty()+poll(): the queue cannot empty between check and removal.
        RedisCommand<?, ?, ?> command = queue.poll();
        if (command != null) {
            if (debugEnabled) {
                logger.debug("{} Storing exception in {}", logPrefix(), command);
            }
            logLevel = InternalLogLevel.DEBUG;
            command.completeExceptionally(cause);
        }

        if (channel == null || !channel.isActive() || !isConnected()) {
            if (debugEnabled) {
                logger.debug("{} Storing exception in connectionError", logPrefix());
            }
            logLevel = InternalLogLevel.DEBUG;
            endpoint.notifyException(cause);
        }

        if (cause instanceof IOException && logLevel.ordinal() > InternalLogLevel.INFO.ordinal()) {
            logLevel = InternalLogLevel.INFO;
            if (SUPPRESS_IO_EXCEPTION_MESSAGES.contains(cause.getMessage())) {
                logLevel = InternalLogLevel.DEBUG;
            }
        }

        // Use logPrefix() rather than the raw field: the cached field may still be null here.
        logger.log(logLevel, "{} Unexpected exception during request: {}", new Object[]{logPrefix(), cause.toString(), cause});
    }

    /**
     * Marks the handler as {@link LifecycleState#CONNECTED}, notifies the endpoint about the active
     * channel, propagates the event and finally schedules a {@code ConnectionEvents.Activated} user
     * event on the channel's event loop.
     *
     * @param ctx context of the channel that became active
     * @throws Exception declared by the overridden handler method
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logPrefix = null;
        if (debugEnabled) {
            logger.debug("{} channelActive()", logPrefix());
        }

        setState(LifecycleState.CONNECTED);
        endpoint.notifyChannelActive(ctx.channel());
        super.channelActive(ctx);

        if (channel != null) {
            channel.eventLoop().submit(new Runnable(){

                @Override
                public void run() {
                    // This task runs later than the null check above. channelUnregistered() may have
                    // cleared the field in the meantime, so read it once and re-check.
                    Channel current = CommandHandler.this.channel;
                    if (current != null) {
                        current.pipeline().fireUserEventTriggered(new ConnectionEvents.Activated());
                    }
                }
            });
        }

        if (debugEnabled) {
            logger.debug("{} channelActive() done", logPrefix());
        }
    }

    /**
     * Handles deactivation of the handler's own channel: steps through the
     * DISCONNECTED, DEACTIVATING and DEACTIVATED states, notifies the endpoint that the channel is
     * inactive and that queued commands should be drained, and resets the state machine.
     * An event for a different channel is only logged. The event is passed to the superclass in both cases.
     *
     * @param ctx context of the channel that became inactive
     * @throws Exception declared by the overridden handler method
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (debugEnabled) {
            logger.debug("{} channelInactive()", logPrefix());
        }

        boolean foreignChannel = channel != null && ctx.channel() != channel;
        if (foreignChannel) {
            logger.debug("{} My channel and ctx.channel mismatch. Propagating event to other listeners.", logPrefix());
        } else {
            setState(LifecycleState.DISCONNECTED);
            setState(LifecycleState.DEACTIVATING);

            endpoint.notifyChannelInactive(ctx.channel());
            endpoint.notifyDrainQueuedCommands(this);

            setState(LifecycleState.DEACTIVATED);
            rsm.reset();

            if (debugEnabled) {
                logger.debug("{} channelInactive() done", logPrefix());
            }
        }

        super.channelInactive(ctx);
    }

    /**
     * Dispatches an outbound message: a single {@link RedisCommand} goes to {@code writeSingleCommand},
     * a {@link Collection} is treated as a batch of commands and goes to {@code writeBatch}.
     * Messages of any other type are neither written nor forwarded by this method.
     *
     * @param ctx channel context
     * @param msg the message to write
     * @param promise promise associated with the write
     * @throws Exception if queueing a command fails
     */
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (debugEnabled) {
            logger.debug("{} write(ctx, {}, promise)", logPrefix(), msg);
        }

        if (msg instanceof RedisCommand) {
            writeSingleCommand(ctx, (RedisCommand<?, ?, ?>) msg, promise);
        } else if (msg instanceof Collection) {
            // Element type cannot be verified at runtime; the batch is assumed to hold commands only.
            @SuppressWarnings("unchecked")
            Collection<RedisCommand<?, ?, ?>> batch = (Collection<RedisCommand<?, ?, ?>>) msg;
            writeBatch(ctx, batch, promise);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Consumes an inbound {@link ByteBuf}: appends its bytes to the internal buffer and decodes as many
     * queued commands as the accumulated data allows. The message is not forwarded; this method
     * releases it on every path on which it still holds a reference.
     * <p>
     * Data is ignored when the input is not readable, when the internal buffer has already been
     * released, or when the event belongs to a channel other than the one this handler is bound to.
     *
     * @param ctx channel context
     * @param msg the inbound message; cast to {@link ByteBuf} without a type check
     * @throws Exception if decoding fails
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf input = (ByteBuf)msg;

        if (!input.isReadable() || input.refCnt() == 0) {
            logger.warn("{} Input not readable {}, {}", new Object[]{logPrefix(), input.isReadable(), input.refCnt()});
            // An empty but still referenced buffer must be released here, nobody else will see it.
            if (input.refCnt() > 0) {
                input.release();
            }
            return;
        }

        if (debugEnabled) {
            logger.debug("{} Received: {} bytes, {} queued commands", new Object[]{logPrefix(), input.readableBytes(), queue.size()});
        }

        try {
            if (buffer.refCnt() < 1) {
                logger.warn("{} Ignoring received data for closed or abandoned connection", logPrefix());
                return;
            }

            // The guard must not depend on the log level; only the message does.
            if (ctx.channel() != channel) {
                if (debugEnabled) {
                    logger.debug("{} Ignoring data for a non-registered channel {}", logPrefix(), ctx.channel());
                }
                return;
            }

            if (traceEnabled) {
                logger.trace("{} Buffer: {}", logPrefix(), input.toString(Charset.defaultCharset()).trim());
            }

            buffer.writeBytes(input);
            decode(ctx, buffer);
        }
        finally {
            input.release();
        }
    }

    /**
     * Decodes responses from {@code buffer} for the commands at the head of the queue, in order.
     * Stops as soon as the state machine reports that the response for the current head command is
     * not yet complete; that command stays queued. A fully decoded command is removed from the queue
     * and completed, its latency is recorded, and consumed bytes are discarded from the buffer.
     * Exceptions thrown while completing a command are logged and do not stop decoding.
     *
     * @param ctx channel context
     * @param buffer buffer holding the accumulated response bytes
     * @throws InterruptedException declared for subclasses; not thrown by the code in this method itself
     */
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
        RedisCommand<?, ?, ?> command;

        // peek() doubles as the emptiness check, so the head can never be observed as null afterwards.
        while ((command = this.queue.peek()) != null) {
            if (this.debugEnabled) {
                logger.debug("{} Queue contains: {} commands", this.logPrefix(), Integer.valueOf(this.queue.size()));
            }

            WithLatency withLatency = this.getWithLatency(command);
            if (!this.rsm.decode(buffer, command, command.getOutput())) {
                return;
            }

            this.recordLatency(withLatency, command.getType());
            this.queue.poll();

            try {
                command.complete();
            }
            catch (Exception e) {
                // logPrefix() instead of the raw field, which may still be null.
                logger.warn("{} Unexpected exception during request: {}", new Object[]{this.logPrefix(), e.toString(), e});
            }

            // The buffer may have been released while the command completed.
            if (buffer.refCnt() != 0) {
                buffer.discardReadBytes();
            }
        }
    }

    /**
     * Returns the latency view of a command and stamps its first-response time if not yet set.
     *
     * @param command the command whose response is being decoded
     * @return the unwrapped command as {@link WithLatency}, or {@code null} when the latency collector
     *         is disabled or the unwrapped command does not implement {@link WithLatency}
     */
    private WithLatency getWithLatency(RedisCommand<?, ?, ?> command) {
        if (!clientResources.commandLatencyCollector().isEnabled()) {
            return null;
        }

        RedisCommand<?, ?, ?> unwrapped = CommandWrapper.unwrap(command);
        if (!(unwrapped instanceof WithLatency)) {
            return null;
        }

        WithLatency latencyAware = (WithLatency) unwrapped;
        // -1 is the "not yet seen" marker written by queueCommand.
        if (latencyAware.getFirstResponse() == -1L) {
            latencyAware.firstResponse(nanoTime());
        }
        return latencyAware;
    }

    /**
     * Decodes a response from {@code buffer} into {@code output} by delegating to the state machine.
     *
     * @param buffer buffer holding response bytes
     * @param output output that receives the decoded data
     * @return the result reported by the state machine
     */
    protected boolean decode(ByteBuf buffer, CommandOutput<?, ?, ?> output) {
        boolean decoded = rsm.decode(buffer, output);
        return decoded;
    }

    /**
     * Reports the latencies of a completed command to the latency collector. Nothing is recorded
     * when {@code withLatency} is {@code null}, the collector is disabled, or the channel or its
     * remote address is unavailable.
     *
     * @param withLatency latency view of the command, may be {@code null}
     * @param commandType type of the completed command
     */
    private void recordLatency(WithLatency withLatency, ProtocolKeyword commandType) {
        if (withLatency == null || !clientResources.commandLatencyCollector().isEnabled()) {
            return;
        }
        if (channel == null || remote() == null) {
            return;
        }

        long sinceFirstResponse = nanoTime() - withLatency.getFirstResponse();
        long sinceSent = nanoTime() - withLatency.getSent();
        clientResources.commandLatencyCollector().recordCommandLatency(local(), remote(), commandType, sinceFirstResponse, sinceSent);
    }

    /**
     * @return the remote address reported by the current channel; callers must ensure the channel is set
     */
    private SocketAddress remote() {
        return channel.remoteAddress();
    }

    /**
     * @return the local address reported by the current channel, or {@code LocalAddress.ANY} when the
     *         channel reports none
     */
    private SocketAddress local() {
        return channel.localAddress() != null ? channel.localAddress() : LocalAddress.ANY;
    }

    /**
     * @return {@code true} while the state lies between {@link LifecycleState#CONNECTED} (inclusive)
     *         and {@link LifecycleState#DISCONNECTED} (exclusive) in declaration order
     */
    boolean isConnected() {
        LifecycleState state = lifecycleState;
        // Enum.compareTo orders by declaration position.
        return state.compareTo(LifecycleState.CONNECTED) >= 0 && state.compareTo(LifecycleState.DISCONNECTED) < 0;
    }

    /**
     * Queues a single command and passes it on for writing. A cancelled command is neither queued
     * nor written; its promise is completed successfully so that listeners are not left waiting.
     *
     * @param ctx channel context
     * @param command the command to write
     * @param promise promise associated with the write
     * @throws Exception if queueing the command fails
     */
    private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> command, ChannelPromise promise) throws Exception {
        if (command.isCancelled()) {
            // Nothing will be written, so nothing downstream would ever complete the promise.
            promise.trySuccess();
            return;
        }

        queueCommand(command, promise);
        ctx.write(command, promise);
    }

    /**
     * Queues every non-cancelled command of a batch and passes those commands on as one write.
     * The batch is copied only when it actually contains a cancelled command. When nothing remains
     * to be written the promise is completed successfully so that listeners are not left waiting.
     *
     * @param ctx channel context
     * @param batch the commands to write
     * @param promise promise associated with the write
     * @throws Exception if queueing a command fails
     */
    private void writeBatch(ChannelHandlerContext ctx, Collection<RedisCommand<?, ?, ?>> batch, ChannelPromise promise) throws Exception {
        Collection<RedisCommand<?, ?, ?>> toWrite = batch;

        if (containsCancelled(batch)) {
            toWrite = new ArrayList<RedisCommand<?, ?, ?>>(batch.size());
            for (RedisCommand<?, ?, ?> command : batch) {
                if (!command.isCancelled()) {
                    toWrite.add(command);
                }
            }
        }

        for (RedisCommand<?, ?, ?> command : toWrite) {
            queueCommand(command, promise);
        }

        if (toWrite.isEmpty()) {
            // Nothing will be written, so nothing downstream would ever complete the promise.
            promise.trySuccess();
            return;
        }
        ctx.write(toWrite, promise);
    }

    /** Tells whether at least one command of the batch has been cancelled. */
    private static boolean containsCancelled(Collection<RedisCommand<?, ?, ?>> batch) {
        for (RedisCommand<?, ?, ?> command : batch) {
            if (command.isCancelled()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Prepares a command for writing. A command without an output is completed right away and not
     * queued. Any other command is appended to the queue and, when the latency collector is enabled
     * and the unwrapped command implements {@link WithLatency}, its first-response marker is reset
     * to -1 and its sent time is stamped.
     *
     * @param command the command about to be written
     * @param promise promise of the write; failed when queueing fails
     * @throws Exception the exception raised while queueing, after the command and the promise have
     *         been failed with it
     */
    private void queueCommand(RedisCommand<?, ?, ?> command, ChannelPromise promise) throws Exception {
        try {
            if (command.getOutput() == null) {
                command.complete();
                return;
            }

            queue.add(command);

            if (clientResources.commandLatencyCollector().isEnabled()) {
                RedisCommand<?, ?, ?> unwrapped = CommandWrapper.unwrap(command);
                if (unwrapped instanceof WithLatency) {
                    WithLatency withLatency = (WithLatency) unwrapped;
                    withLatency.firstResponse(-1L);
                    withLatency.sent(nanoTime());
                }
            }
        }
        catch (Exception e) {
            command.completeExceptionally(e);
            // tryFailure: an already completed promise must not replace 'e' with an IllegalStateException.
            promise.tryFailure(e);
            throw e;
        }
    }

    /**
     * @return the current value of {@link System#nanoTime()}
     */
    private long nanoTime() {
        long now = System.nanoTime();
        return now;
    }

    /**
     * Changes the lifecycle state. Once the handler is {@link LifecycleState#CLOSED} the call has no effect.
     *
     * @param lifecycleState the state to switch to
     */
    protected void setState(LifecycleState lifecycleState) {
        if (this.lifecycleState == LifecycleState.CLOSED) {
            return;
        }
        this.lifecycleState = lifecycleState;
    }

    /**
     * @return the current lifecycle state
     */
    protected LifecycleState getState() {
        return lifecycleState;
    }

    /**
     * @return {@code true} if the lifecycle state is {@link LifecycleState#CLOSED}
     */
    public boolean isClosed() {
        return LifecycleState.CLOSED == lifecycleState;
    }

    /**
     * Resets the state machine, empties the queue and clears the buffer. Every removed command gets
     * the error {@code "Reset"} on its output (when it has one) and is cancelled. The buffer is only
     * cleared while it still holds a reference.
     */
    private void reset() {
        rsm.reset();

        for (RedisCommand<?, ?, ?> pending = queue.poll(); pending != null; pending = queue.poll()) {
            if (pending.getOutput() != null) {
                pending.getOutput().setError("Reset");
            }
            pending.cancel();
        }

        boolean bufferAlive = buffer.refCnt() > 0;
        if (bufferAlive) {
            buffer.clear();
        }
    }

    /**
     * Returns the prefix for log messages, of the form {@code [chid=0x<hex id>, <channel descriptor>]}.
     * The value is built on first use and cached until the {@code logPrefix} field is set to {@code null}.
     *
     * @return the cached or newly built prefix
     */
    protected String logPrefix() {
        if (logPrefix == null) {
            String handlerId = Long.toHexString(commandHandlerId);
            logPrefix = "[chid=0x" + handlerId + ", " + ChannelLogDescriptor.logDescriptor(channel) + ']';
        }
        return logPrefix;
    }

    /**
     * @return the live queue of this handler (not a copy)
     */
    @Override
    public Queue<RedisCommand<?, ?, ?>> getQueue() {
        return queue;
    }

    /**
     * Lifecycle states of a {@link CommandHandler}.
     * <p>
     * Declaration order is significant: {@code isConnected()} compares ordinals and treats every state
     * from {@link #CONNECTED} up to, but excluding, {@link #DISCONNECTED} as connected. Do not reorder.
     */
    public static enum LifecycleState {
        /** Initial state of a newly created handler. */
        NOT_CONNECTED,
        /** Set by {@code channelRegistered}. */
        REGISTERED,
        /** Set by {@code channelActive}. */
        CONNECTED,
        // ACTIVATING and ACTIVE are not assigned anywhere in CommandHandler itself; both count as connected.
        ACTIVATING,
        ACTIVE,
        /** First state set by {@code channelInactive}. */
        DISCONNECTED,
        /** Set by {@code channelInactive} before the endpoint is notified. */
        DEACTIVATING,
        /** Set by {@code channelInactive} after the endpoint has been notified. */
        DEACTIVATED,
        /** Set by {@code channelUnregistered}; {@code setState} ignores all later changes. */
        CLOSED;

    }
}

