/*
 * Decompiled with CFR 0.152.
 */
package io.seata.core.rpc;

import io.netty.channel.ChannelHandlerContext;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.NetUtil;
import io.seata.core.protocol.AbstractMessage;
import io.seata.core.protocol.AbstractResultMessage;
import io.seata.core.protocol.HeartbeatMessage;
import io.seata.core.protocol.MergeResultMessage;
import io.seata.core.protocol.MergedWarpMessage;
import io.seata.core.protocol.RegisterRMRequest;
import io.seata.core.protocol.RegisterRMResponse;
import io.seata.core.protocol.RegisterTMRequest;
import io.seata.core.protocol.RegisterTMResponse;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.protocol.Version;
import io.seata.core.rpc.ChannelManager;
import io.seata.core.rpc.RpcContext;
import io.seata.core.rpc.ServerMessageListener;
import io.seata.core.rpc.ServerMessageSender;
import io.seata.core.rpc.TransactionMessageHandler;
import io.seata.core.rpc.netty.RegisterCheckAuthHandler;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultServerMessageListenerImpl
implements ServerMessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultServerMessageListenerImpl.class);
    private static BlockingQueue<String> logQueue = new LinkedBlockingQueue<String>();
    private ServerMessageSender serverMessageSender;
    private final TransactionMessageHandler transactionMessageHandler;
    private static final int MAX_LOG_SEND_THREAD = 1;
    private static final int MAX_LOG_TAKE_SIZE = 1024;
    private static final long KEEP_ALIVE_TIME = 0L;
    private static final String THREAD_PREFIX = "batchLoggerPrint";
    private static final long BUSY_SLEEP_MILLS = 5L;

    public DefaultServerMessageListenerImpl(TransactionMessageHandler transactionMessageHandler) {
        this.transactionMessageHandler = transactionMessageHandler;
    }

    @Override
    public void onTrxMessage(RpcMessage request, ChannelHandlerContext ctx) {
        Object message = request.getBody();
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("server received:{},clientIp:{},vgroup:{}", new Object[]{message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup()});
        } else {
            try {
                logQueue.put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());
            }
            catch (InterruptedException e) {
                LOGGER.error("put message to logQueue error: {}", (Object)e.getMessage(), (Object)e);
            }
        }
        if (!(message instanceof AbstractMessage)) {
            return;
        }
        if (message instanceof MergedWarpMessage) {
            AbstractResultMessage[] results = new AbstractResultMessage[((MergedWarpMessage)message).msgs.size()];
            for (int i = 0; i < results.length; ++i) {
                AbstractMessage subMessage = ((MergedWarpMessage)message).msgs.get(i);
                results[i] = this.transactionMessageHandler.onRequest(subMessage, rpcContext);
            }
            MergeResultMessage resultMessage = new MergeResultMessage();
            resultMessage.setMsgs(results);
            this.getServerMessageSender().sendResponse(request, ctx.channel(), resultMessage);
        } else if (message instanceof AbstractResultMessage) {
            this.transactionMessageHandler.onResponse((AbstractResultMessage)message, rpcContext);
        } else {
            AbstractMessage msg = (AbstractMessage)message;
            AbstractResultMessage result = this.transactionMessageHandler.onRequest(msg, rpcContext);
            this.getServerMessageSender().sendResponse(request, ctx.channel(), result);
        }
    }

    @Override
    public void onRegRmMessage(RpcMessage request, ChannelHandlerContext ctx, RegisterCheckAuthHandler checkAuthHandler) {
        RegisterRMRequest message = (RegisterRMRequest)request.getBody();
        String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        boolean isSuccess = false;
        try {
            if (null == checkAuthHandler || checkAuthHandler.regResourceManagerCheckAuth(message)) {
                ChannelManager.registerRMChannel(message, ctx.channel());
                Version.putChannelVersion(ctx.channel(), message.getVersion());
                isSuccess = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("checkAuth for client:{},vgroup:{},applicationId:{}", new Object[]{ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId()});
                }
            }
        }
        catch (Exception exx) {
            isSuccess = false;
            LOGGER.error(exx.getMessage());
        }
        this.getServerMessageSender().sendResponse(request, ctx.channel(), new RegisterRMResponse(isSuccess));
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("RM register success,message:{},channel:{}", (Object)message, (Object)ctx.channel());
        }
    }

    @Override
    public void onRegTmMessage(RpcMessage request, ChannelHandlerContext ctx, RegisterCheckAuthHandler checkAuthHandler) {
        RegisterTMRequest message = (RegisterTMRequest)request.getBody();
        String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        Version.putChannelVersion(ctx.channel(), message.getVersion());
        boolean isSuccess = false;
        try {
            if (null == checkAuthHandler || checkAuthHandler.regTransactionManagerCheckAuth(message)) {
                ChannelManager.registerTMChannel(message, ctx.channel());
                Version.putChannelVersion(ctx.channel(), message.getVersion());
                isSuccess = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("checkAuth for client:{},vgroup:{},applicationId:{}", new Object[]{ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId()});
                }
            }
        }
        catch (Exception exx) {
            isSuccess = false;
            LOGGER.error(exx.getMessage());
        }
        this.getServerMessageSender().sendResponse(request, ctx.channel(), new RegisterTMResponse(isSuccess));
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("TM register success,message:{},channel:{}", (Object)message, (Object)ctx.channel());
        }
    }

    @Override
    public void onCheckMessage(RpcMessage request, ChannelHandlerContext ctx) {
        try {
            this.getServerMessageSender().sendResponse(request, ctx.channel(), HeartbeatMessage.PONG);
        }
        catch (Throwable throwable) {
            LOGGER.error("send response error: {}", (Object)throwable.getMessage(), (Object)throwable);
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("received PING from {}", (Object)ctx.channel().remoteAddress());
        }
    }

    public void init() {
        ThreadPoolExecutor mergeSendExecutorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(THREAD_PREFIX, 1, true));
        mergeSendExecutorService.submit(new BatchLogRunnable());
    }

    public ServerMessageSender getServerMessageSender() {
        if (this.serverMessageSender == null) {
            throw new IllegalArgumentException("serverMessageSender must not be null");
        }
        return this.serverMessageSender;
    }

    public void setServerMessageSender(ServerMessageSender serverMessageSender) {
        this.serverMessageSender = serverMessageSender;
    }

    static class BatchLogRunnable
    implements Runnable {
        BatchLogRunnable() {
        }

        @Override
        public void run() {
            ArrayList logList = new ArrayList();
            while (true) {
                try {
                    while (true) {
                        logList.add(logQueue.take());
                        logQueue.drainTo(logList, 1024);
                        if (LOGGER.isInfoEnabled()) {
                            for (String str : logList) {
                                LOGGER.info(str);
                            }
                        }
                        logList.clear();
                        TimeUnit.MILLISECONDS.sleep(5L);
                    }
                }
                catch (InterruptedException exx) {
                    LOGGER.error("batch log busy sleep error:{}", (Object)exx.getMessage(), (Object)exx);
                    continue;
                }
                break;
            }
        }
    }
}

