/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.registry.server.session.scheduler.task;

import com.alipay.sofa.registry.common.model.Node;
import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.store.BaseInfo;
import com.alipay.sofa.registry.common.model.store.Subscriber;
import com.alipay.sofa.registry.common.model.store.URL;
import com.alipay.sofa.registry.core.model.ReceivedData;
import com.alipay.sofa.registry.core.model.ScopeEnum;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig;
import com.alipay.sofa.registry.server.session.cache.CacheAccessException;
import com.alipay.sofa.registry.server.session.cache.CacheService;
import com.alipay.sofa.registry.server.session.cache.DatumKey;
import com.alipay.sofa.registry.server.session.cache.Key;
import com.alipay.sofa.registry.server.session.cache.Value;
import com.alipay.sofa.registry.server.session.converter.ReceivedDataConverter;
import com.alipay.sofa.registry.server.session.node.NodeManager;
import com.alipay.sofa.registry.server.session.node.NodeManagerFactory;
import com.alipay.sofa.registry.server.session.scheduler.ExecutorManager;
import com.alipay.sofa.registry.server.session.scheduler.task.AbstractSessionTask;
import com.alipay.sofa.registry.server.session.scheduler.task.PushTaskClosure;
import com.alipay.sofa.registry.server.session.store.Interests;
import com.alipay.sofa.registry.server.session.store.ReSubscribers;
import com.alipay.sofa.registry.task.TaskClosure;
import com.alipay.sofa.registry.task.batcher.TaskProcessor;
import com.alipay.sofa.registry.task.listener.TaskEvent;
import com.alipay.sofa.registry.task.listener.TaskListenerManager;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DataChangeFetchCloudTask
extends AbstractSessionTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataChangeFetchCloudTask.class);
    private static final Logger taskLogger = LoggerFactory.getLogger(DataChangeFetchCloudTask.class, (String)"[Task]");
    private final SessionServerConfig sessionServerConfig;
    private Interests sessionInterests;
    private final TaskListenerManager taskListenerManager;
    private final ExecutorManager executorManager;
    private String fetchDataInfoId;
    private final CacheService sessionCacheService;

    public DataChangeFetchCloudTask(SessionServerConfig sessionServerConfig, TaskListenerManager taskListenerManager, Interests sessionInterests, ExecutorManager executorManager, CacheService sessionCacheService) {
        this.sessionServerConfig = sessionServerConfig;
        this.taskListenerManager = taskListenerManager;
        this.sessionInterests = sessionInterests;
        this.executorManager = executorManager;
        this.sessionCacheService = sessionCacheService;
    }

    @Override
    public long getExpiryTime() {
        return -1L;
    }

    public void setTaskEvent(TaskEvent taskEvent) {
        Object obj;
        if (taskEvent.getTaskId() != null) {
            this.setTaskId(taskEvent.getTaskId());
        }
        if (!((obj = taskEvent.getEventObj()) instanceof String)) {
            throw new IllegalArgumentException("Input task event object error!");
        }
        this.fetchDataInfoId = (String)obj;
    }

    public void execute() {
        Map<String, Datum> datumMap = this.getDatumsCache();
        if (datumMap != null && !datumMap.isEmpty()) {
            PushTaskClosure pushTaskClosure = this.getTaskClosure(datumMap);
            for (ScopeEnum scopeEnum : ScopeEnum.values()) {
                Map<InetSocketAddress, Map<String, Subscriber>> map = this.getCache(this.fetchDataInfoId, scopeEnum);
                if (map == null || map.isEmpty()) continue;
                for (Map.Entry<InetSocketAddress, Map<String, Subscriber>> entry : map.entrySet()) {
                    Map<String, Subscriber> subscriberMap = entry.getValue();
                    if (subscriberMap == null || subscriberMap.isEmpty()) continue;
                    ArrayList<String> subscriberRegisterIdList = new ArrayList<String>(subscriberMap.keySet());
                    Subscriber subscriber = subscriberMap.values().iterator().next();
                    this.evictReSubscribers(subscriberMap.values());
                    this.fireReceivedDataMultiPushTask(datumMap, subscriberRegisterIdList, scopeEnum, subscriber, subscriberMap, pushTaskClosure);
                }
            }
            pushTaskClosure.start();
        } else {
            LOGGER.error("Get publisher data error,which dataInfoId:{}", (Object)this.fetchDataInfoId);
        }
    }

    public PushTaskClosure getTaskClosure(Map<String, Datum> datumMap) {
        PushTaskClosure pushTaskClosure = new PushTaskClosure(this.executorManager.getPushTaskCheckAsyncHashedWheelTimer(), this.sessionServerConfig, this.fetchDataInfoId);
        pushTaskClosure.setTaskClosure((status, task) -> {
            if (status == TaskProcessor.ProcessingResult.Success) {
                if (this.sessionServerConfig.isStopPushSwitch()) {
                    LOGGER.info("Stop Push switch on,dataInfoId {} version can not be update!", (Object)this.fetchDataInfoId);
                    return;
                }
                List<String> dataCenters = this.sessionInterests.getDataCenters();
                datumMap.forEach((dataCenter, datum) -> {
                    dataCenters.remove(dataCenter);
                    String dataInfoId = this.fetchDataInfoId;
                    Long version = datum.getVersion();
                    boolean result = this.sessionInterests.checkAndUpdateInterestVersions((String)dataCenter, dataInfoId, version);
                    if (result) {
                        LOGGER.info("Push all tasks success,dataCenter:{} dataInfoId:{} version:{} update!", new Object[]{dataCenter, dataInfoId, version});
                    } else {
                        LOGGER.info("Push all tasks success,but dataCenter:{} dataInfoId:{} version:{} need not update!", new Object[]{dataCenter, dataInfoId, version});
                    }
                });
                dataCenters.forEach(dataCenter -> {
                    boolean result = this.sessionInterests.checkAndUpdateInterestVersionZero((String)dataCenter, this.fetchDataInfoId);
                    LOGGER.warn("Obtained datum from DataServer({}) failed, set sessionInterests dataInfoId({}) version zero, return {}", new Object[]{dataCenter, this.fetchDataInfoId, result});
                });
            } else {
                LOGGER.warn("Push tasks found error,subscribers version can not be update!dataInfoId={}", (Object)this.fetchDataInfoId);
            }
        });
        return pushTaskClosure;
    }

    private void evictReSubscribers(Collection<Subscriber> subscribersPush) {
        if (this.sessionInterests instanceof ReSubscribers) {
            ReSubscribers reSubscribers = (ReSubscribers)((Object)this.sessionInterests);
            subscribersPush.forEach(reSubscribers::deleteReSubscriber);
        }
    }

    private Map<InetSocketAddress, Map<String, Subscriber>> getCache(String dataInfoId, ScopeEnum scopeEnum) {
        return this.sessionInterests.querySubscriberIndex(dataInfoId, scopeEnum);
    }

    private Map<String, Datum> getDatumsCache() {
        HashMap<String, Datum> map = new HashMap<String, Datum>();
        NodeManager nodeManager = NodeManagerFactory.getNodeManager(Node.NodeType.META);
        Collection<String> dataCenters = nodeManager.getDataCenters();
        if (dataCenters != null) {
            Collection keys = dataCenters.stream().map(dataCenter -> new Key(Key.KeyType.OBJ, DatumKey.class.getName(), new DatumKey(this.fetchDataInfoId, (String)dataCenter))).collect(Collectors.toList());
            Map<Key, Value> values = null;
            try {
                values = this.sessionCacheService.getValues(keys);
            }
            catch (CacheAccessException e) {
                LOGGER.error(String.format("error when access cache: %s", e.getMessage()), (Throwable)e);
            }
            if (values != null) {
                values.forEach((key, value) -> {
                    if (value != null && value.getPayload() != null) {
                        map.put(((DatumKey)key.getEntityType()).getDataCenter(), (Datum)value.getPayload());
                    }
                });
            }
        }
        return map;
    }

    private void fireReceivedDataMultiPushTask(Map<String, Datum> datums, List<String> subscriberRegisterIdList, ScopeEnum scopeEnum, Subscriber subscriber, Map<String, Subscriber> subscriberMap, PushTaskClosure pushTaskClosure) {
        boolean isOldVersion;
        boolean bl = isOldVersion = !BaseInfo.ClientVersion.StoreData.equals((Object)subscriber.getClientVersion());
        if (!isOldVersion) {
            this.fireReceiveDataPushTask(datums, subscriberRegisterIdList, scopeEnum, subscriber, subscriberMap, pushTaskClosure);
        } else if (subscriber.getScope() == ScopeEnum.zone) {
            this.fireUserDataElementPushTask(ReceivedDataConverter.getMergeDatum(datums), subscriber, subscriberMap, pushTaskClosure);
        } else {
            this.fireUserDataElementMultiPushTask(ReceivedDataConverter.getMergeDatum(datums), subscriber, subscriberMap, pushTaskClosure);
        }
    }

    private void fireReceiveDataPushTask(Map<String, Datum> datums, List<String> subscriberRegisterIdList, ScopeEnum scopeEnum, Subscriber subscriber, Map<String, Subscriber> subscriberMap, PushTaskClosure pushTaskClosure) {
        ArrayList<Subscriber> subscribers = new ArrayList<Subscriber>(subscriberMap.values());
        LOGGER.info("Datums push={}", datums);
        ReceivedData receivedData = ReceivedDataConverter.getReceivedDataMulti(datums, scopeEnum, subscriberRegisterIdList, subscriber);
        HashMap<ReceivedData, URL> parameter = new HashMap<ReceivedData, URL>();
        parameter.put(receivedData, subscriber.getSourceAddress());
        TaskEvent taskEvent = new TaskEvent(parameter, TaskEvent.TaskType.RECEIVED_DATA_MULTI_PUSH_TASK);
        taskEvent.setTaskClosure((TaskClosure)pushTaskClosure);
        taskEvent.setAttribute("PUSH_CLIENT_SUBSCRIBERS", subscribers);
        taskLogger.info("send {} taskURL:{},taskScope:{},version:{},taskId={}", new Object[]{taskEvent.getTaskType(), subscriber.getSourceAddress(), scopeEnum, receivedData.getVersion(), taskEvent.getTaskId()});
        this.taskListenerManager.sendTaskEvent(taskEvent);
    }

    private void fireUserDataElementPushTask(Datum datum, Subscriber subscriber, Map<String, Subscriber> subscriberMap, PushTaskClosure pushTaskClosure) {
        ArrayList<Subscriber> subscribers = new ArrayList<Subscriber>(subscriberMap.values());
        TaskEvent taskEvent = new TaskEvent(TaskEvent.TaskType.USER_DATA_ELEMENT_PUSH_TASK);
        taskEvent.setTaskClosure((TaskClosure)pushTaskClosure);
        taskEvent.setAttribute("PUSH_CLIENT_SUBSCRIBERS", subscribers);
        taskEvent.setAttribute("PUSH_CLIENT_DATUM", (Object)datum);
        taskEvent.setAttribute("PUSH_CLIENT_URL", (Object)subscriber.getSourceAddress());
        int size = datum != null && datum.getPubMap() != null ? datum.getPubMap().size() : 0;
        taskLogger.info("send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},subSize={},taskId={}", new Object[]{taskEvent.getTaskType(), subscriber.getSourceAddress(), datum.getDataInfoId(), datum.getDataCenter(), size, subscribers.size(), taskEvent.getTaskId()});
        this.taskListenerManager.sendTaskEvent(taskEvent);
    }

    private void fireUserDataElementMultiPushTask(Datum datum, Subscriber subscriber, Map<String, Subscriber> subscriberMap, PushTaskClosure pushTaskClosure) {
        ArrayList<Subscriber> subscribers = new ArrayList<Subscriber>(subscriberMap.values());
        TaskEvent taskEvent = new TaskEvent(TaskEvent.TaskType.USER_DATA_ELEMENT_MULTI_PUSH_TASK);
        taskEvent.setTaskClosure((TaskClosure)pushTaskClosure);
        taskEvent.setAttribute("PUSH_CLIENT_SUBSCRIBERS", subscribers);
        taskEvent.setAttribute("PUSH_CLIENT_DATUM", (Object)datum);
        taskEvent.setAttribute("PUSH_CLIENT_URL", (Object)subscriber.getSourceAddress());
        int size = datum != null && datum.getPubMap() != null ? datum.getPubMap().size() : 0;
        taskLogger.info("send {} taskURL:{},dataInfoId={},dataCenter={},pubSize={},subSize={},taskId={}", new Object[]{taskEvent.getTaskType(), subscriber.getSourceAddress(), datum.getDataInfoId(), datum.getDataCenter(), size, subscribers.size(), taskEvent.getTaskId()});
        this.taskListenerManager.sendTaskEvent(taskEvent);
    }

    public boolean checkRetryTimes() {
        return this.checkRetryTimes(this.sessionServerConfig.getDataChangeFetchTaskRetryTimes());
    }

    public String toString() {
        return "DATA_CHANGE_FETCH_CLOUD_TASK{taskId='" + this.getTaskId() + '\'' + ", fetchDataInfoId=" + this.fetchDataInfoId + ", expiryTime='" + this.getExpiryTime() + '\'' + '}';
    }
}

