/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.jraft.rpc.impl;

import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.InvokeTimeoutException;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.option.RpcOptions;
import com.alipay.sofa.jraft.rpc.ClientService;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.InvokeContext;
import com.alipay.sofa.jraft.rpc.ProtobufMsgFactory;
import com.alipay.sofa.jraft.rpc.RaftRpcFactory;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.RpcRequests;
import com.alipay.sofa.jraft.rpc.RpcResponseClosure;
import com.alipay.sofa.jraft.rpc.RpcUtils;
import com.alipay.sofa.jraft.rpc.impl.FutureImpl;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.RpcFactoryHelper;
import com.alipay.sofa.jraft.util.ThreadPoolMetricSet;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.alipay.sofa.jraft.util.Utils;
import com.codahale.metrics.Metric;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Skeleton {@link ClientService} that owns an {@link RpcClient} and a thread pool on which
 * asynchronous RPC callbacks are executed.
 *
 * <p>The client is created in {@link #init(RpcOptions)} and released in {@link #shutdown()};
 * both are {@code synchronized}. The other methods read the {@code volatile} {@link #rpcClient}
 * field once into a local so that a concurrent {@code shutdown()} cannot null it out halfway
 * through a call.
 */
public abstract class AbstractClientService
implements ClientService {
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractClientService.class);

    /** Message used whenever an operation is attempted before {@link #init(RpcOptions)}. */
    private static final String UNINITIALIZED_MSG = "Client service is uninitialized.";

    /**
     * Protobuf field number probed on every non-error response; when present, its value is
     * read as an embedded {@link RpcRequests.ErrorResponse}.
     */
    private static final int ERROR_RESPONSE_FIELD_NUMBER = 99;

    /** Capacity of the bounded work queue backing {@link #rpcExecutor}. */
    private static final int RPC_EXECUTOR_QUEUE_SIZE = 10000;

    /** Underlying RPC client; {@code null} before {@code init} and after {@code shutdown}. */
    protected volatile RpcClient rpcClient;
    /** Default executor for RPC callbacks; created by {@link #initRpcClient(int)}. */
    protected ThreadPoolExecutor rpcExecutor;
    /** Options captured by {@link #init(RpcOptions)}. */
    protected RpcOptions rpcOptions;

    /**
     * @return the current RPC client, or {@code null} when the service is not initialized
     */
    public RpcClient getRpcClient() {
        return this.rpcClient;
    }

    /**
     * Checks whether a connection to {@code endpoint} exists.
     *
     * @param endpoint the peer to check
     * @return {@code false} when the service is uninitialized, otherwise the result of
     *         {@link RpcClient#checkConnection(Endpoint)}
     */
    @Override
    public boolean isConnected(Endpoint endpoint) {
        final RpcClient rc = this.rpcClient;
        return rc != null && AbstractClientService.isConnected(rc, endpoint);
    }

    private static boolean isConnected(RpcClient rpcClient, Endpoint endpoint) {
        return rpcClient.checkConnection(endpoint);
    }

    /**
     * Delegates to {@link RpcClient#checkConnection(Endpoint, boolean)}.
     *
     * @param endpoint       the peer to check
     * @param createIfAbsent forwarded unchanged to the RPC client
     * @return the RPC client's answer
     * @throws IllegalStateException if the service is uninitialized
     */
    @Override
    public boolean checkConnection(Endpoint endpoint, boolean createIfAbsent) {
        final RpcClient rc = this.rpcClient;
        if (rc == null) {
            throw new IllegalStateException(UNINITIALIZED_MSG);
        }
        return rc.checkConnection(endpoint, createIfAbsent);
    }

    /**
     * Initializes the service. A second call while a client already exists is a no-op that
     * returns {@code true} and leaves the previously stored options untouched.
     *
     * @param rpcOptions options used to build the client and size the callback pool
     * @return the result of {@link #initRpcClient(int)}, or {@code true} if already initialized
     */
    @Override
    public synchronized boolean init(RpcOptions rpcOptions) {
        if (this.rpcClient != null) {
            return true;
        }
        this.rpcOptions = rpcOptions;
        return this.initRpcClient(this.rpcOptions.getRpcProcessorThreadPoolSize());
    }

    /**
     * Extension hook invoked by {@link #initRpcClient(int)} after the client has been created
     * and before it is initialized. The default implementation does nothing.
     *
     * @param rpcClient the freshly created client
     */
    protected void configRpcClient(RpcClient rpcClient) {
    }

    /**
     * Creates, configures and initializes the RPC client, then builds the callback executor
     * and, when a metric registry is configured, registers the pool's metrics.
     *
     * @param rpcProcessorThreadPoolSize maximum size of the callback pool; a third of it
     *                                   (integer division) is used as the core size
     * @return always {@code true}
     */
    protected boolean initRpcClient(int rpcProcessorThreadPoolSize) {
        final RaftRpcFactory factory = RpcFactoryHelper.rpcFactory();
        this.rpcClient = factory.createRpcClient(factory.defaultJRaftClientConfigHelper(this.rpcOptions));
        this.configRpcClient(this.rpcClient);
        this.rpcClient.init(null);
        this.rpcExecutor = ThreadPoolUtil.newBuilder()
            .poolName("JRaft-RPC-Processor")
            .enableMetric(true)
            .coreThreads(rpcProcessorThreadPoolSize / 3)
            .maximumThreads(rpcProcessorThreadPoolSize)
            .keepAliveSeconds(60L)
            .workQueue(new ArrayBlockingQueue<Runnable>(RPC_EXECUTOR_QUEUE_SIZE))
            .threadFactory(new NamedThreadFactory("JRaft-RPC-Processor-", true))
            .build();
        if (this.rpcOptions.getMetricRegistry() != null) {
            this.rpcOptions.getMetricRegistry().register("raft-rpc-client-thread-pool",
                new ThreadPoolMetricSet(this.rpcExecutor));
            Utils.registerClosureExecutorMetrics(this.rpcOptions.getMetricRegistry());
        }
        return true;
    }

    /**
     * Shuts down the RPC client and the callback executor. Does nothing when the service is
     * not initialized.
     */
    @Override
    public synchronized void shutdown() {
        if (this.rpcClient != null) {
            this.rpcClient.shutdown();
            this.rpcClient = null;
            // initRpcClient is overridable, so the executor may never have been created.
            if (this.rpcExecutor != null) {
                this.rpcExecutor.shutdown();
            }
        }
    }

    /**
     * Ensures a connection to {@code endpoint}: if none is reported by the client, a
     * synchronous ping is sent using the configured connect timeout.
     *
     * @param endpoint the peer to connect to
     * @return {@code true} if already connected or the ping reply carries error code 0;
     *         {@code false} if the call was interrupted (the interrupt flag is restored) or a
     *         {@link RemotingException} occurred
     * @throws IllegalStateException if the service is uninitialized
     */
    @Override
    public boolean connect(Endpoint endpoint) {
        final RpcClient rc = this.rpcClient;
        if (rc == null) {
            throw new IllegalStateException(UNINITIALIZED_MSG);
        }
        if (AbstractClientService.isConnected(rc, endpoint)) {
            return true;
        }
        try {
            final RpcRequests.PingRequest req = RpcRequests.PingRequest.newBuilder()
                .setSendTimestamp(System.currentTimeMillis())
                .build();
            final RpcRequests.ErrorResponse resp = (RpcRequests.ErrorResponse) rc.invokeSync(endpoint, req,
                this.rpcOptions.getRpcConnectTimeoutMs());
            return resp.getErrorCode() == 0;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        catch (RemotingException e) {
            LOG.error("Fail to connect {}, remoting exception: {}.", endpoint, e.getMessage());
            return false;
        }
    }

    /**
     * Closes the connection to {@code endpoint} if the service is initialized.
     *
     * @param endpoint the peer to disconnect from
     * @return always {@code true}
     */
    @Override
    public boolean disconnect(Endpoint endpoint) {
        final RpcClient rc = this.rpcClient;
        if (rc == null) {
            return true;
        }
        LOG.info("Disconnect from {}.", endpoint);
        rc.closeConnection(endpoint);
        return true;
    }

    /**
     * Sends {@code request} asynchronously with no invoke context, running the callback on
     * the default {@link #rpcExecutor}.
     *
     * @see #invokeWithDone(Endpoint, Message, InvokeContext, RpcResponseClosure, int, Executor)
     */
    @Override
    public <T extends Message> Future<Message> invokeWithDone(Endpoint endpoint, Message request, RpcResponseClosure<T> done, int timeoutMs) {
        return this.invokeWithDone(endpoint, request, done, timeoutMs, this.rpcExecutor);
    }

    /**
     * Sends {@code request} asynchronously with no invoke context, running the callback on
     * {@code rpcExecutor}.
     *
     * @see #invokeWithDone(Endpoint, Message, InvokeContext, RpcResponseClosure, int, Executor)
     */
    public <T extends Message> Future<Message> invokeWithDone(Endpoint endpoint, Message request, RpcResponseClosure<T> done, int timeoutMs, Executor rpcExecutor) {
        return this.invokeWithDone(endpoint, request, null, done, timeoutMs, rpcExecutor);
    }

    /**
     * Sends {@code request} asynchronously with the given invoke context, running the
     * callback on the default {@link #rpcExecutor}.
     *
     * @see #invokeWithDone(Endpoint, Message, InvokeContext, RpcResponseClosure, int, Executor)
     */
    public <T extends Message> Future<Message> invokeWithDone(Endpoint endpoint, Message request, InvokeContext ctx, RpcResponseClosure<T> done, int timeoutMs) {
        return this.invokeWithDone(endpoint, request, ctx, done, timeoutMs, this.rpcExecutor);
    }

    /**
     * Sends {@code request} asynchronously and reports the outcome both through {@code done}
     * and through the returned future.
     *
     * <p>Failures detected before the request is handed to the RPC client (service
     * uninitialized, interruption, {@link RemotingException}) fail the future immediately and
     * schedule {@code done} on the chosen executor with a matching error status.
     *
     * @param endpoint    destination peer
     * @param request     message to send
     * @param ctx         invoke context forwarded to the RPC client; may be {@code null}
     * @param done        closure to notify; may be {@code null}
     * @param timeoutMs   timeout in milliseconds; values {@code <= 0} select the default RPC
     *                    timeout from the configured options
     * @param rpcExecutor executor for the callback; {@code null} selects {@link #rpcExecutor}
     * @return a future completed with the response message or failed with the error
     */
    public <T extends Message> Future<Message> invokeWithDone(Endpoint endpoint, final Message request, InvokeContext ctx, final RpcResponseClosure<T> done, int timeoutMs, Executor rpcExecutor) {
        final RpcClient rc = this.rpcClient;
        final FutureImpl<Message> future = new FutureImpl<Message>();
        final Executor currExecutor = rpcExecutor != null ? rpcExecutor : this.rpcExecutor;
        try {
            if (rc == null) {
                future.failure(new IllegalStateException(UNINITIALIZED_MSG));
                RpcUtils.runClosureInExecutor(currExecutor, done, new Status(RaftError.EINTERNAL, UNINITIALIZED_MSG));
                return future;
            }
            final long timeout = timeoutMs <= 0 ? this.rpcOptions.getRpcDefaultTimeout() : timeoutMs;
            rc.invokeAsync(endpoint, request, ctx, new InvokeCallback() {

                @Override
                public void complete(Object result, Throwable err) {
                    if (future.isCancelled()) {
                        AbstractClientService.this.onCanceled(request, done);
                    } else if (err == null) {
                        AbstractClientService.this.onResponse(result, request, done, future);
                    } else {
                        AbstractClientService.this.onFailure(err, request, done, future);
                    }
                }

                @Override
                public Executor executor() {
                    return currExecutor;
                }
            }, timeout);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.failure(e);
            RpcUtils.runClosureInExecutor(currExecutor, done, new Status(RaftError.EINTR, "Sending rpc was interrupted"));
        }
        catch (RemotingException e) {
            future.failure(e);
            // NOTE(review): Status's varargs constructor is assumed to treat its message as a
            // format string; passing the exception text as an argument keeps a literal '%' in
            // it from being interpreted. Confirm against Status.
            RpcUtils.runClosureInExecutor(currExecutor, done, new Status(RaftError.EINTERNAL, "Fail to send a RPC request:%s", e.getMessage()));
        }
        return future;
    }

    /**
     * Handles a reply that arrived without a transport error: derives the status from a
     * top-level or embedded {@link RpcRequests.ErrorResponse}, notifies {@code done} and
     * completes {@code future}.
     */
    private <T extends Message> void onResponse(Object result, Message request, RpcResponseClosure<T> done, FutureImpl<Message> future) {
        if (result != null && !(result instanceof Message)) {
            // A blind cast here would throw out of the callback, leaving the closure
            // un-notified and the future incomplete; report it as a failure instead.
            this.onFailure(new ClassCastException("Unexpected RPC response type: " + result.getClass().getName()), request, done, future);
            return;
        }
        final Message response = (Message) result;
        Status status = Status.OK();
        Message msg = response;
        if (response instanceof RpcRequests.ErrorResponse) {
            status = AbstractClientService.handleErrorResponse((RpcRequests.ErrorResponse) response);
        } else if (response != null) {
            final Descriptors.FieldDescriptor fd = response.getDescriptorForType().findFieldByNumber(ERROR_RESPONSE_FIELD_NUMBER);
            if (fd != null && response.hasField(fd)) {
                final RpcRequests.ErrorResponse eResp = (RpcRequests.ErrorResponse) response.getField(fd);
                status = AbstractClientService.handleErrorResponse(eResp);
                // NOTE(review): if the embedded error ever carries code 0, the closure below
                // receives the ErrorResponse rather than the enclosing response — confirm
                // that servers only populate this field on failure.
                msg = eResp;
            }
        }
        if (done != null) {
            try {
                if (status.isOk()) {
                    // The closure's response type is not available at runtime; callers are
                    // trusted to pair each request with the matching closure type.
                    @SuppressWarnings("unchecked")
                    final T typed = (T) msg;
                    done.setResponse(typed);
                }
                done.run(status);
            }
            catch (Throwable t) {
                LOG.error("Fail to run RpcResponseClosure, the request is {}.", request, t);
            }
        }
        if (!future.isDone()) {
            future.setResult(msg);
        }
    }

    /**
     * Handles a failed invocation: notifies {@code done} with {@code ETIMEDOUT} for an
     * {@link InvokeTimeoutException} and {@code EINTERNAL} otherwise, then fails
     * {@code future} with {@code err}.
     */
    private void onFailure(Throwable err, Message request, RpcResponseClosure<?> done, FutureImpl<Message> future) {
        final RaftError code = err instanceof InvokeTimeoutException ? RaftError.ETIMEDOUT : RaftError.EINTERNAL;
        // Exception text goes in as an argument, not as part of the format (see the note in
        // invokeWithDone's RemotingException handler).
        AbstractClientService.runClosureSafely(done, new Status(code, "RPC exception:%s", err.getMessage()), request);
        if (!future.isDone()) {
            future.failure(err);
        }
    }

    /** Runs {@code done} (if non-null) with {@code status}, logging anything it throws. */
    private static void runClosureSafely(RpcResponseClosure<?> done, Status status, Message request) {
        if (done == null) {
            return;
        }
        try {
            done.run(status);
        }
        catch (Throwable t) {
            LOG.error("Fail to run RpcResponseClosure, the request is {}.", request, t);
        }
    }

    /** Converts an {@link RpcRequests.ErrorResponse} into a {@link Status} (code plus optional message). */
    private static Status handleErrorResponse(RpcRequests.ErrorResponse eResp) {
        final Status status = new Status();
        status.setCode(eResp.getErrorCode());
        if (eResp.hasErrorMsg()) {
            status.setErrorMsg(eResp.getErrorMsg());
        }
        return status;
    }

    /** Notifies {@code done} that the caller cancelled the future before the reply arrived. */
    private <T extends Message> void onCanceled(Message request, RpcResponseClosure<T> done) {
        AbstractClientService.runClosureSafely(done, new Status(RaftError.ECANCELED, "RPC request was canceled by future."), request);
    }

    static {
        ProtobufMsgFactory.load();
    }
}

