/*
 * Decompiled with CFR 0.152.
 */
package io.seata.core.rpc.processor.server;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.CollectionUtils;
import io.seata.common.util.NetUtil;
import io.seata.common.util.StringUtils;
import io.seata.config.ConfigurationFactory;
import io.seata.core.protocol.AbstractMessage;
import io.seata.core.protocol.AbstractResultMessage;
import io.seata.core.protocol.BatchResultMessage;
import io.seata.core.protocol.MergeResultMessage;
import io.seata.core.protocol.MergedWarpMessage;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.protocol.Version;
import io.seata.core.rpc.Disposable;
import io.seata.core.rpc.RemotingServer;
import io.seata.core.rpc.RpcContext;
import io.seata.core.rpc.TransactionMessageHandler;
import io.seata.core.rpc.netty.ChannelManager;
import io.seata.core.rpc.netty.NettyServerConfig;
import io.seata.core.rpc.processor.RemotingProcessor;
import io.seata.core.rpc.processor.server.BatchLogHandler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServerOnRequestProcessor
implements RemotingProcessor,
Disposable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServerOnRequestProcessor.class);

    /** Value 1; presumably the wait timeout (ms) of the batch response loop - confirm against its usage. */
    private static final int MAX_BATCH_RESPONSE_MILLS = 1;

    /** Value 1; presumably the size of the batch response thread pool - confirm against its usage. */
    private static final int MAX_BATCH_RESPONSE_THREAD = 1;

    /** Value {@link Integer#MAX_VALUE}; presumably the keep-alive time of the batch response pool - confirm. */
    private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;

    /** Name prefix for the batch response thread(s). */
    private static final String BATCH_RESPONSE_THREAD_PREFIX = "rpcBatchResponse";

    /**
     * Whether sub-messages of a merged request are handled asynchronously; read once from the
     * {@code server.enableParallelRequestHandle} configuration item (default {@code true}).
     */
    private static final boolean PARALLEL_REQUEST_HANDLE = ConfigurationFactory.getInstance().getBoolean("server.enableParallelRequestHandle", true);

    /** Server used to send responses back to the client channel. */
    private final RemotingServer remotingServer;

    /** Handler that turns each request message into its result message. */
    private final TransactionMessageHandler transactionMessageHandler;

    /** Executor running the batch responder; stays {@code null} when batch responses are disabled. */
    private ExecutorService batchResponseExecutorService;

    /** Per-channel queues of results waiting to be sent by the batch responder. */
    private final ConcurrentMap<Channel, BlockingQueue<QueueItem>> basketMap = new ConcurrentHashMap<>();

    /** Monitor the batch responder waits on and request handlers notify. */
    protected final Object batchResponseLock = new Object();

    /** {@code true} while the batch responder is draining the queues. */
    private volatile boolean isResponding;

    /**
     * Creates the processor. When {@link NettyServerConfig#isEnableTcServerBatchSendResponse()} is
     * {@code true}, a single-threaded executor is created and the batch responder is submitted to it.
     *
     * @param remotingServer            server used to send responses
     * @param transactionMessageHandler handler invoked for every request message
     */
    public ServerOnRequestProcessor(RemotingServer remotingServer, TransactionMessageHandler transactionMessageHandler) {
        this.remotingServer = remotingServer;
        this.transactionMessageHandler = transactionMessageHandler;
        if (!NettyServerConfig.isEnableTcServerBatchSendResponse()) {
            // Batch responses disabled: no background thread is needed.
            return;
        }
        ExecutorService executor = new ThreadPoolExecutor(
            MAX_BATCH_RESPONSE_THREAD,
            MAX_BATCH_RESPONSE_THREAD,
            KEEP_ALIVE_TIME,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new NamedThreadFactory(BATCH_RESPONSE_THREAD_PREFIX, MAX_BATCH_RESPONSE_THREAD));
        this.batchResponseExecutorService = executor;
        executor.submit(new BatchResponseRunnable());
    }

    /**
     * Handles an incoming request. Messages from a registered channel are dispatched to
     * {@code onRequestMessage}; for any other channel the connection is disconnected and closed.
     *
     * @param ctx        channel handler context the message arrived on
     * @param rpcMessage the received RPC message
     * @throws Exception if handling the request fails
     */
    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (ChannelManager.isRegistered(ctx.channel())) {
            this.onRequestMessage(ctx, rpcMessage);
            return;
        }
        try {
            LOGGER.info("closeChannelHandlerContext channel:{}", ctx.channel());
            ctx.disconnect();
            ctx.close();
        } catch (Exception exx) {
            // Pass the exception itself so the stack trace is logged, not only the message.
            LOGGER.error(exx.getMessage(), exx);
        }
        LOGGER.info("close a unhandled connection! [{}]", ctx.channel());
    }

    /**
     * Shuts down the batch response executor, if one was created.
     */
    @Override
    public void destroy() {
        ExecutorService executor = this.batchResponseExecutorService;
        if (executor == null) {
            return;
        }
        executor.shutdown();
    }

    /**
     * Dispatches a request received on a registered channel.
     * <ul>
     *   <li>Bodies that are not an {@link AbstractMessage} are logged and dropped.</li>
     *   <li>A single (non-merged) message is handled and answered immediately.</li>
     *   <li>A {@link MergedWarpMessage} is handled per sub-message. When batch responses are enabled and
     *       {@link Version#isAboveOrEqualVersion150(String)} accepts the client version, each result is
     *       queued for the batch responder; otherwise all results are collected and sent as one
     *       {@link MergeResultMessage}.</li>
     * </ul>
     * When {@code PARALLEL_REQUEST_HANDLE} is set, sub-messages are handled via
     * {@link CompletableFuture} without an explicit executor.
     *
     * @param ctx        channel handler context the message arrived on
     * @param rpcMessage the received RPC message
     */
    private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        Object message = rpcMessage.getBody();
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
        if (!(message instanceof AbstractMessage)) {
            LOGGER.error("unrecognized message:{}", message);
            return;
        }

        if (!(message instanceof MergedWarpMessage)) {
            // Single message: handle and respond right away.
            AbstractMessage msg = (AbstractMessage)message;
            if (LOGGER.isInfoEnabled()) {
                String receiveMsgLog = String.format("receive msg[single]: %s, clientIp: %s, vgroup: %s", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
                BatchLogHandler.INSTANCE.writeLog(receiveMsgLog);
            }
            AbstractResultMessage result = this.transactionMessageHandler.onRequest(msg, rpcContext);
            this.remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
            if (LOGGER.isInfoEnabled()) {
                String resultMsgLog = String.format("result msg[single]: %s, clientIp: %s, vgroup: %s", result, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
                BatchLogHandler.INSTANCE.writeLog(resultMsgLog);
            }
            return;
        }

        MergedWarpMessage mergedMessage = (MergedWarpMessage)message;
        List<AbstractMessage> msgs = mergedMessage.msgs;

        if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion()) && Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {
            // Batch-response path: each result is queued individually together with its message id.
            List<Integer> msgIds = mergedMessage.msgIds;
            for (int i = 0; i < msgs.size(); ++i) {
                AbstractMessage subMessage = msgs.get(i);
                int msgId = msgIds.get(i);
                if (PARALLEL_REQUEST_HANDLE) {
                    CompletableFuture.runAsync(() -> this.handleRequestsByMergedWarpMessageBy150(subMessage, msgId, rpcMessage, ctx, rpcContext));
                } else {
                    this.handleRequestsByMergedWarpMessageBy150(subMessage, msgId, rpcMessage, ctx, rpcContext);
                }
            }
            return;
        }

        // Merged-response path: collect every result, then answer with a single MergeResultMessage.
        List<AbstractResultMessage> results = new ArrayList<>();
        if (PARALLEL_REQUEST_HANDLE) {
            List<CompletableFuture<AbstractResultMessage>> completableFutures = new ArrayList<>();
            for (AbstractMessage subMessage : msgs) {
                completableFutures.add(CompletableFuture.supplyAsync(() -> this.handleRequestsByMergedWarpMessage(subMessage, rpcContext)));
            }
            try {
                // Futures are joined in submission order so results keep the request order.
                for (CompletableFuture<AbstractResultMessage> completableFuture : completableFutures) {
                    results.add(completableFuture.get());
                }
            } catch (InterruptedException | ExecutionException e) {
                if (e instanceof InterruptedException) {
                    // Restore the flag so the owning thread pool can observe the interruption.
                    Thread.currentThread().interrupt();
                }
                LOGGER.error("handle request error: {}", e.getMessage(), e);
            }
        } else {
            for (AbstractMessage subMessage : msgs) {
                results.add(this.handleRequestsByMergedWarpMessage(subMessage, rpcContext));
            }
        }
        MergeResultMessage resultMessage = new MergeResultMessage();
        resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));
        this.remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
    }

    /**
     * Wakes every thread waiting on {@code batchResponseLock}, unless the batch responder is
     * currently flagged as responding.
     */
    private void notifyBatchRespondingThread() {
        if (this.isResponding) {
            return;
        }
        synchronized (this.batchResponseLock) {
            this.batchResponseLock.notifyAll();
        }
    }

    /**
     * Returns the pending-result queue of the given channel, creating and registering a new
     * {@link LinkedBlockingQueue} in {@code basketMap} when the channel has none yet.
     *
     * @param channel channel whose queue is requested
     * @return the queue associated with {@code channel}
     */
    private BlockingQueue<QueueItem> computeIfAbsentMsgQueue(Channel channel) {
        BlockingQueue<QueueItem> queue =
            CollectionUtils.computeIfAbsent(this.basketMap, channel, key -> new LinkedBlockingQueue<>());
        return queue;
    }

    /**
     * Wraps the result in a {@code QueueItem} and offers it to the queue; a rejected offer is
     * logged as an error.
     *
     * @param msgQueue      queue of the target channel
     * @param rpcMessage    request the result belongs to
     * @param resultMessage result to be sent
     * @param msgId         id of the sub-message inside the merged request
     * @param channel       target channel, used for logging only
     */
    private void offerMsg(BlockingQueue<QueueItem> msgQueue, RpcMessage rpcMessage, AbstractResultMessage resultMessage, int msgId, Channel channel) {
        QueueItem pending = new QueueItem(resultMessage, msgId, rpcMessage);
        boolean accepted = msgQueue.offer(pending);
        if (accepted) {
            return;
        }
        LOGGER.error("put message into basketMap offer failed, channel:{},rpcMessage:{},resultMessage:{}", channel, rpcMessage, resultMessage);
    }

    /**
     * Handles one sub-message of a merged request and returns its result, writing a receive and a
     * result entry to the batch log when INFO logging is enabled.
     *
     * @param subMessage sub-message to handle
     * @param rpcContext context of the sending client
     * @return the result produced by the transaction message handler
     */
    private AbstractResultMessage handleRequestsByMergedWarpMessage(AbstractMessage subMessage, RpcContext rpcContext) {
        if (LOGGER.isInfoEnabled()) {
            BatchLogHandler.INSTANCE.writeLog(String.format(
                "receive msg[merged]: %s, clientIp: %s, vgroup: %s",
                subMessage,
                NetUtil.toIpAddress(rpcContext.getChannel().remoteAddress()),
                rpcContext.getTransactionServiceGroup()));
        }

        AbstractResultMessage response = this.transactionMessageHandler.onRequest(subMessage, rpcContext);

        if (LOGGER.isInfoEnabled()) {
            BatchLogHandler.INSTANCE.writeLog(String.format(
                "result msg[merged]: %s, clientIp: %s, vgroup: %s",
                response,
                NetUtil.toIpAddress(rpcContext.getChannel().remoteAddress()),
                rpcContext.getTransactionServiceGroup()));
        }
        return response;
    }

    /**
     * Handles one sub-message of a merged request on the batch-response path: the result is put on
     * the channel's pending queue and the batch responder is notified. A receive and a result entry
     * are written to the batch log when INFO logging is enabled.
     *
     * @param msg        sub-message to handle
     * @param msgId      id of the sub-message inside the merged request
     * @param rpcMessage the merged RPC request the sub-message came from
     * @param ctx        channel handler context of the client
     * @param rpcContext context of the sending client
     */
    private void handleRequestsByMergedWarpMessageBy150(AbstractMessage msg, int msgId, RpcMessage rpcMessage, ChannelHandlerContext ctx, RpcContext rpcContext) {
        if (LOGGER.isInfoEnabled()) {
            BatchLogHandler.INSTANCE.writeLog(String.format(
                "receive msg[merged]: %s, clientIp: %s, vgroup: %s",
                msg,
                NetUtil.toIpAddress(ctx.channel().remoteAddress()),
                rpcContext.getTransactionServiceGroup()));
        }

        AbstractResultMessage response = this.transactionMessageHandler.onRequest(msg, rpcContext);

        // Queue first, then wake the responder so it finds the new item.
        BlockingQueue<QueueItem> pending = this.computeIfAbsentMsgQueue(ctx.channel());
        this.offerMsg(pending, rpcMessage, response, msgId, ctx.channel());
        this.notifyBatchRespondingThread();

        if (LOGGER.isInfoEnabled()) {
            BatchLogHandler.INSTANCE.writeLog(String.format(
                "result msg[merged]: %s, clientIp: %s, vgroup: %s",
                response,
                NetUtil.toIpAddress(ctx.channel().remoteAddress()),
                rpcContext.getTransactionServiceGroup()));
        }
    }

    /**
     * Builds a new {@link RpcMessage} carrying the id, codec, compressor and head map recorded in
     * the given request info.
     *
     * @param clientRequestRpcInfo attributes captured from the client request
     * @return a new message with those attributes set and no other fields touched
     */
    private RpcMessage buildRpcMessage(ClientRequestRpcInfo clientRequestRpcInfo) {
        final RpcMessage response = new RpcMessage();
        response.setId(clientRequestRpcInfo.getRpcMessageId());
        response.setCodec(clientRequestRpcInfo.getCodec());
        response.setCompressor(clientRequestRpcInfo.getCompressor());
        response.setHeadMap(clientRequestRpcInfo.getHeadMap());
        return response;
    }

    /**
     * One pending response: the result message, the id of the sub-message it answers, and the RPC
     * request it originated from.
     */
    private static class QueueItem {
        /** Result to send back. */
        private AbstractResultMessage resultMessage;
        /** Id of the answered sub-message. */
        private Integer msgId;
        /** Request the result belongs to. */
        private RpcMessage rpcMessage;

        public QueueItem(AbstractResultMessage resultMessage, int msgId, RpcMessage rpcMessage) {
            this.rpcMessage = rpcMessage;
            this.msgId = Integer.valueOf(msgId);
            this.resultMessage = resultMessage;
        }

        public AbstractResultMessage getResultMessage() {
            return resultMessage;
        }

        public void setResultMessage(AbstractResultMessage resultMessage) {
            this.resultMessage = resultMessage;
        }

        public Integer getMsgId() {
            return msgId;
        }

        public void setMsgId(Integer msgId) {
            this.msgId = msgId;
        }

        public RpcMessage getRpcMessage() {
            return rpcMessage;
        }

        public void setRpcMessage(RpcMessage rpcMessage) {
            this.rpcMessage = rpcMessage;
        }
    }

    /**
     * Snapshot of the request attributes (message id, codec, compressor, head map) copied from an
     * {@link RpcMessage}. Instances are used as map keys to group results by originating request,
     * so {@link #equals(Object)} and {@link #hashCode()} cover all four attributes.
     *
     * <p>Caution: the class is mutable; changing an instance (or its head map) while it is used as
     * a map key breaks lookups.
     */
    private static class ClientRequestRpcInfo {
        /** Id of the client request. */
        private int rpcMessageId;
        /** Codec byte of the client request. */
        private byte codec;
        /** Compressor byte of the client request. */
        private byte compressor;
        /** Head map of the client request; may be {@code null} if the request carried none. */
        private Map<String, String> headMap;

        /**
         * Copies id, codec, compressor and head map (by reference) from the given request.
         *
         * @param rpcMessage request to capture the attributes from
         */
        public ClientRequestRpcInfo(RpcMessage rpcMessage) {
            this.rpcMessageId = rpcMessage.getId();
            this.codec = rpcMessage.getCodec();
            this.compressor = rpcMessage.getCompressor();
            this.headMap = rpcMessage.getHeadMap();
        }

        public int getRpcMessageId() {
            return this.rpcMessageId;
        }

        public void setRpcMessageId(int rpcMessageId) {
            this.rpcMessageId = rpcMessageId;
        }

        public byte getCodec() {
            return this.codec;
        }

        public void setCodec(byte codec) {
            this.codec = codec;
        }

        public byte getCompressor() {
            return this.compressor;
        }

        public void setCompressor(byte compressor) {
            this.compressor = compressor;
        }

        public Map<String, String> getHeadMap() {
            return this.headMap;
        }

        public void setHeadMap(Map<String, String> headMap) {
            this.headMap = headMap;
        }

        /**
         * Two infos are equal when id, codec, compressor and head map are all equal.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ClientRequestRpcInfo that = (ClientRequestRpcInfo)o;
            // Objects.equals keeps this null-safe, matching hashCode(), which already accepts a null head map.
            return this.rpcMessageId == that.rpcMessageId
                && this.codec == that.codec
                && this.compressor == that.compressor
                && Objects.equals(this.headMap, that.headMap);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.rpcMessageId, this.codec, this.compressor, this.headMap);
        }
    }

    private class BatchResponseRunnable
    implements Runnable {
        private BatchResponseRunnable() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (true) {
                Object object = ServerOnRequestProcessor.this.batchResponseLock;
                synchronized (object) {
                    try {
                        ServerOnRequestProcessor.this.batchResponseLock.wait(1L);
                    }
                    catch (InterruptedException e) {
                        LOGGER.error("BatchResponseRunnable Interrupted error", (Throwable)e);
                    }
                }
                ServerOnRequestProcessor.this.isResponding = true;
                ServerOnRequestProcessor.this.basketMap.forEach((channel, msgQueue) -> {
                    if (msgQueue.isEmpty()) {
                        return;
                    }
                    HashMap<ClientRequestRpcInfo, BatchResultMessage> batchResultMessageMap = new HashMap<ClientRequestRpcInfo, BatchResultMessage>();
                    while (!msgQueue.isEmpty()) {
                        QueueItem item = (QueueItem)msgQueue.poll();
                        BatchResultMessage batchResultMessage2 = CollectionUtils.computeIfAbsent(batchResultMessageMap, new ClientRequestRpcInfo(item.getRpcMessage()), key -> new BatchResultMessage());
                        batchResultMessage2.getResultMessages().add(item.getResultMessage());
                        batchResultMessage2.getMsgIds().add(item.getMsgId());
                    }
                    batchResultMessageMap.forEach((clientRequestRpcInfo, batchResultMessage) -> ServerOnRequestProcessor.this.remotingServer.sendAsyncResponse(ServerOnRequestProcessor.this.buildRpcMessage(clientRequestRpcInfo), (Channel)channel, batchResultMessage));
                });
                ServerOnRequestProcessor.this.isResponding = false;
            }
        }
    }
}

