/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.config.server.service.notify;

import com.alibaba.nacos.api.config.remote.request.cluster.ConfigChangeClusterSyncRequest;
import com.alibaba.nacos.api.config.remote.response.cluster.ConfigChangeClusterSyncResponse;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.remote.ConfigClusterRpcClientProxy;
import com.alibaba.nacos.config.server.service.trace.ConfigTraceService;
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.config.server.utils.PropertyUtil;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.NodeState;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.sys.utils.InetUtils;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class AsyncNotifyService {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncNotifyService.class);
    private static final int MIN_RETRY_INTERVAL = 500;
    private static final int INCREASE_STEPS = 1000;
    private static final int MAX_COUNT = 6;
    @Autowired
    private ConfigClusterRpcClientProxy configClusterRpcClientProxy;
    private ServerMemberManager memberManager;
    static final List<NodeState> HEALTHY_CHECK_STATUS = new ArrayList<NodeState>();

    @Autowired
    public AsyncNotifyService(ServerMemberManager memberManager) {
        this.memberManager = memberManager;
        NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, (int)NotifyCenter.ringBufferSize);
        NotifyCenter.registerSubscriber((Subscriber)new Subscriber(){

            public void onEvent(Event event) {
                AsyncNotifyService.this.handleConfigDataChangeEvent(event);
            }

            public Class<? extends Event> subscribeType() {
                return ConfigDataChangeEvent.class;
            }
        });
    }

    void handleConfigDataChangeEvent(Event event) {
        if (event instanceof ConfigDataChangeEvent) {
            ConfigDataChangeEvent evt = (ConfigDataChangeEvent)event;
            MetricsMonitor.incrementConfigChangeCount(evt.tenant, evt.group, evt.dataId);
            List ipList = this.memberManager.allMembersWithoutSelf();
            LinkedList<NotifySingleRpcTask> rpcQueue = new LinkedList<NotifySingleRpcTask>();
            for (Member member : ipList) {
                NotifySingleRpcTask notifySingleRpcTask = this.generateTask(evt, member);
                if (notifySingleRpcTask == null) continue;
                rpcQueue.add(notifySingleRpcTask);
            }
            if (!rpcQueue.isEmpty()) {
                ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
            }
        }
    }

    private NotifySingleRpcTask generateTask(ConfigDataChangeEvent configDataChangeEvent, Member member) {
        NotifySingleRpcTask task = new NotifySingleRpcTask(configDataChangeEvent.dataId, configDataChangeEvent.group, configDataChangeEvent.tenant, configDataChangeEvent.grayName, configDataChangeEvent.lastModifiedTs, member);
        if (PropertyUtil.isGrayCompatibleModel() && StringUtils.isNotBlank((String)configDataChangeEvent.grayName) && !member.getExtendInfo().getOrDefault("supportGrayModel", Boolean.FALSE).booleanValue()) {
            String underLine = "_";
            task.setBeta("beta".equals(configDataChangeEvent.grayName));
            if (configDataChangeEvent.grayName.startsWith("tag" + underLine)) {
                task.setTag(configDataChangeEvent.grayName.substring(configDataChangeEvent.grayName.indexOf("tag" + underLine) + 4));
            }
        }
        return task;
    }

    private boolean isUnHealthy(String targetIp) {
        return !this.memberManager.stateCheck(targetIp, HEALTHY_CHECK_STATUS);
    }

    void executeAsyncRpcTask(Queue<NotifySingleRpcTask> queue) {
        while (!queue.isEmpty()) {
            NotifySingleRpcTask task = queue.poll();
            ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
            syncRequest.setDataId(task.getDataId());
            syncRequest.setTenant(task.getTenant());
            syncRequest.setGroup(task.getGroup());
            syncRequest.setLastModified(task.getLastModified());
            syncRequest.setGrayName(task.getGrayName());
            syncRequest.setBeta(task.isBeta());
            syncRequest.setTag(task.getTag());
            Member member = task.member;
            String event = AsyncNotifyService.getNotifyEvent(task);
            if (!this.memberManager.hasMember(member.getAddress())) continue;
            boolean unHealthNeedDelay = this.isUnHealthy(member.getAddress());
            if (unHealthNeedDelay) {
                ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), InetUtils.getSelfIP(), event, "unhealth", 0L, member.getAddress());
                this.asyncTaskExecute(task);
                continue;
            }
            try {
                this.configClusterRpcClientProxy.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(this, task));
            }
            catch (Exception e) {
                MetricsMonitor.getConfigNotifyException().increment();
                this.asyncTaskExecute(task);
            }
        }
    }

    private void asyncTaskExecute(NotifySingleRpcTask task) {
        int delay = AsyncNotifyService.getDelayTime(task);
        LinkedList<NotifySingleRpcTask> queue = new LinkedList<NotifySingleRpcTask>();
        queue.add(task);
        AsyncRpcTask asyncTask = new AsyncRpcTask(queue);
        ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS);
    }

    private static String getNotifyEvent(NotifySingleRpcTask task) {
        String event = "notify";
        if (task.isBeta()) {
            event = "notify-beta";
        } else if (!StringUtils.isBlank((CharSequence)task.tag)) {
            event = "notify-tag-" + task.tag;
        } else if (StringUtils.isNotBlank((String)task.grayName)) {
            event = "notify-" + task.grayName;
        }
        return event;
    }

    private static int getDelayTime(NotifySingleRpcTask task) {
        int failCount = task.getFailCount();
        int delay = 500 + failCount * failCount * 1000;
        if (failCount <= 6) {
            task.setFailCount(failCount + 1);
        }
        return delay;
    }

    static {
        HEALTHY_CHECK_STATUS.add(NodeState.UP);
        HEALTHY_CHECK_STATUS.add(NodeState.SUSPICIOUS);
    }

    public static class NotifySingleRpcTask
    extends AbstractDelayTask {
        private String dataId;
        private String group;
        private String tenant;
        private long lastModified;
        private int failCount;
        private Member member;
        private String grayName;
        @Deprecated
        private boolean isBeta;
        @Deprecated
        private String tag;

        public NotifySingleRpcTask(String dataId, String group, String tenant, String grayName, long lastModified, Member member) {
            this.dataId = dataId;
            this.group = group;
            this.tenant = tenant;
            this.lastModified = lastModified;
            this.member = member;
            this.grayName = grayName;
            this.setTaskInterval(3000L);
        }

        public boolean isBeta() {
            return this.isBeta;
        }

        public void setBeta(boolean beta) {
            this.isBeta = beta;
        }

        public String getTag() {
            return this.tag;
        }

        public void setTag(String tag) {
            this.tag = tag;
        }

        public String getGrayName() {
            return this.grayName;
        }

        public void setGrayName(String grayName) {
            this.grayName = grayName;
        }

        public String getDataId() {
            return this.dataId;
        }

        public String getGroup() {
            return this.group;
        }

        public int getFailCount() {
            return this.failCount;
        }

        public void setFailCount(int failCount) {
            this.failCount = failCount;
        }

        public long getLastModified() {
            return this.lastModified;
        }

        public void merge(AbstractDelayTask task) {
        }

        public String getTenant() {
            return this.tenant;
        }
    }

    public class AsyncRpcTask
    implements Runnable {
        private Queue<NotifySingleRpcTask> queue;

        public AsyncRpcTask(Queue<NotifySingleRpcTask> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            AsyncNotifyService.this.executeAsyncRpcTask(this.queue);
        }
    }

    public static class AsyncRpcNotifyCallBack
    implements RequestCallBack<ConfigChangeClusterSyncResponse> {
        private NotifySingleRpcTask task;
        AsyncNotifyService asyncNotifyService;

        public AsyncRpcNotifyCallBack(AsyncNotifyService asyncNotifyService, NotifySingleRpcTask task) {
            this.task = task;
            this.asyncNotifyService = asyncNotifyService;
        }

        public Executor getExecutor() {
            return ConfigExecutor.getConfigSubServiceExecutor();
        }

        public long getTimeout() {
            return 1000L;
        }

        public void onResponse(ConfigChangeClusterSyncResponse response) {
            String event = AsyncNotifyService.getNotifyEvent(this.task);
            long delayed = System.currentTimeMillis() - this.task.getLastModified();
            if (response.isSuccess()) {
                ConfigTraceService.logNotifyEvent(this.task.getDataId(), this.task.getGroup(), this.task.getTenant(), null, this.task.getLastModified(), InetUtils.getSelfIP(), event, "ok", delayed, this.task.member.getAddress());
            } else {
                LOGGER.error("[notify-error] target:{} dataId:{} group:{} ts:{} code:{}", new Object[]{this.task.member.getAddress(), this.task.getDataId(), this.task.getGroup(), this.task.getLastModified(), response.getErrorCode()});
                ConfigTraceService.logNotifyEvent(this.task.getDataId(), this.task.getGroup(), this.task.getTenant(), null, this.task.getLastModified(), InetUtils.getSelfIP(), event, "error", delayed, this.task.member.getAddress());
                this.asyncNotifyService.asyncTaskExecute(this.task);
                LogUtil.NOTIFY_LOG.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", new Object[]{this.task.member.getAddress(), this.task.getDataId(), this.task.getGroup(), this.task.getLastModified()});
                MetricsMonitor.getConfigNotifyException().increment();
            }
        }

        public void onException(Throwable ex) {
            String event = AsyncNotifyService.getNotifyEvent(this.task);
            long delayed = System.currentTimeMillis() - this.task.getLastModified();
            LOGGER.error("[notify-exception] target:{} dataId:{} group:{} ts:{} ex:{}", new Object[]{this.task.member.getAddress(), this.task.getDataId(), this.task.getGroup(), this.task.getLastModified(), ex});
            ConfigTraceService.logNotifyEvent(this.task.getDataId(), this.task.getGroup(), this.task.getTenant(), null, this.task.getLastModified(), InetUtils.getSelfIP(), event, "exception", delayed, this.task.member.getAddress());
            this.asyncNotifyService.asyncTaskExecute(this.task);
            LogUtil.NOTIFY_LOG.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", new Object[]{this.task.member.getAddress(), this.task.getDataId(), this.task.getGroup(), this.task.getLastModified()});
            MetricsMonitor.getConfigNotifyException().increment();
        }
    }
}

