/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.remoting.impl;

import com.taobao.remoting.Connection;
import com.taobao.remoting.IOEventListener;
import com.taobao.remoting.RequestProcessor;
import com.taobao.remoting.impl.ConnectionRequest;
import com.taobao.remoting.impl.ConnectionResponse;
import com.taobao.remoting.impl.DefaultClient;
import com.taobao.remoting.impl.DefaultConnection;
import com.taobao.remoting.impl.ProcessorUtil;
import com.taobao.remoting.locale.LogResources;
import com.taobao.remoting.util.DIYExecutor;
import com.taobao.remoting.util.LoggerInit;
import com.taobao.remoting.util.UnsafeCast;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

class DefaultMsgListener
implements IOEventListener.ConnectionMsgReceivedListener {
    private ProcessorUtil processors = new ProcessorUtil();

    DefaultMsgListener() {
    }

    @Override
    public void messageReceived(Connection conn, Object message) throws Exception {
        DefaultConnection connection = (DefaultConnection)UnsafeCast.cast(conn);
        if (message instanceof ConnectionRequest) {
            this.doRequest(connection, (ConnectionRequest)message);
        } else {
            DefaultClient client = (DefaultClient)connection.getClient();
            client.putResponse((ConnectionResponse)message);
        }
    }

    private void doRequest(final DefaultConnection connection, ConnectionRequest request) {
        final long requestId = request.getId();
        final byte direction = request.getDirection();
        final byte serializeProtocol = request.getSerializeProtocol();
        final Object appRequest = request.getAppRequest();
        final Thread ioThread = Thread.currentThread();
        connection.setLastRequestProtocol(serializeProtocol);
        final ConnectionResponse response = new ConnectionResponse();
        response.setRequestId(requestId);
        response.setDirection(direction);
        response.setSerializeProtocol(serializeProtocol);
        response.setHost(connection.getLocalAddress());
        final AppResponseOutputImpl output = new AppResponseOutputImpl(connection, response);
        Class<?> appReqType = appRequest.getClass();
        final RequestProcessor<?> processor = this.findProcessor(connection, appRequest);
        if (null == processor) {
            String msg = LogResources.getLog("requestProcessorNotFound", appReqType.getName());
            response.setResult(1);
            response.setErrorMsg(msg);
            connection.write(response, null);
            return;
        }
        final Executor requestExecutor = processor instanceof RequestProcessor.MultiExecutorRequestProcessor ? ((RequestProcessor.MultiExecutorRequestProcessor)processor).getExecutor(appRequest) : processor.getExecutor();
        Runnable bizJob = new Runnable(){

            @Override
            public void run() {
                if (ioThread == Thread.currentThread() && requestExecutor != DIYExecutor.getInstance()) {
                    String msg = LogResources.getLog("ioThreadCannotDoRequest", requestId, appRequest);
                    response.setErrorMsg(msg);
                    connection.write(response, null);
                    LoggerInit.LOGGER.warn(msg);
                    return;
                }
                if (processor instanceof RequestProcessor.RequestPreProcessor) {
                    ((RequestProcessor.RequestPreProcessor)((Object)processor)).beforeHandleRequest(requestId, serializeProtocol, direction);
                }
                processor.handleRequest(appRequest, output);
            }
        };
        try {
            requestExecutor.execute(bizJob);
        }
        catch (RejectedExecutionException e) {
            processor.onRejectedExecutionException(appRequest, output);
        }
    }

    private RequestProcessor<?> findProcessor(Connection conn, Object appRequest) {
        RequestProcessor<?> processor = null;
        processor = this.processors.findProcessor(appRequest.getClass());
        if (null != processor) {
            return processor;
        }
        return conn.getConnectionFactory().findProcessor(appRequest.getClass());
    }

    @Override
    public void registerProcessor(RequestProcessor<?> processor) {
        this.processors.registerProcessor(processor);
    }

    @Override
    public <T> RequestProcessor<T> removeProcessor(Class<T> appRequestClazz) {
        return this.processors.removeProcessor(appRequestClazz);
    }

    @Override
    public Map<Class<?>, RequestProcessor<?>> getProcessors() {
        return this.processors.getProcessors();
    }

    @Override
    public void updateProcessors(Map<Class<?>, RequestProcessor<?>> newProcessors) {
        this.processors.updateProcessors(newProcessors);
    }

    private static class AppResponseOutputImpl
    implements RequestProcessor.AppResponseOutput {
        private Connection connection;
        private ConnectionResponse connResp;
        private boolean isResponseSet = false;

        AppResponseOutputImpl(Connection _conn, ConnectionResponse _connResp) {
            this.connection = _conn;
            this.connResp = _connResp;
        }

        @Override
        public long requestId() {
            return this.connResp.getRequestId();
        }

        @Override
        public void write(Object appResp) {
            if (this.isResponseSet) {
                throw new IllegalStateException("########## \u4e0d\u80fd\u91cd\u590d\u5411AppResponseOutput\u5199\u5165\u54cd\u5e94.");
            }
            this.isResponseSet = true;
            if (1 == this.connResp.getDirection()) {
                return;
            }
            this.connResp.setAppResponse(appResp);
            this.connection.write(this.connResp, null);
        }

        @Override
        public String remoteHost() {
            return this.connection.getRemoteAddress();
        }

        @Override
        public Connection getConnection() {
            return this.connection;
        }
    }
}

