/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.client.config.impl;

import com.alibaba.nacos.api.config.ConfigType;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.config.remote.request.ClientConfigMetricRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigBatchListenRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigChangeNotifyRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigPublishRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigQueryRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigRemoveRequest;
import com.alibaba.nacos.api.config.remote.response.ClientConfigMetricResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigChangeBatchListenResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigChangeNotifyResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigPublishResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigQueryResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigRemoveResponse;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.client.address.ServerListChangeEvent;
import com.alibaba.nacos.client.config.common.GroupKey;
import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
import com.alibaba.nacos.client.config.filter.impl.ConfigResponse;
import com.alibaba.nacos.client.config.impl.CacheData;
import com.alibaba.nacos.client.config.impl.ConfigServerListManager;
import com.alibaba.nacos.client.config.impl.ConfigTransportClient;
import com.alibaba.nacos.client.config.impl.Limiter;
import com.alibaba.nacos.client.config.impl.LocalConfigInfoProcessor;
import com.alibaba.nacos.client.config.impl.LocalEncryptedDataKeyProcessor;
import com.alibaba.nacos.client.env.NacosClientProperties;
import com.alibaba.nacos.client.env.SourceType;
import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.utils.AppNameUtils;
import com.alibaba.nacos.client.utils.EnvUtil;
import com.alibaba.nacos.client.utils.LogUtils;
import com.alibaba.nacos.client.utils.ParamUtil;
import com.alibaba.nacos.client.utils.TenantUtil;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.labels.impl.DefaultLabelsCollectorManager;
import com.alibaba.nacos.common.lifecycle.Closeable;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.ConnectionEventListener;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfigFactory;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.utils.ConnLabelsUtils;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.alibaba.nacos.plugin.auth.api.RequestResource;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;

public class ClientWorker
implements Closeable {
    /** Logger shared by this class and its inner transport client. */
    private static final Logger LOGGER = LogUtils.logger(ClientWorker.class);
    // String constants for request header/parameter names. NOTE(review): none of these are
    // referenced in the portion of the file visible here — confirm they are still used.
    private static final String NOTIFY_HEADER = "notify";
    private static final String TAG_PARAM = "tag";
    private static final String APP_NAME_PARAM = "appName";
    private static final String BETAIPS_PARAM = "betaIps";
    private static final String TYPE_PARAM = "type";
    private static final String ENCRYPTED_DATA_KEY_PARAM = "encryptedDataKey";
    /**
     * Group key -> cache entry. Writers in this class synchronize on this reference, copy the current
     * map, modify the copy and publish it with {@code set}; readers call {@code get()} without locking.
     */
    private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference(new HashMap());
    /** Source of the labels collected in {@code initAppLabels}. */
    private final DefaultLabelsCollectorManager defaultLabelsCollectorManager = new DefaultLabelsCollectorManager();
    /** Collected labels with every key prefixed by {@code "app_"}; merged into the labels built by the transport client. */
    private Map<String, String> appLables = new HashMap<String, String>();
    /** Filter chain handed to every {@code CacheData} created by this worker. */
    private final ConfigFilterChainManager configFilterChainManager;
    /** Random id of this worker; used as the key of the metrics map and as the name prefix matched when shutting down rpc clients. */
    private final String uuid = UUID.randomUUID().toString();
    /** Read from {@code configLongPollTimeout} in {@code init} (default 30000, never below 10000). */
    private long timeout;
    /** Read from {@code configRequestTimeout} in {@code init} (default "-1"); passed as the read timeout when pre-fetching config. */
    private long requestTimeout;
    /** Transport client that performs all remote operations for this worker. */
    private final ConfigRpcTransportClient agent;
    /** Read from {@code configRetryTime} in {@code init} (default 2000). */
    private int taskPenaltyTime;
    /** When true, a newly created tenant cache entry is pre-filled with the server-side config. */
    private boolean enableRemoteSyncConfig = false;
    // NOTE(review): initWorkerThreadCount uses the literals 2 and 1; presumably these constants were
    // inlined by the compiler — confirm against the original source.
    private static final int MIN_THREAD_NUM = 2;
    private static final int THREAD_MULTIPLE = 1;
    /** Per-task counters: index is the task id, value is the number of cache entries assigned to that task. */
    private final List<AtomicInteger> taskIdCacheCountList = new ArrayList<AtomicInteger>();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Attaches listeners to the cache entry for {@code dataId}/{@code group}, creating the entry when
     * it does not exist yet.
     *
     * <p>While holding the entry's monitor: every listener is added, the entry is flagged as not
     * discarded and not consistent with the server, it is put back into the cache map if the map no
     * longer holds this instance, and the agent is asked to run a listen pass.
     *
     * @param dataId    config data id
     * @param group     config group; blank values are replaced by the default group
     * @param listeners listeners to attach
     * @throws NacosException propagated from the calls made in this method
     */
    public void addListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
        final String resolvedGroup = this.blank2defaultGroup(group);
        final CacheData entry = this.addCacheDataIfAbsent(dataId, resolvedGroup);
        synchronized (entry) {
            for (Listener each : listeners) {
                entry.addListener(each);
            }
            entry.setDiscard(false);
            entry.setConsistentWithServer(false);
            // The entry may have been dropped from the map concurrently; re-insert it in that case.
            boolean stillMapped = this.getCache(dataId, resolvedGroup) == entry;
            if (!stillMapped) {
                this.putCache(GroupKey.getKey(dataId, resolvedGroup), entry);
            }
            this.agent.notifyListenConfig();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Attaches listeners to the cache entry for {@code dataId}/{@code group} under the agent's tenant,
     * creating the entry when it does not exist yet.
     *
     * <p>While holding the entry's monitor: every listener is added, the entry is flagged as not
     * discarded and not consistent with the server, it is put back into the cache map if the map no
     * longer holds this instance, and the agent is asked to run a listen pass.
     *
     * @param dataId    config data id
     * @param group     config group; blank values are replaced by the default group
     * @param listeners listeners to attach
     * @throws NacosException propagated from the calls made in this method
     */
    public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
        final String resolvedGroup = this.blank2defaultGroup(group);
        final String tenant = this.agent.getTenant();
        final CacheData entry = this.addCacheDataIfAbsent(dataId, resolvedGroup, tenant);
        synchronized (entry) {
            for (Listener each : listeners) {
                entry.addListener(each);
            }
            entry.setDiscard(false);
            entry.setConsistentWithServer(false);
            // The entry may have been dropped from the map concurrently; re-insert it in that case.
            boolean stillMapped = this.getCache(dataId, resolvedGroup, tenant) == entry;
            if (!stillMapped) {
                this.putCache(GroupKey.getKeyTenant(dataId, resolvedGroup, tenant), entry);
            }
            this.agent.notifyListenConfig();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Same as {@code addTenantListeners}, but first seeds the cache entry with the supplied encrypted
     * data key and content (in that order) before the listeners are added.
     *
     * @param dataId           config data id
     * @param group            config group; blank values are replaced by the default group
     * @param content          content stored on the cache entry
     * @param encryptedDataKey encrypted data key stored on the cache entry
     * @param listeners        listeners to attach
     * @throws NacosException propagated from the calls made in this method
     */
    public void addTenantListenersWithContent(String dataId, String group, String content, String encryptedDataKey, List<? extends Listener> listeners) throws NacosException {
        final String resolvedGroup = this.blank2defaultGroup(group);
        final String tenant = this.agent.getTenant();
        final CacheData entry = this.addCacheDataIfAbsent(dataId, resolvedGroup, tenant);
        synchronized (entry) {
            entry.setEncryptedDataKey(encryptedDataKey);
            entry.setContent(content);
            for (Listener each : listeners) {
                entry.addListener(each);
            }
            entry.setDiscard(false);
            entry.setConsistentWithServer(false);
            // The entry may have been dropped from the map concurrently; re-insert it in that case.
            boolean stillMapped = this.getCache(dataId, resolvedGroup, tenant) == entry;
            if (!stillMapped) {
                this.putCache(GroupKey.getKeyTenant(dataId, resolvedGroup, tenant), entry);
            }
            this.agent.notifyListenConfig();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Detaches a listener from the cache entry for {@code dataId}/{@code group}. Does nothing when no
     * entry exists.
     *
     * <p>When the last listener is removed, the entry is flagged as inconsistent with the server and
     * discarded, and the agent's {@code removeCache} is invoked.
     *
     * @param dataId   config data id
     * @param group    config group; blank values are replaced by the default group
     * @param listener listener to detach
     */
    public void removeListener(String dataId, String group, Listener listener) {
        group = this.blank2defaultGroup(group);
        final CacheData entry = this.getCache(dataId, group);
        if (entry == null) {
            return;
        }
        synchronized (entry) {
            entry.removeListener(listener);
            if (!entry.getListeners().isEmpty()) {
                return;
            }
            entry.setConsistentWithServer(false);
            entry.setDiscard(true);
            this.agent.removeCache(dataId, group);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Detaches a listener from the cache entry for {@code dataId}/{@code group} under the agent's
     * tenant. Does nothing when no entry exists.
     *
     * <p>When the last listener is removed, the entry is flagged as inconsistent with the server and
     * discarded, and the agent's {@code removeCache} is invoked.
     *
     * @param dataId   config data id
     * @param group    config group; blank values are replaced by the default group
     * @param listener listener to detach
     */
    public void removeTenantListener(String dataId, String group, Listener listener) {
        group = this.blank2defaultGroup(group);
        final String tenant = this.agent.getTenant();
        final CacheData entry = this.getCache(dataId, group, tenant);
        if (entry == null) {
            return;
        }
        synchronized (entry) {
            entry.removeListener(listener);
            if (!entry.getListeners().isEmpty()) {
                return;
            }
            entry.setConsistentWithServer(false);
            entry.setDiscard(true);
            this.agent.removeCache(dataId, group);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes the cache entry for the given dataId/group/tenant from the cache map and, if an entry
     * was present, decrements the counter of the task it was assigned to.
     *
     * <p>The map is replaced by a modified copy while holding the monitor of {@code cacheMap}. The
     * unsubscribe log line and the listen-count metric are updated after the lock is released.
     *
     * @param dataId config data id
     * @param group  config group
     * @param tenant config tenant
     */
    void removeCache(String dataId, String group, String tenant) {
        final String groupKey = GroupKey.getKeyTenant(dataId, group, tenant);
        synchronized (this.cacheMap) {
            Map<String, CacheData> snapshot = new HashMap<String, CacheData>(this.cacheMap.get());
            CacheData evicted = snapshot.remove(groupKey);
            if (null != evicted) {
                this.decreaseTaskIdCount(evicted.getTaskId());
            }
            this.cacheMap.set(snapshot);
        }
        LOGGER.info("[{}] [unsubscribe] {}", this.agent.getName(), groupKey);
        final int remaining = this.cacheMap.get().size();
        MetricsMonitor.getListenConfigCountMonitor().set((double) remaining);
    }

    /**
     * Delegates config removal to the transport agent.
     *
     * @param dataId config data id
     * @param group  config group
     * @param tenant config tenant
     * @param tag    config tag
     * @return the result reported by the agent
     * @throws NacosException propagated from the agent
     */
    public boolean removeConfig(String dataId, String group, String tenant, String tag) throws NacosException {
        final boolean removed = agent.removeConfig(dataId, group, tenant, tag);
        return removed;
    }

    /**
     * Delegates config publication to the transport agent, forwarding every argument unchanged.
     *
     * @return the result reported by the agent
     * @throws NacosException propagated from the agent
     */
    public boolean publishConfig(String dataId, String group, String tenant, String appName, String tag, String betaIps, String content, String encryptedDataKey, String casMd5, String type) throws NacosException {
        final boolean published = agent.publishConfig(
                dataId, group, tenant, appName, tag, betaIps, content, encryptedDataKey, casMd5, type);
        return published;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the cache entry for {@code dataId}/{@code group}, creating and registering one when none
     * exists.
     *
     * <p>The lookup is done once without locking and again under the {@code cacheMap} monitor. If
     * another thread registered an entry in between, that entry is used and marked as initializing;
     * otherwise the new entry is assigned a task id. In both cases the entry is put under the key from
     * {@code GroupKey.getKey} into a copy of the map, which then replaces the current map.
     *
     * @param dataId config data id
     * @param group  config group
     * @return the existing or newly registered cache entry
     */
    public CacheData addCacheDataIfAbsent(String dataId, String group) {
        final CacheData existing = this.getCache(dataId, group);
        if (existing != null) {
            return existing;
        }
        final String key = GroupKey.getKey(dataId, group);
        CacheData result = new CacheData(this.configFilterChainManager, this.agent.getName(), dataId, group);
        synchronized (this.cacheMap) {
            final CacheData raced = this.getCache(dataId, group);
            if (raced == null) {
                final int taskId = this.calculateTaskId();
                this.increaseTaskIdCount(taskId);
                result.setTaskId(taskId);
            } else {
                // Another thread won the race; reuse its entry.
                result = raced;
                result.setInitializing(true);
            }
            Map<String, CacheData> snapshot = new HashMap<String, CacheData>(this.cacheMap.get());
            snapshot.put(key, result);
            this.cacheMap.set(snapshot);
        }
        LOGGER.info("[{}] [subscribe] {}", this.agent.getName(), key);
        final int listened = this.cacheMap.get().size();
        MetricsMonitor.getListenConfigCountMonitor().set((double) listened);
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the cache entry for {@code dataId}/{@code group}/{@code tenant}, creating and
     * registering one when none exists.
     *
     * <p>The lookup is done once without locking and again under the {@code cacheMap} monitor. If
     * another thread registered an entry in between, that entry is used and marked as initializing.
     * Otherwise a new entry is created; when {@code enableRemoteSyncConfig} is set it is pre-filled
     * with the server-side content and encrypted data key, and only then is a task id reserved for it.
     *
     * <p>The remote fetch happens before the task-id counter is incremented so that a
     * {@link NacosException} from the fetch leaves the per-task counters untouched; previously the
     * counter was incremented first and stayed inflated when the fetch failed.
     *
     * @param dataId config data id
     * @param group  config group
     * @param tenant config tenant
     * @return the existing or newly registered cache entry
     * @throws NacosException if pre-fetching the server-side config fails
     */
    public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
        final CacheData existing = this.getCache(dataId, group, tenant);
        if (null != existing) {
            return existing;
        }
        final String key = GroupKey.getKeyTenant(dataId, group, tenant);
        CacheData cache;
        synchronized (this.cacheMap) {
            cache = this.getCache(dataId, group, tenant);
            if (null != cache) {
                // Another thread won the race; reuse its entry.
                cache.setInitializing(true);
            } else {
                cache = new CacheData(this.configFilterChainManager, this.agent.getName(), dataId, group, tenant);
                if (this.enableRemoteSyncConfig) {
                    // May throw; nothing has been mutated yet, so a failure leaves no trace behind.
                    ConfigResponse response = this.getServerConfig(dataId, group, tenant, this.requestTimeout, false);
                    cache.setEncryptedDataKey(response.getEncryptedDataKey());
                    cache.setContent(response.getContent());
                }
                final int taskId = this.calculateTaskId();
                this.increaseTaskIdCount(taskId);
                cache.setTaskId(taskId);
            }
            Map<String, CacheData> copy = new HashMap<String, CacheData>(this.cacheMap.get());
            copy.put(key, cache);
            this.cacheMap.set(copy);
        }
        LOGGER.info("[{}] [subscribe] {}", this.agent.getName(), key);
        MetricsMonitor.getListenConfigCountMonitor().set((double) this.cacheMap.get().size());
        return cache;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stores {@code cache} under {@code key} by publishing a modified copy of the cache map while
     * holding the {@code cacheMap} monitor.
     *
     * @param key   group key
     * @param cache entry to store
     */
    private void putCache(String key, CacheData cache) {
        synchronized (this.cacheMap) {
            final Map<String, CacheData> snapshot = new HashMap<String, CacheData>(this.cacheMap.get());
            snapshot.put(key, cache);
            this.cacheMap.set(snapshot);
        }
    }

    /**
     * Increments the cache-entry counter of the given task.
     *
     * @param taskId index into {@code taskIdCacheCountList}
     */
    private void increaseTaskIdCount(int taskId) {
        final AtomicInteger counter = taskIdCacheCountList.get(taskId);
        counter.incrementAndGet();
    }

    /**
     * Decrements the cache-entry counter of the given task.
     *
     * @param taskId index into {@code taskIdCacheCountList}
     */
    private void decreaseTaskIdCount(int taskId) {
        final AtomicInteger counter = taskIdCacheCountList.get(taskId);
        counter.decrementAndGet();
    }

    /**
     * Picks the task id for a new cache entry: the lowest index whose counter is still below
     * {@code ParamUtil.getPerTaskConfigSize()}. When every existing task is full (or none exists), a
     * new zeroed counter is appended and its index is returned.
     *
     * @return the chosen task id
     */
    private int calculateTaskId() {
        final int capacityPerTask = (int) ParamUtil.getPerTaskConfigSize();
        int candidate = 0;
        while (candidate < this.taskIdCacheCountList.size()) {
            if (this.taskIdCacheCountList.get(candidate).get() < capacityPerTask) {
                return candidate;
            }
            candidate++;
        }
        this.taskIdCacheCountList.add(new AtomicInteger(0));
        return this.taskIdCacheCountList.size() - 1;
    }

    /**
     * Looks up the cache entry for {@code dataId}/{@code group} using the tenant returned by
     * {@code TenantUtil.getUserTenantForAcm()}.
     *
     * @param dataId config data id
     * @param group  config group
     * @return the cache entry, or {@code null} when none is registered
     */
    public CacheData getCache(String dataId, String group) {
        final String tenant = TenantUtil.getUserTenantForAcm();
        return getCache(dataId, group, tenant);
    }

    /**
     * Looks up the cache entry registered under the group key built from the three arguments.
     *
     * @param dataId config data id; must not be {@code null}
     * @param group  config group; must not be {@code null}
     * @param tenant config tenant
     * @return the cache entry, or {@code null} when none is registered
     * @throws IllegalArgumentException if {@code dataId} or {@code group} is {@code null}
     */
    public CacheData getCache(String dataId, String group, String tenant) {
        if (null == dataId || null == group) {
            // Name the offending arguments so the failure can be diagnosed from the stack trace alone.
            throw new IllegalArgumentException(
                    "dataId and group must not be null, dataId=" + dataId + ", group=" + group);
        }
        return this.cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
    }

    /**
     * Queries the config from the server through the agent. A blank group is replaced by
     * {@code "DEFAULT_GROUP"}; a non-blank group is forwarded as given (it is not trimmed here).
     *
     * @param dataId      config data id
     * @param group       config group
     * @param tenant      config tenant
     * @param readTimeout timeout forwarded to the agent
     * @param notify      flag forwarded to the agent
     * @return the response returned by the agent
     * @throws NacosException propagated from the agent
     */
    public ConfigResponse getServerConfig(String dataId, String group, String tenant, long readTimeout, boolean notify) throws NacosException {
        final String effectiveGroup = StringUtils.isBlank((CharSequence) group) ? "DEFAULT_GROUP" : group;
        return this.agent.queryConfig(dataId, effectiveGroup, tenant, readTimeout, notify);
    }

    /**
     * Normalizes a group name: blank input yields {@code "DEFAULT_GROUP"}, anything else is trimmed.
     *
     * @param group raw group name
     * @return the normalized group name
     */
    private String blank2defaultGroup(String group) {
        if (StringUtils.isBlank((CharSequence) group)) {
            return "DEFAULT_GROUP";
        }
        return group.trim();
    }

    /**
     * Builds a worker: stores the filter chain, reads its settings from {@code properties}, creates
     * the transport agent, gives it a scheduled thread pool named
     * {@code com.alibaba.nacos.client.Worker} and starts it.
     *
     * @param configFilterChainManager filter chain handed to every cache entry
     * @param serverListManager        server list used by the transport agent
     * @param properties               client properties
     * @throws NacosException propagated from the calls made in this constructor
     */
    public ClientWorker(ConfigFilterChainManager configFilterChainManager, ConfigServerListManager serverListManager, NacosClientProperties properties) throws NacosException {
        this.configFilterChainManager = configFilterChainManager;
        this.init(properties);
        this.agent = new ConfigRpcTransportClient(properties, serverListManager);
        final int workerThreads = this.initWorkerThreadCount(properties);
        final ThreadFactory threadFactory = new NameThreadFactory("com.alibaba.nacos.client.Worker");
        final ScheduledExecutorService workerPool = Executors.newScheduledThreadPool(workerThreads, threadFactory);
        this.agent.setExecutor(workerPool);
        this.agent.start();
    }

    /**
     * Collects labels from {@code properties} via the labels collector manager and stores them with
     * every key prefixed by {@code "app_"}.
     *
     * @param properties properties handed to the labels collector manager
     */
    void initAppLabels(Properties properties) {
        final Map<String, String> collected = this.defaultLabelsCollectorManager.getLabels(properties);
        this.appLables = ConnLabelsUtils.addPrefixForEachKey(collected, "app_");
    }

    /**
     * Determines the worker pool size.
     *
     * <p>Starts from {@code ThreadUtils.getSuitableThreadCount(1)}. Without properties that value is
     * returned directly. Otherwise it is capped by {@code clientWorkerMaxThreadCount}, raised to at
     * least 2, and finally used as the default for {@code clientWorkerThreadCount}, whose configured
     * value (if any) is returned as-is.
     *
     * @param properties client properties, may be {@code null}
     * @return the number of worker threads
     */
    private int initWorkerThreadCount(NacosClientProperties properties) {
        final int suitable = ThreadUtils.getSuitableThreadCount(1);
        if (null == properties) {
            return suitable;
        }
        final int capped = Math.min(suitable, properties.getInteger("clientWorkerMaxThreadCount", suitable));
        final int fallback = Math.max(capped, 2);
        return properties.getInteger("clientWorkerThreadCount", fallback);
    }

    /**
     * Reads the worker settings from {@code properties}: request timeout
     * ({@code configRequestTimeout}, default "-1"), long-poll timeout ({@code configLongPollTimeout},
     * default 30000, never below 10000), retry time ({@code configRetryTime}, default 2000), the
     * {@code enableRemoteSyncConfig} flag, and finally the application labels.
     *
     * @param properties client properties
     */
    private void init(NacosClientProperties properties) {
        final String requestTimeoutText = properties.getProperty("configRequestTimeout", "-1");
        this.requestTimeout = ConvertUtils.toLong(requestTimeoutText);

        final int longPollTimeout = ConvertUtils.toInt(properties.getProperty("configLongPollTimeout"), 30000);
        this.timeout = Math.max(longPollTimeout, 10000);

        this.taskPenaltyTime = ConvertUtils.toInt(properties.getProperty("configRetryTime"), 2000);

        final String remoteSyncText = properties.getProperty("enableRemoteSyncConfig");
        this.enableRemoteSyncConfig = Boolean.parseBoolean(remoteSyncText);

        this.initAppLabels(properties.getProperties(SourceType.PROPERTIES));
    }

    /**
     * Builds the metrics report of this worker.
     *
     * <p>The report contains the number of listened configs, client version, snapshot directory,
     * server-list details and the per-key values from {@code getMetricsValue}. It is serialized to
     * JSON and returned as the single value of a map keyed by this worker's {@code uuid}.
     *
     * @param metricsKeys keys whose values should be included, may be {@code null}
     * @return a one-entry map from this worker's uuid to the JSON report
     */
    Map<String, Object> getMetrics(List<ClientConfigMetricRequest.MetricsKey> metricsKeys) {
        final Map<String, Object> report = new HashMap<String, Object>(16);
        report.put("listenConfigSize", String.valueOf(this.cacheMap.get().size()));
        report.put("clientVersion", VersionUtils.getFullClientVersion());
        report.put("snapshotDir", LocalConfigInfoProcessor.LOCAL_SNAPSHOT_PATH);
        report.put("addressUrl", this.agent.serverListManager.getAddressSource());
        report.put("isFixedServer", this.agent.serverListManager.isFixed());
        report.put("serverUrls", this.agent.serverListManager.getUrlString());
        report.put("metricValues", this.getMetricsValue(metricsKeys));

        final Map<String, Object> byWorker = new HashMap<String, Object>(1);
        byWorker.put(this.uuid, JacksonUtils.toJson(report));
        return byWorker;
    }

    /**
     * Resolves the requested metric keys.
     *
     * <p>Keys of type {@code "cacheData"} map to {@code content:md5} of the matching cache entry;
     * keys of type {@code "snapshotData"} map to {@code snapshot:md5} of the local snapshot. A missing
     * entry or snapshot yields a {@code null} value. Keys of any other type are ignored, and the first
     * value stored for a key wins.
     *
     * @param metricsKeys keys to resolve, may be {@code null}
     * @return the resolved values, or {@code null} when {@code metricsKeys} is {@code null}
     */
    private Map<ClientConfigMetricRequest.MetricsKey, Object> getMetricsValue(List<ClientConfigMetricRequest.MetricsKey> metricsKeys) {
        if (null == metricsKeys) {
            return null;
        }
        final HashMap<ClientConfigMetricRequest.MetricsKey, Object> resolved =
                new HashMap<ClientConfigMetricRequest.MetricsKey, Object>(16);
        for (ClientConfigMetricRequest.MetricsKey metricsKey : metricsKeys) {
            if ("cacheData".equals(metricsKey.getType())) {
                final CacheData entry = this.cacheMap.get().get(metricsKey.getKey());
                String rendered = null;
                if (entry != null) {
                    rendered = entry.getContent() + ":" + entry.getMd5();
                }
                resolved.putIfAbsent(metricsKey, rendered);
            }
            if ("snapshotData".equals(metricsKey.getType())) {
                final String[] parts = GroupKey.parseKey(metricsKey.getKey());
                final String snapshot = LocalConfigInfoProcessor.getSnapshot(this.agent.getName(), parts[0], parts[1], parts[2]);
                String rendered = null;
                if (snapshot != null) {
                    rendered = snapshot + ":" + MD5Utils.md5Hex(snapshot, "UTF-8");
                }
                resolved.putIfAbsent(metricsKey, rendered);
            }
        }
        return resolved;
    }

    /**
     * Shuts down the transport agent (when present), logging before and after.
     *
     * @throws NacosException propagated from the agent's shutdown
     */
    public void shutdown() throws NacosException {
        final String workerClass = getClass().getName();
        LOGGER.info("{} do shutdown begin", workerClass);
        if (null != agent) {
            agent.shutdown();
        }
        LOGGER.info("{} do shutdown stop", workerClass);
    }

    /**
     * Reports the server health as seen by the transport agent.
     *
     * @return the value of the agent's {@code isHealthServer()}
     */
    public boolean isHealthServer() {
        final boolean healthy = agent.isHealthServer();
        return healthy;
    }

    /**
     * Returns the name of the transport agent.
     *
     * @return the value of the agent's {@code getName()}
     */
    public String getAgentName() {
        final String agentName = agent.getName();
        return agentName;
    }

    /**
     * Exposes the transport agent used by this worker.
     *
     * @return the agent created in the constructor
     */
    public ConfigTransportClient getAgent() {
        final ConfigTransportClient transport = agent;
        return transport;
    }

    public class ConfigRpcTransportClient
    extends ConfigTransportClient {
        // Initialised to an empty map in the constructor. NOTE(review): not otherwise referenced in the
        // visible portion of this file — confirm its use further down.
        Map<String, ExecutorService> multiTaskExecutor;
        // One-slot queue used as a wake-up signal: notifyListenConfig() offers bellItem, and the loop in
        // startInternal() polls it with a 5-second timeout before each listen pass.
        private final BlockingQueue<Object> listenExecutebell;
        // The single token object placed on listenExecutebell.
        private final Object bellItem;
        // Set to the construction time initially; executeConfigListen() overwrites it with the current
        // time whenever a pass ran with needAllSync set.
        private long lastAllSyncTime;
        // Created and registered with NotifyCenter in initRpcClientHandler(); deregistered in shutdown().
        Subscriber subscriber;
        // 180000; executeConfigListen() compares the milliseconds elapsed since lastAllSyncTime against
        // this same value (as a literal) to decide whether every cache entry is re-checked.
        private static final long ALL_SYNC_INTERNAL = 180000L;

        /**
         * Creates the transport client with an empty executor map, an empty one-slot wake-up queue and
         * no subscriber; the last-full-sync timestamp starts at the construction time.
         *
         * @param properties        client properties, forwarded to the superclass
         * @param serverListManager server list manager, forwarded to the superclass
         */
        public ConfigRpcTransportClient(NacosClientProperties properties, ConfigServerListManager serverListManager) {
            super(properties, serverListManager);
            this.bellItem = new Object();
            this.listenExecutebell = new ArrayBlockingQueue<Object>(1);
            this.multiTaskExecutor = new HashMap<String, ExecutorService>();
            this.subscriber = null;
            this.lastAllSyncTime = System.currentTimeMillis();
        }

        /**
         * Returns the connection type used by this transport client.
         *
         * @return always {@code ConnectionType.GRPC}
         */
        private ConnectionType getConnectionType() {
            final ConnectionType type = ConnectionType.GRPC;
            return type;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Shuts down this transport client.
         *
         * <p>After the superclass shutdown, and while holding the monitor of the rpc client entry set:
         * every rpc client whose name starts with this worker's uuid is shut down and removed from the
         * factory, the executor is shut down, every cache entry is flagged as not consistent with the
         * server, and the server-list subscriber (if any) is deregistered.
         *
         * <p>A failure to shut down one rpc client is logged through the class logger and does not stop
         * the remaining clients from being shut down.
         *
         * @throws NacosException propagated from the superclass shutdown
         */
        @Override
        public void shutdown() throws NacosException {
            super.shutdown();
            synchronized (RpcClientFactory.getAllClientEntries()) {
                LOGGER.info("Trying to shutdown transport client {}", (Object) this);
                Set<?> allClientEntries = RpcClientFactory.getAllClientEntries();
                Iterator<?> iterator = allClientEntries.iterator();
                while (iterator.hasNext()) {
                    Map.Entry<?, ?> entry = (Map.Entry<?, ?>) iterator.next();
                    // Only clients created for this worker carry its uuid as a name prefix.
                    if (!((String) entry.getKey()).startsWith(ClientWorker.this.uuid)) {
                        continue;
                    }
                    LOGGER.info("Trying to shutdown rpc client {}", entry.getKey());
                    try {
                        ((RpcClient) entry.getValue()).shutdown();
                    } catch (NacosException e) {
                        // Best effort: record the failure in the client log (not stderr) and keep going.
                        LOGGER.error("Failed to shutdown rpc client {}", entry.getKey(), e);
                    }
                    LOGGER.info("Remove rpc client {}", entry.getKey());
                    iterator.remove();
                }
                LOGGER.info("Shutdown executor {}", (Object) this.executor);
                this.executor.shutdown();
                for (CacheData cacheData : ClientWorker.this.cacheMap.get().values()) {
                    cacheData.setConsistentWithServer(false);
                }
                if (this.subscriber != null) {
                    NotifyCenter.deregisterSubscriber((Subscriber) this.subscriber);
                }
            }
        }

        /**
         * Builds the label map for this client: fixed {@code source}/{@code module} labels, the
         * application name, the Vipserver/Amory/Location tags when {@code EnvUtil} reports them, and
         * finally all {@code app_}-prefixed application labels of the enclosing worker.
         *
         * @return a new mutable label map
         */
        private Map<String, String> getLabels() {
            final HashMap<String, String> result = new HashMap<String, String>(2, 1.0f);
            result.put("source", "sdk");
            result.put("module", "config");
            result.put("AppName", AppNameUtils.getAppName());
            if (null != EnvUtil.getSelfVipserverTag()) {
                result.put("Vipserver-Tag", EnvUtil.getSelfVipserverTag());
            }
            if (null != EnvUtil.getSelfAmoryTag()) {
                result.put("Amory-Tag", EnvUtil.getSelfAmoryTag());
            }
            if (null != EnvUtil.getSelfLocationTag()) {
                result.put("Location-Tag", EnvUtil.getSelfLocationTag());
            }
            // Application labels go in last, so they overwrite any label above with the same key.
            result.putAll(ClientWorker.this.appLables);
            return result;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Handles a server push announcing that a config changed.
         *
         * <p>If a cache entry exists for the pushed dataId/group/tenant, it is marked (under its own
         * monitor) as having received a change notification and as not consistent with the server, and
         * a listen pass is requested. A fresh response is returned in every case.
         *
         * @param configChangeNotifyRequest the pushed request
         * @param clientName                name of the rpc client, used for logging only
         * @return a new {@code ConfigChangeNotifyResponse}
         */
        ConfigChangeNotifyResponse handleConfigChangeNotifyRequest(ConfigChangeNotifyRequest configChangeNotifyRequest, String clientName) {
            LOGGER.info("[{}] [server-push] config changed. dataId={}, group={},tenant={}",
                    clientName,
                    configChangeNotifyRequest.getDataId(),
                    configChangeNotifyRequest.getGroup(),
                    configChangeNotifyRequest.getTenant());
            final String groupKey = GroupKey.getKeyTenant(
                    configChangeNotifyRequest.getDataId(),
                    configChangeNotifyRequest.getGroup(),
                    configChangeNotifyRequest.getTenant());
            final CacheData target = ClientWorker.this.cacheMap.get().get(groupKey);
            if (target == null) {
                return new ConfigChangeNotifyResponse();
            }
            synchronized (target) {
                target.getReceiveNotifyChanged().set(true);
                target.setConsistentWithServer(false);
                this.notifyListenConfig();
            }
            return new ConfigChangeNotifyResponse();
        }

        /**
         * Answers a metrics request with the enclosing worker's metrics for the requested keys.
         *
         * @param configMetricRequest the request carrying the metric keys
         * @return a response holding the metrics map
         */
        ClientConfigMetricResponse handleClientMetricsRequest(ClientConfigMetricRequest configMetricRequest) {
            final ClientConfigMetricResponse reply = new ClientConfigMetricResponse();
            final Map<String, Object> metrics = ClientWorker.this.getMetrics(configMetricRequest.getMetricsKeys());
            reply.setMetrics(metrics);
            return reply;
        }

        /**
         * Wires an rpc client to this transport client. Registers, in this order:
         * <ol>
         *   <li>a server-request handler for {@code ConfigChangeNotifyRequest};</li>
         *   <li>a server-request handler for {@code ClientConfigMetricRequest};</li>
         *   <li>a connection listener that requests a listen pass on connect and, on disconnect, flags
         *       cache entries as not consistent with the server (only those of the client's
         *       {@code taskId} label when that label is not blank, otherwise all of them);</li>
         *   <li>a server-list factory backed by {@code serverListManager};</li>
         *   <li>a {@code ServerListChangeEvent} subscriber that forwards to the client's
         *       {@code onServerListChange()}.</li>
         * </ol>
         * Both request handlers return {@code null} for request types they do not handle.
         *
         * @param rpcClientInner the rpc client to configure
         */
        private void initRpcClientHandler(final RpcClient rpcClientInner) {
            rpcClientInner.registerServerRequestHandler((request, connection) ->
                    request instanceof ConfigChangeNotifyRequest
                            ? this.handleConfigChangeNotifyRequest((ConfigChangeNotifyRequest) request, rpcClientInner.getName())
                            : null);
            rpcClientInner.registerServerRequestHandler((request, connection) ->
                    request instanceof ClientConfigMetricRequest
                            ? this.handleClientMetricsRequest((ClientConfigMetricRequest) request)
                            : null);
            rpcClientInner.registerConnectionListener(new ConnectionEventListener() {

                public void onConnected(Connection connection) {
                    LOGGER.info("[{}] Connected,notify listen context...", (Object) rpcClientInner.getName());
                    ConfigRpcTransportClient.this.notifyListenConfig();
                }

                public void onDisConnect(Connection connection) {
                    final String taskId = (String) rpcClientInner.getLabels().get("taskId");
                    LOGGER.info("[{}] DisConnected,clear listen context...", (Object) rpcClientInner.getName());
                    final boolean scopedToTask = StringUtils.isNotBlank(taskId);
                    for (CacheData each : ClientWorker.this.cacheMap.get().values()) {
                        // With a task id, only entries of that task are affected; without one, all are.
                        if (!scopedToTask || Integer.valueOf(taskId).equals(each.getTaskId())) {
                            each.setConsistentWithServer(false);
                        }
                    }
                }
            });
            rpcClientInner.serverListFactory(new ServerListFactory() {

                public String genNextServer() {
                    return ConfigRpcTransportClient.this.serverListManager.genNextServer();
                }

                public String getCurrentServer() {
                    return ConfigRpcTransportClient.this.serverListManager.getCurrentServer();
                }

                public List<String> getServerList() {
                    return ConfigRpcTransportClient.this.serverListManager.getServerList();
                }
            });
            this.subscriber = new Subscriber() {

                public void onEvent(Event event) {
                    rpcClientInner.onServerListChange();
                }

                public Class<? extends Event> subscribeType() {
                    return ServerListChangeEvent.class;
                }
            };
            NotifyCenter.registerSubscriber((Subscriber) this.subscriber);
        }

        /**
         * Schedules the listen loop on the executor with no delay.
         *
         * <p>Until the executor is shut down or terminated, the loop waits up to five seconds for a
         * wake-up token and then runs {@code executeConfigListen()}, whether or not a token arrived. Any
         * throwable is logged; the loop then sleeps 50 ms, requests another pass and continues.
         */
        @Override
        public void startInternal() {
            this.executor.schedule(() -> {
                while (!(this.executor.isShutdown() || this.executor.isTerminated())) {
                    try {
                        this.listenExecutebell.poll(5L, TimeUnit.SECONDS);
                        // Re-check after the wait: the executor may have been shut down meanwhile.
                        if (!this.executor.isShutdown() && !this.executor.isTerminated()) {
                            this.executeConfigListen();
                        }
                    } catch (Throwable failure) {
                        LOGGER.error("[rpc listen execute] [rpc listen] exception", failure);
                        try {
                            Thread.sleep(50L);
                        } catch (InterruptedException ignored) {
                            // Ignored as in the original code: the back-off is cut short and the loop goes on.
                        }
                        this.notifyListenConfig();
                    }
                }
            }, 0L, TimeUnit.MILLISECONDS);
        }

        /**
         * Returns the name of this transport client, taken from the server list manager.
         *
         * @return the value of {@code serverListManager.getName()}
         */
        @Override
        public String getName() {
            final String managerName = serverListManager.getName();
            return managerName;
        }

        /**
         * Requests a listen pass by offering the wake-up token to the one-slot queue. The result of the
         * offer is ignored, so a request made while a token is already queued has no additional effect.
         */
        @Override
        public void notifyListenConfig() {
            listenExecutebell.offer(bellItem);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Runs one listen pass over all cache entries.
         *
         * <p>Each entry is examined under its own monitor: its local (failover) state is refreshed
         * first. Entries already consistent with the server only have their listener md5 checked and
         * are skipped, unless at least 180000 ms have passed since the last full sync. Entries using
         * local config are skipped as well. The remaining entries are grouped by task id into a
         * "listen" batch or, when discarded, a "remove" batch.
         *
         * <p>Both batches are then processed; the full-sync timestamp is updated when this was a full
         * sync, and another pass is requested when the listen check reported changed keys.
         *
         * @throws NacosException propagated from the batch processing
         */
        @Override
        public void executeConfigListen() throws NacosException {
            final HashMap<String, List<CacheData>> toListen = new HashMap<String, List<CacheData>>(16);
            final HashMap<String, List<CacheData>> toRemove = new HashMap<String, List<CacheData>>(16);
            final long now = System.currentTimeMillis();
            final boolean needAllSync = now - this.lastAllSyncTime >= 180000L;
            for (CacheData cache : ClientWorker.this.cacheMap.get().values()) {
                synchronized (cache) {
                    this.checkLocalConfig(cache);
                    if (cache.isConsistentWithServer()) {
                        cache.checkListenerMd5();
                        if (!needAllSync) {
                            continue;
                        }
                    }
                    if (cache.isUseLocalConfigInfo()) {
                        continue;
                    }
                    final HashMap<String, List<CacheData>> batch = cache.isDiscard() ? toRemove : toListen;
                    batch.computeIfAbsent(String.valueOf(cache.getTaskId()), k -> new LinkedList<CacheData>())
                            .add(cache);
                }
            }
            final boolean hasChangedKeys = this.checkListenCache(toListen);
            this.checkRemoveListenCache(toRemove);
            if (needAllSync) {
                this.lastAllSyncTime = now;
            }
            if (hasChangedKeys) {
                this.notifyListenConfig();
            }
        }

        /**
         * Synchronises {@code cacheData} with the state of its failover file.
         *
         * <ul>
         *   <li>file appeared while the cache was not using local config: switch to local config and
         *       load the file content;</li>
         *   <li>file disappeared while the cache was using local config: switch local config off;</li>
         *   <li>file still present but its last-modified time differs from the recorded version:
         *       reload the content.</li>
         * </ul>
         *
         * <p>The file's existence and last-modified time are sampled once, and the timestamp is taken
         * <em>before</em> the content is read. If the file is rewritten while it is being read, the
         * recorded version is therefore the older one and the next pass reloads the content, instead
         * of pairing stale content with the newer timestamp.
         *
         * @param cacheData cache entry to reconcile with its failover file
         */
        public void checkLocalConfig(CacheData cacheData) {
            String dataId = cacheData.dataId;
            String group = cacheData.group;
            String tenant = cacheData.tenant;
            String envName = cacheData.envName;
            File file = LocalConfigInfoProcessor.getFailoverFile(envName, dataId, group, tenant);
            boolean usingLocal = cacheData.isUseLocalConfigInfo();
            boolean fileExists = file.exists();
            if (!fileExists) {
                if (usingLocal) {
                    cacheData.setUseLocalConfigInfo(false);
                    LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", new Object[]{envName, dataId, group, tenant});
                }
                return;
            }
            // Capture the timestamp before reading so a concurrent rewrite is detected on the next pass.
            long lastModified = file.lastModified();
            if (usingLocal && cacheData.getLocalConfigInfoVersion() == lastModified) {
                return;
            }
            String content = LocalConfigInfoProcessor.getFailover(envName, dataId, group, tenant);
            // The md5 is computed for the log line only.
            String md5 = MD5Utils.md5Hex((String)content, (String)"UTF-8");
            cacheData.setUseLocalConfigInfo(true);
            cacheData.setLocalConfigInfoVersion(lastModified);
            cacheData.setContent(content);
            if (usingLocal) {
                LOGGER.warn("[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}", new Object[]{envName, dataId, group, tenant, md5});
            } else {
                LOGGER.warn("[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}", new Object[]{envName, dataId, group, tenant, md5});
            }
        }

        /**
         * Returns the executor registered for {@code taskId} in {@code multiTaskExecutor}, creating
         * and registering it on first use.
         *
         * <p>A new executor is a single-thread {@link ThreadPoolExecutor} backed by an unbounded
         * {@link LinkedBlockingQueue}; its worker is a daemon thread named
         * {@code nacos.client.config.listener.task-<taskId>}. Lookup and registration are done with a
         * single {@code computeIfAbsent} call rather than separate contains/put/get steps.
         *
         * @param taskId task id the executor is dedicated to
         * @return the executor for {@code taskId}
         */
        private ExecutorService ensureSyncExecutor(String taskId) {
            return this.multiTaskExecutor.computeIfAbsent(taskId, id -> new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), runnable -> {
                Thread worker = new Thread(runnable, "nacos.client.config.listener.task-" + id);
                worker.setDaemon(true);
                return worker;
            }));
        }

        /**
         * Refreshes the cache registered under {@code groupKey}, if there is one.
         *
         * <p>The cache map reference is read once and the entry is fetched with a single lookup, so
         * the existence check and the fetch cannot observe different maps. When no map is present or
         * the key has no (non-null) entry, nothing happens.
         *
         * @param rpcClient client used to query the server
         * @param groupKey  key of the cache to refresh
         * @param notify    forwarded unchanged to the {@link CacheData} overload
         */
        private void refreshContentAndCheck(RpcClient rpcClient, String groupKey, boolean notify) {
            Map<?, ?> snapshot = (Map<?, ?>) ClientWorker.this.cacheMap.get();
            if (snapshot == null) {
                return;
            }
            CacheData cache = (CacheData) snapshot.get(groupKey);
            if (cache != null) {
                this.refreshContentAndCheck(rpcClient, cache, notify);
            }
        }

        /**
         * Queries the server for the config behind {@code cacheData}, copies the encrypted data key,
         * content and (when present) config type into the cache, and then checks the listener md5.
         *
         * <p>Any {@link Exception} raised along the way is logged and not rethrown.
         *
         * @param rpcClient client used for the query
         * @param cacheData cache entry to refresh
         * @param notify    passed to the query; when {@code true} a data-received line is also logged
         */
        private void refreshContentAndCheck(RpcClient rpcClient, CacheData cacheData, boolean notify) {
            try {
                ConfigResponse refreshed = this.queryConfigInner(rpcClient, cacheData.dataId, cacheData.group, cacheData.tenant, ClientWorker.this.requestTimeout, notify);
                cacheData.setEncryptedDataKey(refreshed.getEncryptedDataKey());
                cacheData.setContent(refreshed.getContent());
                // Keep the previously stored type when the response carries none.
                if (refreshed.getConfigType() != null) {
                    cacheData.setType(refreshed.getConfigType());
                }
                if (notify) {
                    LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, type={}", new Object[]{ClientWorker.this.agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(), refreshed.getConfigType()});
                }
                cacheData.checkListenerMd5();
            }
            catch (Exception failure) {
                LOGGER.error("refresh content and check md5 fail ,dataId={},group={},tenant={} ", new Object[]{cacheData.dataId, cacheData.group, cacheData.tenant, failure});
            }
        }

        /**
         * Sends an un-listen request for every batch in {@code removeListenCachesMap} and waits for
         * all of them to finish.
         *
         * <p>Each batch runs on the executor dedicated to its task id. When the server confirms the
         * removal, every cache of the batch that is still discarded and has no listeners is removed
         * from the worker (checked under the cache's monitor). When the task fails, the error is
         * logged, the task backs off for 50 ms and another listen pass is requested. If that back-off
         * is interrupted, the thread's interrupt status is restored instead of being dropped.
         *
         * @param removeListenCachesMap caches to stop listening to, grouped by task id
         * @throws NacosException if the rpc client for a task id cannot be obtained
         */
        private void checkRemoveListenCache(Map<String, List<CacheData>> removeListenCachesMap) throws NacosException {
            if (removeListenCachesMap.isEmpty()) {
                return;
            }
            List<Future<?>> pending = new ArrayList<Future<?>>();
            for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {
                String taskId = entry.getKey();
                List<CacheData> removeListenCaches = entry.getValue();
                RpcClient rpcClient = this.ensureRpcClient(taskId);
                ExecutorService executorService = this.ensureSyncExecutor(taskId);
                pending.add(executorService.submit(() -> {
                    ConfigBatchListenRequest configChangeListenRequest = this.buildConfigRequest(removeListenCaches);
                    configChangeListenRequest.setListen(false);
                    try {
                        if (!this.unListenConfigChange(rpcClient, configChangeListenRequest)) {
                            return;
                        }
                        for (CacheData cacheData : removeListenCaches) {
                            synchronized (cacheData) {
                                // Re-check under the lock: the cache may have gained listeners again.
                                if (cacheData.isDiscard() && cacheData.getListeners().isEmpty()) {
                                    ClientWorker.this.removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
                                }
                            }
                        }
                    }
                    catch (Throwable e) {
                        LOGGER.error("Async remove listen config change error ", e);
                        try {
                            Thread.sleep(50L);
                        }
                        catch (InterruptedException interrupted) {
                            // Preserve the interrupt for whoever owns this worker thread.
                            Thread.currentThread().interrupt();
                        }
                        this.notifyListenConfig();
                    }
                }));
            }
            for (Future<?> future : pending) {
                try {
                    future.get();
                }
                catch (Throwable throwable) {
                    LOGGER.error("Async remove listen config change error ", throwable);
                }
            }
        }

        /**
         * Sends a batch listen request for every batch in {@code listenCachesMap}, refreshes the
         * caches the server reports as changed, and waits for all batches to finish.
         *
         * <p>Each batch runs on the executor dedicated to its task id and proceeds as follows:
         * <ol>
         *   <li>the receive-notify-changed flag of every cache in the batch is cleared;</li>
         *   <li>the listen request is sent; a {@code null} or unsuccessful response ends the task;</li>
         *   <li>every changed config in the response is refreshed;</li>
         *   <li>every cache whose flag was set again in the meantime, and which was not already
         *       refreshed in the previous step, is refreshed too;</li>
         *   <li>every cache leaves the initializing state, and caches that were neither reported as
         *       changed nor flagged are marked consistent with the server.</li>
         * </ol>
         * A changed key that is no longer present in the cache map is skipped, so one vanished entry
         * does not abort the rest of the batch. When the task fails, the error is logged, the task
         * backs off for 50 ms (restoring the interrupt status if interrupted) and another listen pass
         * is requested.
         *
         * @param listenCachesMap caches to listen to, grouped by task id
         * @return {@code true} if at least one batch response contained changed configs
         * @throws NacosException if the rpc client for a task id cannot be obtained
         */
        private boolean checkListenCache(Map<String, List<CacheData>> listenCachesMap) throws NacosException {
            AtomicBoolean hasChangedKeys = new AtomicBoolean(false);
            if (listenCachesMap.isEmpty()) {
                return false;
            }
            List<Future<?>> pending = new ArrayList<Future<?>>();
            for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
                String taskId = entry.getKey();
                List<CacheData> listenCaches = entry.getValue();
                RpcClient rpcClient = this.ensureRpcClient(taskId);
                ExecutorService executorService = this.ensureSyncExecutor(taskId);
                pending.add(executorService.submit(() -> {
                    // Clear the flags first so notifications arriving during the request are noticed below.
                    for (CacheData cacheData : listenCaches) {
                        cacheData.getReceiveNotifyChanged().set(false);
                    }
                    ConfigBatchListenRequest configChangeListenRequest = this.buildConfigRequest(listenCaches);
                    configChangeListenRequest.setListen(true);
                    try {
                        ConfigChangeBatchListenResponse listenResponse = (ConfigChangeBatchListenResponse)this.requestProxy(rpcClient, (Request)configChangeListenRequest);
                        if (listenResponse == null || !listenResponse.isSuccess()) {
                            return;
                        }
                        HashSet<String> changeKeys = new HashSet<String>();
                        List<ConfigChangeBatchListenResponse.ConfigContext> changedConfigs = listenResponse.getChangedConfigs();
                        if (!CollectionUtils.isEmpty(changedConfigs)) {
                            hasChangedKeys.set(true);
                            for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : changedConfigs) {
                                String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(), changeConfig.getTenant());
                                changeKeys.add(changeKey);
                                this.refreshCachedConfig(rpcClient, changeKey);
                            }
                        }
                        // Caches notified while the request was in flight but not reported by the server.
                        for (CacheData cacheData : listenCaches) {
                            if (!cacheData.getReceiveNotifyChanged().get()) {
                                continue;
                            }
                            String changeKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());
                            if (changeKeys.contains(changeKey)) {
                                continue;
                            }
                            this.refreshCachedConfig(rpcClient, changeKey);
                        }
                        for (CacheData cacheData : listenCaches) {
                            cacheData.setInitializing(false);
                            String groupKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());
                            if (changeKeys.contains(groupKey)) {
                                continue;
                            }
                            synchronized (cacheData) {
                                if (!cacheData.getReceiveNotifyChanged().get()) {
                                    cacheData.setConsistentWithServer(true);
                                }
                            }
                        }
                    }
                    catch (Throwable e) {
                        LOGGER.error("Execute listen config change error ", e);
                        try {
                            Thread.sleep(50L);
                        }
                        catch (InterruptedException interrupted) {
                            // Preserve the interrupt for whoever owns this worker thread.
                            Thread.currentThread().interrupt();
                        }
                        this.notifyListenConfig();
                    }
                }));
            }
            for (Future<?> future : pending) {
                try {
                    future.get();
                }
                catch (Throwable throwable) {
                    LOGGER.error("Async listen config change error ", throwable);
                }
            }
            return hasChangedKeys.get();
        }

        /**
         * Refreshes the cache currently registered under {@code changeKey}; does nothing when the key
         * is not (or no longer) in the cache map. Listeners are notified only when the cache is not
         * in its initializing state.
         */
        private void refreshCachedConfig(RpcClient rpcClient, String changeKey) {
            Map<?, ?> snapshot = (Map<?, ?>) ClientWorker.this.cacheMap.get();
            CacheData cache = snapshot == null ? null : (CacheData) snapshot.get(changeKey);
            if (cache == null) {
                return;
            }
            this.refreshContentAndCheck(rpcClient, cache, !cache.isInitializing());
        }

        /**
         * Obtains the rpc client for {@code taskId} from {@code RpcClientFactory}, starting it when it
         * is still waiting to be initiated.
         *
         * <p>The whole operation runs while holding the monitor of the enclosing {@code ClientWorker}.
         * The client is requested under the name {@code <uuid>_config-<taskId>} with this transport's
         * labels plus a {@code taskId} label. A client that reports {@code isWaitInitiated()} gets its
         * handlers installed and its tenant set before {@code start()} is called.
         *
         * @param taskId task id the client is dedicated to
         * @return the rpc client for {@code taskId}
         * @throws NacosException if creating or starting the client fails
         */
        private RpcClient ensureRpcClient(String taskId) throws NacosException {
            synchronized (ClientWorker.this) {
                HashMap<String, String> taskLabels = new HashMap<String, String>(this.getLabels());
                taskLabels.put("taskId", taskId);
                RpcClientTlsConfig tlsConfig = RpcClientTlsConfigFactory.getInstance().createSdkConfig(this.properties);
                String clientName = ClientWorker.this.uuid + "_config-" + taskId;
                RpcClient client = RpcClientFactory.createClient((String)clientName, (ConnectionType)this.getConnectionType(), taskLabels, (RpcClientTlsConfig)tlsConfig);
                if (!client.isWaitInitiated()) {
                    return client;
                }
                this.initRpcClientHandler(client);
                client.setTenant(this.getTenant());
                client.start();
                return client;
            }
        }

        /**
         * Builds a batch listen request containing one listen context (group, dataId, tenant and
         * current md5) per cache, in iteration order. The {@code listen} flag is left for the caller
         * to set.
         *
         * @param caches caches to include in the request
         * @return the populated request
         */
        private ConfigBatchListenRequest buildConfigRequest(List<CacheData> caches) {
            ConfigBatchListenRequest batchRequest = new ConfigBatchListenRequest();
            caches.forEach(cache -> batchRequest.addConfigListenContext(cache.group, cache.dataId, cache.tenant, cache.getMd5()));
            return batchRequest;
        }

        /**
         * Requests another listen pass. Neither {@code dataId} nor {@code group} is used here.
         *
         * @param dataId unused
         * @param group  unused
         */
        @Override
        public void removeCache(String dataId, String group) {
            notifyListenConfig();
        }

        /**
         * Sends {@code configChangeListenRequest} through {@code rpcClient} and reports whether the
         * server answered with success.
         *
         * @param rpcClient                 client to send the request with
         * @param configChangeListenRequest batch request to send
         * @return {@code true} if the response reports success
         * @throws NacosException if the request fails
         */
        private boolean unListenConfigChange(RpcClient rpcClient, ConfigBatchListenRequest configChangeListenRequest) throws NacosException {
            Response raw = this.requestProxy(rpcClient, (Request)configChangeListenRequest);
            return ((ConfigChangeBatchListenResponse)raw).isSuccess();
        }

        /**
         * Queries a config from the server.
         *
         * <p>The client returned by {@link #getOneRunningClient()} is used by default. When
         * {@code notify} is {@code true} and a cache exists for the given coordinates, the client
         * belonging to that cache's task id is used instead.
         *
         * @param dataId       config data id
         * @param group        config group
         * @param tenant       config tenant
         * @param readTimeouts request timeout passed to the query
         * @param notify       notify flag passed to the query
         * @return the query result
         * @throws NacosException if obtaining a client or the query itself fails
         */
        @Override
        public ConfigResponse queryConfig(String dataId, String group, String tenant, long readTimeouts, boolean notify) throws NacosException {
            RpcClient rpcClient = this.getOneRunningClient();
            if (notify) {
                Map<?, ?> caches = (Map<?, ?>) ClientWorker.this.cacheMap.get();
                CacheData cacheData = (CacheData) caches.get(GroupKey.getKeyTenant(dataId, group, tenant));
                if (cacheData != null) {
                    rpcClient = this.ensureRpcClient(String.valueOf(cacheData.getTaskId()));
                }
            }
            return this.queryConfigInner(rpcClient, dataId, group, tenant, readTimeouts, notify);
        }

        /**
         * Sends a config query through {@code rpcClient} and converts the answer into a
         * {@link ConfigResponse}.
         *
         * <ul>
         *   <li>Success: the content and encrypted data key are saved as local snapshots and returned
         *       together with the config type (the text type when the response has a blank content
         *       type).</li>
         *   <li>Error code 300: both snapshots are overwritten with {@code null} and an empty
         *       {@link ConfigResponse} is returned.</li>
         *   <li>Error code 400: logged as a concurrent modification and rethrown as a
         *       {@link NacosException} with code 409.</li>
         *   <li>Any other error code: logged with code and message, then rethrown as a
         *       {@link NacosException} carrying the server's error code.</li>
         * </ul>
         *
         * @param rpcClient    client to send the query with
         * @param dataId       config data id
         * @param group        config group
         * @param tenant       config tenant
         * @param readTimeouts request timeout passed to the request proxy
         * @param notify       value sent in the notify header
         * @return the query result; empty when the server answered with error code 300
         * @throws NacosException on a failed request or an error code other than 300
         */
        ConfigResponse queryConfigInner(RpcClient rpcClient, String dataId, String group, String tenant, long readTimeouts, boolean notify) throws NacosException {
            ConfigQueryRequest request = ConfigQueryRequest.build((String)dataId, (String)group, (String)tenant);
            request.putHeader(ClientWorker.NOTIFY_HEADER, String.valueOf(notify));
            ConfigQueryResponse response = (ConfigQueryResponse)this.requestProxy(rpcClient, (Request)request, readTimeouts);
            ConfigResponse configResponse = new ConfigResponse();
            if (response.isSuccess()) {
                LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, response.getContent());
                configResponse.setContent(response.getContent());
                String configType = StringUtils.isNotBlank((String)response.getContentType()) ? response.getContentType() : ConfigType.TEXT.getType();
                configResponse.setConfigType(configType);
                String encryptedDataKey = response.getEncryptedDataKey();
                LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(ClientWorker.this.agent.getName(), dataId, group, tenant, encryptedDataKey);
                configResponse.setEncryptedDataKey(encryptedDataKey);
                return configResponse;
            }
            int errorCode = response.getErrorCode();
            // NOTE(review): 300 presumably means "config not found" - confirm against ConfigQueryResponse.
            if (errorCode == 300) {
                LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, null);
                LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(ClientWorker.this.agent.getName(), dataId, group, tenant, null);
                return configResponse;
            }
            if (errorCode == 400) {
                LOGGER.error("[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, tenant={}", new Object[]{this.getName(), dataId, group, tenant});
                throw new NacosException(409, "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
            }
            // Log the code and message explicitly; the code placeholder used to receive the whole response object.
            LOGGER.error("[{}] [sub-server-error]  dataId={}, group={}, tenant={}, code={}, msg={}", new Object[]{this.getName(), dataId, group, tenant, errorCode, response.getMessage()});
            throw new NacosException(errorCode, "http error, code=" + errorCode + ",msg=" + response.getMessage() + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
        }

        /**
         * Sends {@code request} using the enclosing worker's {@code requestTimeout}.
         *
         * @param rpcClientInner client to send the request with
         * @param request        request to send
         * @return the server response
         * @throws NacosException see the three-argument overload
         */
        private Response requestProxy(RpcClient rpcClientInner, Request request) throws NacosException {
            long workerTimeout = ClientWorker.this.requestTimeout;
            return this.requestProxy(rpcClientInner, request, workerTimeout);
        }

        /**
         * Decorates {@code request} with security and common headers, applies the client-side rate
         * limit and sends it through {@code rpcClientInner}.
         *
         * <p>The limiter key is the request class followed by the request's JSON form with the
         * {@code headers} and {@code requestId} members removed. A negative {@code timeoutMills}
         * selects the client's request call without an explicit timeout. A response with error code
         * 403 triggers {@code reLogin()} before the response is returned.
         *
         * @param rpcClientInner client to send the request with
         * @param request        request to send
         * @param timeoutMills   timeout in milliseconds; negative for the client's own default
         * @return the server response
         * @throws NacosException with code -400 when building the headers fails, with code -503 when
         *                        the limiter rejects the request, or whatever the client throws
         */
        private Response requestProxy(RpcClient rpcClientInner, Request request, long timeoutMills) throws NacosException {
            try {
                request.putAllHeader(super.getSecurityHeaders(this.resourceBuild(request)));
                request.putAllHeader(super.getCommonHeader());
            }
            catch (Exception headerFailure) {
                throw new NacosException(-400, (Throwable)headerFailure);
            }
            JsonObject limiterPayload = new Gson().toJsonTree((Object)request).getAsJsonObject();
            // Drop the per-request members so the remaining JSON is the same for repeated requests.
            limiterPayload.remove("headers");
            limiterPayload.remove("requestId");
            String limiterKey = request.getClass() + limiterPayload.toString();
            if (Limiter.isLimit(limiterKey)) {
                throw new NacosException(-503, "More than client-side current limit threshold");
            }
            Response response;
            if (timeoutMills < 0L) {
                response = rpcClientInner.request(request);
            } else {
                response = rpcClientInner.request(request, timeoutMills);
            }
            if (response.getErrorCode() == 403) {
                this.reLogin();
            }
            return response;
        }

        /**
         * Derives the {@link RequestResource} for {@code request}.
         *
         * <p>Query, publish and remove requests yield a resource built from their tenant, group and
         * data id; every other request type yields the plain result of
         * {@code RequestResource.configBuilder().build()}.
         *
         * @param request request to describe
         * @return the resource for {@code request}
         */
        private RequestResource resourceBuild(Request request) {
            String tenant;
            String group;
            String dataId;
            if (request instanceof ConfigQueryRequest) {
                ConfigQueryRequest query = (ConfigQueryRequest)request;
                tenant = query.getTenant();
                group = query.getGroup();
                dataId = query.getDataId();
            } else if (request instanceof ConfigPublishRequest) {
                ConfigPublishRequest publish = (ConfigPublishRequest)request;
                tenant = publish.getTenant();
                group = publish.getGroup();
                dataId = publish.getDataId();
            } else if (request instanceof ConfigRemoveRequest) {
                ConfigRemoveRequest remove = (ConfigRemoveRequest)request;
                tenant = remove.getTenant();
                group = remove.getGroup();
                dataId = remove.getDataId();
            } else {
                return RequestResource.configBuilder().build();
            }
            return this.buildResource(tenant, group, dataId);
        }

        /**
         * Returns the rpc client registered for task id {@code "0"}.
         *
         * @return the client for task id {@code "0"}
         * @throws NacosException if the client cannot be obtained
         */
        RpcClient getOneRunningClient() throws NacosException {
            String taskId = "0";
            return this.ensureRpcClient(taskId);
        }

        /**
         * Publishes a config through the client returned by {@link #getOneRunningClient()}.
         *
         * <p>{@code tag}, {@code appName}, {@code betaIps}, {@code type} and the encrypted data key
         * travel as additional request parameters; a {@code null} encrypted data key is sent as an
         * empty string. Failures never propagate: an unsuccessful response and any exception are both
         * logged and reported as {@code false}. The exception is passed to the logger as well, so its
         * stack trace and cause are kept rather than only its message.
         *
         * @param dataId           config data id
         * @param group            config group
         * @param tenant           config tenant
         * @param appName          application name parameter
         * @param tag              tag parameter
         * @param betaIps          beta ips parameter
         * @param content          config content
         * @param encryptedDataKey encrypted data key, may be {@code null}
         * @param casMd5           md5 set on the request via {@code setCasMd5}
         * @param type             config type parameter
         * @return {@code true} if the server reported success, {@code false} otherwise
         * @throws NacosException declared by the interface; this implementation catches all exceptions
         */
        @Override
        public boolean publishConfig(String dataId, String group, String tenant, String appName, String tag, String betaIps, String content, String encryptedDataKey, String casMd5, String type) throws NacosException {
            try {
                ConfigPublishRequest request = new ConfigPublishRequest(dataId, group, tenant, content);
                request.setCasMd5(casMd5);
                request.putAdditionalParam(ClientWorker.TAG_PARAM, tag);
                request.putAdditionalParam(ClientWorker.APP_NAME_PARAM, appName);
                request.putAdditionalParam(ClientWorker.BETAIPS_PARAM, betaIps);
                request.putAdditionalParam(ClientWorker.TYPE_PARAM, type);
                request.putAdditionalParam(ClientWorker.ENCRYPTED_DATA_KEY_PARAM, encryptedDataKey == null ? "" : encryptedDataKey);
                ConfigPublishResponse response = (ConfigPublishResponse)this.requestProxy(this.getOneRunningClient(), (Request)request);
                if (!response.isSuccess()) {
                    LOGGER.warn("[{}] [publish-single] fail, dataId={}, group={}, tenant={}, code={}, msg={}", new Object[]{this.getName(), dataId, group, tenant, response.getErrorCode(), response.getMessage()});
                    return false;
                }
                LOGGER.info("[{}] [publish-single] ok, dataId={}, group={}, tenant={}", new Object[]{this.getName(), dataId, group, tenant});
                return true;
            }
            catch (Exception e) {
                // The trailing throwable has no placeholder, so the logger records it with its stack trace.
                LOGGER.warn("[{}] [publish-single] error, dataId={}, group={}, tenant={}, code={}, msg={}", new Object[]{this.getName(), dataId, group, tenant, "unknown", e.getMessage(), e});
                return false;
            }
        }

        /**
         * Sends a remove request for the given config through the client returned by
         * {@link #getOneRunningClient()}.
         *
         * @param dataId config data id
         * @param group  config group
         * @param tenant config tenant
         * @param tag    config tag
         * @return {@code true} if the server reported success
         * @throws NacosException if obtaining the client or the request fails
         */
        @Override
        public boolean removeConfig(String dataId, String group, String tenant, String tag) throws NacosException {
            ConfigRemoveRequest removal = new ConfigRemoveRequest(dataId, group, tenant, tag);
            RpcClient client = this.getOneRunningClient();
            Response raw = this.requestProxy(client, (Request)removal);
            return ((ConfigRemoveResponse)raw).isSuccess();
        }

        /**
         * Reports whether the client returned by {@link #getOneRunningClient()} is running.
         *
         * @return the client's running state, or {@code false} (after logging a warning) when the
         *         client cannot be obtained
         */
        public boolean isHealthServer() {
            boolean healthy;
            try {
                healthy = this.getOneRunningClient().isRunning();
            }
            catch (NacosException failure) {
                LOGGER.warn("check server status failed.", (Throwable)failure);
                healthy = false;
            }
            return healthy;
        }
    }
}

