/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.registry.server.session.scheduler.task;

import com.alipay.sofa.registry.common.model.metaserver.DataOperator;
import com.alipay.sofa.registry.common.model.metaserver.NotifyProvideDataChange;
import com.alipay.sofa.registry.common.model.metaserver.ProvideData;
import com.alipay.sofa.registry.common.model.store.DataInfo;
import com.alipay.sofa.registry.common.model.store.URL;
import com.alipay.sofa.registry.common.model.store.Watcher;
import com.alipay.sofa.registry.core.model.ReceivedConfigData;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.net.NetUtil;
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.remoting.Server;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig;
import com.alipay.sofa.registry.server.session.converter.ReceivedDataConverter;
import com.alipay.sofa.registry.server.session.node.service.MetaNodeService;
import com.alipay.sofa.registry.server.session.provideData.ProvideDataProcessor;
import com.alipay.sofa.registry.server.session.scheduler.task.AbstractSessionTask;
import com.alipay.sofa.registry.server.session.store.Watchers;
import com.alipay.sofa.registry.task.listener.TaskEvent;
import com.alipay.sofa.registry.task.listener.TaskListenerManager;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Session task that reacts to a {@link NotifyProvideDataChange} notification.
 *
 * <p>Unless the notification's operator is {@link DataOperator#REMOVE}, the task fetches the
 * data for the notified data-info id from the {@link MetaNodeService} and hands it to the
 * {@link ProvideDataProcessor}. It then walks every channel of the session server bound to the
 * configured server port, collects the register ids of the watchers on that connection whose
 * data-info id equals the notified one, and fires one
 * {@code RECEIVED_DATA_CONFIG_PUSH_TASK} event per connection that has at least one match.
 */
public class ProvideDataChangeFetchTask extends AbstractSessionTask {
    private static final Logger TASK_LOGGER = LoggerFactory.getLogger(ProvideDataChangeFetchTask.class, "[Task]");
    private static final Logger LOGGER = LoggerFactory.getLogger(ProvideDataChangeFetchTask.class);

    private final SessionServerConfig sessionServerConfig;
    private final TaskListenerManager taskListenerManager;
    private final MetaNodeService metaNodeService;
    private final Watchers sessionWatchers;
    private final Exchange boltExchange;
    private final ProvideDataProcessor provideDataProcessorManager;

    /** Notification being processed; assigned by {@link #setTaskEvent(TaskEvent)}. */
    private NotifyProvideDataChange notifyProvideDataChange;

    /**
     * Creates the task with its collaborators.
     *
     * @param sessionServerConfig supplies the server port and the retry-times setting
     * @param taskListenerManager receives the push task events fired by this task
     * @param metaNodeService used to fetch the changed data
     * @param sessionWatchers queried for the watchers registered on each connection
     * @param boltExchange used to look up the session server by port
     * @param provideDataProcessorManager receives the fetched data before it is pushed
     */
    public ProvideDataChangeFetchTask(SessionServerConfig sessionServerConfig, TaskListenerManager taskListenerManager, MetaNodeService metaNodeService, Watchers sessionWatchers, Exchange boltExchange, ProvideDataProcessor provideDataProcessorManager) {
        this.sessionServerConfig = sessionServerConfig;
        this.taskListenerManager = taskListenerManager;
        this.metaNodeService = metaNodeService;
        this.sessionWatchers = sessionWatchers;
        this.boltExchange = boltExchange;
        this.provideDataProcessorManager = provideDataProcessorManager;
    }

    /**
     * Binds this task to the given event.
     *
     * <p>The event's task id, when non-null, becomes this task's id. The event object must be a
     * {@link NotifyProvideDataChange}.
     *
     * @param taskEvent event carrying the notification to process
     * @throws IllegalArgumentException if the event object is not a {@link NotifyProvideDataChange}
     */
    public void setTaskEvent(TaskEvent taskEvent) {
        if (taskEvent.getTaskId() != null) {
            this.setTaskId(taskEvent.getTaskId());
        }
        Object obj = taskEvent.getEventObj();
        if (!(obj instanceof NotifyProvideDataChange)) {
            throw new IllegalArgumentException("Input task event object error!");
        }
        this.notifyProvideDataChange = (NotifyProvideDataChange)obj;
    }

    /**
     * Processes the bound notification and fires push tasks for the affected connections.
     *
     * <p>For a non-REMOVE notification the data is fetched first; if nothing is returned a
     * warning is logged and nothing is pushed. If no server is registered for the configured
     * port, the method returns after the fetch/process step without pushing.
     */
    public void execute() {
        String dataInfoId = this.notifyProvideDataChange.getDataInfoId();
        // Read the operator once so the fetch step and the per-channel payload agree on it.
        boolean removed = this.notifyProvideDataChange.getDataOperator() == DataOperator.REMOVE;

        ProvideData provideData = null;
        if (!removed) {
            provideData = this.metaNodeService.fetchData(dataInfoId);
            if (provideData == null) {
                LOGGER.warn("Notify provider data Change request {} fetch no provider data!", this.notifyProvideDataChange);
                return;
            }
            this.provideDataProcessorManager.changeDataProcess(provideData);
        }

        DataInfo dataInfo = DataInfo.valueOf(dataInfoId);
        Server sessionServer = this.boltExchange.getServer(Integer.valueOf(this.sessionServerConfig.getServerPort()));
        if (sessionServer == null) {
            return;
        }

        for (Channel channel : sessionServer.getChannels()) {
            String connectId = NetUtil.toAddressString((InetSocketAddress)channel.getRemoteAddress());
            List<String> registerIds = ProvideDataChangeFetchTask.matchingRegisterIds(this.getCache(connectId), dataInfoId);
            if (registerIds.isEmpty()) {
                continue;
            }
            // A fresh payload per connection: each one carries its own register-id list.
            ReceivedConfigData receivedConfigData = this.buildReceivedConfigData(removed, provideData, dataInfo);
            receivedConfigData.setConfiguratorRegistIds(registerIds);
            this.firePushTask(receivedConfigData, new URL(channel.getRemoteAddress()));
        }
    }

    /**
     * Collects the register ids whose (non-null) watcher has a data-info id equal to the given one.
     *
     * @param watchers register id to watcher mapping of one connection
     * @param dataInfoId data-info id to match against
     * @return the matching register ids, possibly empty, never {@code null}
     */
    private static List<String> matchingRegisterIds(Map<String, Watcher> watchers, String dataInfoId) {
        List<String> registerIds = new ArrayList<>();
        watchers.forEach((registerId, watcher) -> {
            if (watcher != null && watcher.getDataInfoId().equals(dataInfoId)) {
                registerIds.add(registerId);
            }
        });
        return registerIds;
    }

    /**
     * Builds the payload to push to one connection.
     *
     * @param removed whether the notification's operator is REMOVE
     * @param provideData fetched data; only read when {@code removed} is {@code false}, in which
     *     case {@link #execute()} has already verified it is non-null
     * @param dataInfo parsed form of the notified data-info id
     * @return payload with {@code null} data and the notification's version for a removal,
     *     otherwise payload with the fetched data and its version
     */
    private ReceivedConfigData buildReceivedConfigData(boolean removed, ProvideData provideData, DataInfo dataInfo) {
        if (removed) {
            return ReceivedDataConverter.getReceivedConfigData(null, dataInfo, this.notifyProvideDataChange.getVersion());
        }
        return ReceivedDataConverter.getReceivedConfigData(provideData.getProvideData(), dataInfo, provideData.getVersion());
    }

    /**
     * Wraps the payload and target URL in a {@code RECEIVED_DATA_CONFIG_PUSH_TASK} event, logs it
     * and sends it through the task listener manager.
     *
     * @param receivedConfigData payload to push
     * @param clientUrl URL built from the channel's remote address
     */
    private void firePushTask(ReceivedConfigData receivedConfigData, URL clientUrl) {
        // The event object is a single-entry map: payload -> destination.
        Map<ReceivedConfigData, URL> parameter = new HashMap<>();
        parameter.put(receivedConfigData, clientUrl);
        TaskEvent taskEvent = new TaskEvent(parameter, TaskEvent.TaskType.RECEIVED_DATA_CONFIG_PUSH_TASK);
        TASK_LOGGER.info("send " + taskEvent.getTaskType() + " taskEvent:{}", taskEvent);
        this.taskListenerManager.sendTaskEvent(taskEvent);
    }

    /**
     * Returns the watchers registered on the given connection.
     *
     * @param connectId connection id, as produced by {@code NetUtil.toAddressString}
     * @return the mapping returned by the watcher store, or a new empty map when it returns
     *     {@code null}
     */
    private Map<String, Watcher> getCache(String connectId) {
        Map<String, Watcher> watchers = this.sessionWatchers.queryByConnectId(connectId);
        return watchers == null ? new ConcurrentHashMap<String, Watcher>() : watchers;
    }

    /**
     * Checks the retry budget using the configured subscriber-register fetch retry times.
     *
     * @return the result of the inherited {@code checkRetryTimes(int)} check
     */
    public boolean checkRetryTimes() {
        return this.checkRetryTimes(this.sessionServerConfig.getSubscriberRegisterFetchRetryTimes());
    }

    /**
     * Describes the task by id, bound notification and retry setting.
     *
     * <p>Note that {@code retryTimes} is the configured retry setting, not a count of retries
     * already performed.
     */
    @Override
    public String toString() {
        return "PROVIDE_DATA_CHANGE_FETCH_TASK{taskId='" + this.getTaskId() + '\'' + ", notifyProvideDataChange=" + this.notifyProvideDataChange + ", retryTimes='" + this.sessionServerConfig.getSubscriberRegisterFetchRetryTimes() + '\'' + '}';
    }
}

