/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.registry.server.session.registry;

import com.alipay.sofa.registry.common.model.RenewDatumRequest;
import com.alipay.sofa.registry.common.model.store.Publisher;
import com.alipay.sofa.registry.common.model.store.StoreData;
import com.alipay.sofa.registry.common.model.store.Subscriber;
import com.alipay.sofa.registry.common.model.store.URL;
import com.alipay.sofa.registry.common.model.store.Watcher;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.remoting.Server;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.server.session.acceptor.WriteDataAcceptor;
import com.alipay.sofa.registry.server.session.acceptor.WriteDataRequest;
import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig;
import com.alipay.sofa.registry.server.session.filter.DataIdMatchStrategy;
import com.alipay.sofa.registry.server.session.node.NodeManager;
import com.alipay.sofa.registry.server.session.node.service.DataNodeService;
import com.alipay.sofa.registry.server.session.registry.Registry;
import com.alipay.sofa.registry.server.session.renew.RenewService;
import com.alipay.sofa.registry.server.session.store.DataStore;
import com.alipay.sofa.registry.server.session.store.Interests;
import com.alipay.sofa.registry.server.session.store.Watchers;
import com.alipay.sofa.registry.server.session.strategy.SessionRegistryStrategy;
import com.alipay.sofa.registry.server.session.wrapper.Wrapper;
import com.alipay.sofa.registry.server.session.wrapper.WrapperInterceptorManager;
import com.alipay.sofa.registry.server.session.wrapper.WrapperInvocation;
import com.alipay.sofa.registry.task.listener.TaskEvent;
import com.alipay.sofa.registry.task.listener.TaskListenerManager;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.springframework.beans.factory.annotation.Autowired;

public class SessionRegistry
implements Registry {
    /** General-purpose logger for this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(SessionRegistry.class);
    /** Logger used when task events are dispatched (see {@code remove} and {@code fireSubscriberPushEmptyTask}). */
    private static final Logger TASK_LOGGER = LoggerFactory.getLogger(SessionRegistry.class, (String)"[Task]");
    /** Logger used for the debug traces emitted by {@code renewDatum} and {@code sendDatumSnapshot}. */
    private static final Logger RENEW_LOGGER = LoggerFactory.getLogger((String)"RENEW-LOGGER", (String)"[SessionRegistry]");
    // Store that subscribers are added to / removed from by register, unRegister and removeFromSession.
    @Autowired
    private Interests sessionInterests;
    // Store that watchers are added to / removed from by register, unRegister and removeFromSession.
    @Autowired
    private Watchers sessionWatchers;
    // Store that publishers are added to / removed from by register, unRegister and removeFromSession.
    @Autowired
    private DataStore sessionDataStore;
    // Used by fetchChangDataProcess to fetch data versions from a data node address.
    @Autowired
    private DataNodeService dataNodeService;
    // Receives the task events built in remove and fireSubscriberPushEmptyTask.
    @Autowired
    private TaskListenerManager taskListenerManager;
    // Resolves the data node responsible for a given dataInfoId.
    @Autowired
    private NodeManager dataNodeManager;
    @Autowired
    private SessionServerConfig sessionServerConfig;
    // Used by cleanClientConnect to look up the server bound to the configured server port.
    @Autowired
    private Exchange boltExchange;
    // Supplies the after-register / after-unregister callbacks and the fetched-version processing.
    @Autowired
    private SessionRegistryStrategy sessionRegistryStrategy;
    // Passed to the WrapperInvocation that register builds.
    @Autowired
    private WrapperInterceptorManager wrapperInterceptorManager;
    // Used by remove to match subscriber dataIds against the configured blacklist regex.
    @Autowired
    private DataIdMatchStrategy dataIdMatchStrategy;
    // Supplies the renew requests that renewDatum forwards.
    @Autowired
    private RenewService renewService;
    // Sink for every WriteDataRequest built in this class.
    @Autowired
    private WriteDataAcceptor writeDataAcceptor;
    /**
     * When {@code false}, {@code renewDatum} returns without building any renew requests.
     * Volatile; written through {@code setEnableDataRenewSnapshot}.
     */
    private volatile boolean enableDataRenewSnapshot = true;

    /**
     * Registers a publisher, subscriber or watcher with this session.
     *
     * <p>The work is packaged as a {@link Wrapper} and run through a {@link WrapperInvocation}
     * built with {@code wrapperInterceptorManager}. Depending on the data type:
     * <ul>
     *   <li>PUBLISHER: added to {@code sessionDataStore}, a PUBLISHER write request is handed to
     *       {@code writeDataAcceptor}, then {@code afterPublisherRegister} is called;</li>
     *   <li>SUBSCRIBER: added to {@code sessionInterests}, then {@code afterSubscriberRegister};</li>
     *   <li>WATCHER: added to {@code sessionWatchers}, then {@code afterWatcherRegister}.</li>
     * </ul>
     * Any other data type is ignored.
     *
     * @param storeData the data to register; cast to the concrete type selected by its data type
     * @throws RuntimeException wrapping any exception raised while the invocation proceeds
     */
    public void register(final StoreData storeData) {
        Wrapper<StoreData, Boolean> registerAction = new Wrapper<StoreData, Boolean>(){

            @Override
            public Boolean call() {
                switch (storeData.getDataType()) {
                    case PUBLISHER: {
                        final Publisher pub = (Publisher)storeData;
                        SessionRegistry.this.sessionDataStore.add(pub);
                        WriteDataRequest publishRequest = new WriteDataRequest(){

                            public Object getRequestBody() {
                                return pub;
                            }

                            @Override
                            public WriteDataRequest.WriteDataRequestType getRequestType() {
                                return WriteDataRequest.WriteDataRequestType.PUBLISHER;
                            }

                            @Override
                            public String getConnectId() {
                                return pub.getSourceAddress().getAddressString();
                            }

                            @Override
                            public String getDataServerIP() {
                                // Resolved lazily: the data node is looked up each time the IP is requested.
                                return SessionRegistry.this.dataNodeManager.getNode(pub.getDataInfoId()).getNodeUrl().getIpAddress();
                            }
                        };
                        SessionRegistry.this.writeDataAcceptor.accept(publishRequest);
                        SessionRegistry.this.sessionRegistryStrategy.afterPublisherRegister(pub);
                        break;
                    }
                    case SUBSCRIBER: {
                        Subscriber sub = (Subscriber)storeData;
                        SessionRegistry.this.sessionInterests.add(sub);
                        SessionRegistry.this.sessionRegistryStrategy.afterSubscriberRegister(sub);
                        break;
                    }
                    case WATCHER: {
                        Watcher watch = (Watcher)storeData;
                        SessionRegistry.this.sessionWatchers.add(watch);
                        SessionRegistry.this.sessionRegistryStrategy.afterWatcherRegister(watch);
                        break;
                    }
                }
                // The wrapper's result is not used by this method.
                return null;
            }

            @Override
            public Supplier<StoreData> getParameterSupplier() {
                return () -> storeData;
            }
        };
        WrapperInvocation<StoreData, Boolean> invocation = new WrapperInvocation<StoreData, Boolean>(registerAction, this.wrapperInterceptorManager);
        try {
            invocation.proceed();
        }
        catch (Exception e) {
            throw new RuntimeException("Proceed register error!", e);
        }
    }

    /**
     * Removes a publisher, subscriber or watcher from this session.
     *
     * <ul>
     *   <li>PUBLISHER: deleted from {@code sessionDataStore}, an UN_PUBLISHER write request is
     *       handed to {@code writeDataAcceptor}, then {@code afterPublisherUnRegister} is called;</li>
     *   <li>SUBSCRIBER: deleted from {@code sessionInterests}, then {@code afterSubscriberUnRegister};</li>
     *   <li>WATCHER: deleted from {@code sessionWatchers}, then {@code afterWatcherUnRegister}.</li>
     * </ul>
     * Any other data type is ignored.
     *
     * @param storeData the data to unregister; cast to the concrete type selected by its data type
     */
    @Override
    public void unRegister(StoreData<String> storeData) {
        switch (storeData.getDataType()) {
            case PUBLISHER: {
                final Publisher pub = (Publisher)storeData;
                this.sessionDataStore.deleteById(storeData.getId(), pub.getDataInfoId());
                WriteDataRequest unPublishRequest = new WriteDataRequest(){

                    public Object getRequestBody() {
                        return pub;
                    }

                    @Override
                    public WriteDataRequest.WriteDataRequestType getRequestType() {
                        return WriteDataRequest.WriteDataRequestType.UN_PUBLISHER;
                    }

                    @Override
                    public String getConnectId() {
                        return pub.getSourceAddress().getAddressString();
                    }

                    @Override
                    public String getDataServerIP() {
                        // Resolved lazily: the data node is looked up each time the IP is requested.
                        return SessionRegistry.this.dataNodeManager.getNode(pub.getDataInfoId()).getNodeUrl().getIpAddress();
                    }
                };
                this.writeDataAcceptor.accept(unPublishRequest);
                this.sessionRegistryStrategy.afterPublisherUnRegister(pub);
                break;
            }
            case SUBSCRIBER: {
                Subscriber sub = (Subscriber)storeData;
                this.sessionInterests.deleteById(storeData.getId(), sub.getDataInfoId());
                this.sessionRegistryStrategy.afterSubscriberUnRegister(sub);
                break;
            }
            case WATCHER: {
                Watcher watch = (Watcher)storeData;
                this.sessionWatchers.deleteById(watch.getId(), watch.getDataInfoId());
                this.sessionRegistryStrategy.afterWatcherUnRegister(watch);
                break;
            }
        }
    }

    /**
     * Drops everything registered under the given connections.
     *
     * <p>Session-local state is removed first; a client-off request is then issued only for
     * those connections whose publisher deletion reported {@code true}.
     *
     * @param connectIds connection ids to cancel
     */
    @Override
    public void cancel(List<String> connectIds) {
        List<String> publisherConnectIds = new ArrayList<String>();
        // Local cleanup fills publisherConnectIds as a side effect.
        this.removeFromSession(connectIds, publisherConnectIds);
        this.clientOffToDataNode(publisherConnectIds);
    }

    /**
     * Deletes the publishers, subscribers and watchers of each connection from the session stores.
     *
     * @param connectIds        connection ids to clean up
     * @param connectIdsWithPub output list; receives every connection id for which the publisher
     *                          store's {@code deleteByConnectId} returned {@code true}
     */
    private void removeFromSession(List<String> connectIds, List<String> connectIdsWithPub) {
        connectIds.forEach(connectId -> {
            boolean publishersDeleted = this.sessionDataStore.deleteByConnectId(connectId);
            if (publishersDeleted) {
                connectIdsWithPub.add(connectId);
            }
            this.sessionInterests.deleteByConnectId(connectId);
            this.sessionWatchers.deleteByConnectId(connectId);
        });
    }

    /**
     * Submits a CLIENT_OFF write request for each connection and then removes the connection
     * from {@code writeDataAcceptor}.
     *
     * <p>The request carries the connection id as both body and connect id, and reports
     * {@code null} as its data server IP.
     *
     * @param connectIdsWithPub connection ids to send client-off requests for
     */
    private void clientOffToDataNode(List<String> connectIdsWithPub) {
        connectIdsWithPub.forEach(connectId -> {
            WriteDataRequest clientOffRequest = new WriteDataRequest(){

                public Object getRequestBody() {
                    return connectId;
                }

                @Override
                public WriteDataRequest.WriteDataRequestType getRequestType() {
                    return WriteDataRequest.WriteDataRequestType.CLIENT_OFF;
                }

                @Override
                public String getConnectId() {
                    return connectId;
                }

                @Override
                public String getDataServerIP() {
                    return null;
                }
            };
            this.writeDataAcceptor.accept(clientOffRequest);
            this.writeDataAcceptor.remove(connectId);
        });
    }

    /**
     * Entry point of the data-version fetch task.
     *
     * <p>While {@code sessionServerConfig.isBeginDataFetchTask()} is {@code false} the method
     * only sleeps for 500 ms and returns; otherwise it delegates to
     * {@link #fetchChangDataProcess()}.
     *
     * <p>If the sleep is interrupted, the interruption is logged and the thread's interrupt
     * status is restored so that the caller (or its executor) can still observe it.
     */
    @Override
    public void fetchChangData() {
        if (!this.sessionServerConfig.isBeginDataFetchTask()) {
            try {
                TimeUnit.MILLISECONDS.sleep(500L);
            }
            catch (InterruptedException e) {
                LOGGER.error("fetchChangData task sleep InterruptedException", (Throwable)e);
                // sleep() cleared the flag when it threw; put it back instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            return;
        }
        this.fetchChangDataProcess();
    }

    /**
     * Fetches data versions for every dataInfoId that currently has at least one subscriber.
     *
     * <p>The ids are grouped by data node address via {@code calculateDataNode}; for each
     * address the versions are requested from {@code dataNodeService} and, when a non-null
     * result comes back, handed to {@code sessionRegistryStrategy.doFetchChangDataProcess}.
     * A {@code null} result is logged as a warning.
     */
    @Override
    public void fetchChangDataProcess() {
        List<String> idsWithSubscribers = new ArrayList<String>();
        this.sessionInterests.getInterestDataInfoIds().forEach(dataInfoId -> {
            Collection<Subscriber> interested = this.sessionInterests.getInterests((String)dataInfoId);
            if (interested == null || interested.isEmpty()) {
                return;
            }
            idsWithSubscribers.add((String)dataInfoId);
        });
        LOGGER.info("[fetchChangDataProcess] Fetch data versions for {} dataInfoIds", (Object)idsWithSubscribers.size());
        Map<String, Collection<String>> idsByAddress = this.calculateDataNode(idsWithSubscribers);
        for (Map.Entry<String, Collection<String>> group : idsByAddress.entrySet()) {
            String address = group.getKey();
            Map<String, Map<String, Long>> versions = this.dataNodeService.fetchDataVersion(URL.valueOf(address), group.getValue());
            if (versions == null) {
                LOGGER.warn("Fetch no change data versions info from {}", address);
                continue;
            }
            this.sessionRegistryStrategy.doFetchChangDataProcess(versions);
        }
    }

    /**
     * Groups dataInfoIds by the address of the data node that {@code dataNodeManager} resolves
     * for each of them.
     *
     * <p>The address is built from the node's IP and {@code sessionServerConfig.getDataServerPort()}.
     *
     * @param dataInfoIds ids to group; {@code null} yields an empty map
     * @return a new mutable map from data node address string to the ids routed to it
     */
    private Map<String, Collection<String>> calculateDataNode(Collection<String> dataInfoIds) {
        Map<String, Collection<String>> idsByAddress = new HashMap<String, Collection<String>>();
        if (dataInfoIds == null) {
            return idsByAddress;
        }
        for (String dataInfoId : dataInfoIds) {
            String dataNodeIp = this.dataNodeManager.getNode(dataInfoId).getNodeUrl().getIpAddress();
            URL dataNodeUrl = new URL(dataNodeIp, this.sessionServerConfig.getDataServerPort());
            idsByAddress.computeIfAbsent(dataNodeUrl.getAddressString(), key -> new ArrayList<String>()).add(dataInfoId);
        }
        return idsByAddress;
    }

    /**
     * Handles removal of the given connections.
     *
     * <p>For each connection, its publishers and subscribers are looked up. Every subscriber
     * whose dataId matches the configured blacklist regex gets a push-empty task fired. After
     * that, if {@code connectIds} is non-empty, a single CANCEL_DATA_TASK event carrying
     * {@code connectIds} is sent.
     *
     * @param connectIds connection ids to remove
     */
    @Override
    public void remove(List<String> connectIds) {
        // NOTE(review): connectIdsAll collects the connections that actually have publishers or
        // subscribers, but it is never read -- the event below is built from the unfiltered
        // connectIds. Confirm whether the filtered list was meant to be sent instead.
        List<String> connectIdsAll = new ArrayList<String>();
        for (String connectId : connectIds) {
            Map publishers = this.getSessionDataStore().queryByConnectId(connectId);
            Map<String, Subscriber> subscribers = this.getSessionInterests().queryByConnectId(connectId);
            boolean hasPublishers = publishers != null && !publishers.isEmpty();
            boolean hasSubscribers = subscribers != null && !subscribers.isEmpty();
            if (hasSubscribers) {
                for (Subscriber sub : subscribers.values()) {
                    if (this.dataIdMatchStrategy.match(sub.getDataId(), () -> this.sessionServerConfig.getBlacklistSubDataIdRegex())) {
                        this.fireSubscriberPushEmptyTask(sub);
                    }
                }
            }
            if (hasPublishers || hasSubscribers) {
                connectIdsAll.add(connectId);
            }
        }
        if (connectIds.isEmpty()) {
            return;
        }
        TaskEvent cancelEvent = new TaskEvent(connectIds, TaskEvent.TaskType.CANCEL_DATA_TASK);
        TASK_LOGGER.info("send " + cancelEvent.getTaskType() + " taskEvent:{}", (Object)cancelEvent);
        this.getTaskListenerManager().sendTaskEvent(cancelEvent);
    }

    /**
     * Builds a SUBSCRIBER_PUSH_EMPTY_TASK event for the subscriber, logs it on the task logger
     * and sends it through the task listener manager.
     *
     * @param subscriber the subscriber the event is created for
     */
    private void fireSubscriberPushEmptyTask(Subscriber subscriber) {
        TaskEvent pushEmptyEvent = new TaskEvent((Object)subscriber, TaskEvent.TaskType.SUBSCRIBER_PUSH_EMPTY_TASK);
        String logTemplate = "send " + pushEmptyEvent.getTaskType() + " taskEvent:{}";
        TASK_LOGGER.info(logTemplate, (Object)pushEmptyEvent);
        this.getTaskListenerManager().sendTaskEvent(pushEmptyEvent);
    }

    /**
     * Cancels connections that still own publishers or subscribers but no longer have a channel
     * on the session server.
     *
     * <p>The candidate set is the union of the connection ids known to the publisher store and
     * to the subscriber store. Each id without a channel on the server bound to
     * {@code sessionServerConfig.getServerPort()} is logged and collected; the collected ids,
     * if any, are passed to {@link #cancel(List)}.
     */
    @Override
    public void cleanClientConnect() {
        Sets.SetView<String> knownConnectIds = Sets.union(this.sessionDataStore.getConnectPublishers().keySet(), this.sessionInterests.getConnectSubscribers().keySet());
        Server sessionServer = this.boltExchange.getServer(Integer.valueOf(this.sessionServerConfig.getServerPort()));
        List<String> staleConnectIds = new ArrayList<String>();
        for (String connectId : knownConnectIds) {
            if (sessionServer.getChannel(URL.valueOf(connectId)) == null) {
                staleConnectIds.add(connectId);
                LOGGER.warn("Client connect has not existed!it must be remove!connectId:{}", (Object)connectId);
            }
        }
        if (staleConnectIds.isEmpty()) {
            return;
        }
        this.cancel(staleConnectIds);
    }

    /**
     * Forwards the renew requests of a connection to {@code writeDataAcceptor}.
     *
     * <p>Does nothing beyond the debug trace when {@code enableDataRenewSnapshot} is
     * {@code false} or when {@code renewService} returns {@code null} for the connection.
     * Otherwise one RENEW_DATUM write request is submitted per renew request, using the
     * request's own data server IP.
     *
     * @param connectId the connection whose data should be renewed
     */
    @Override
    public void renewDatum(final String connectId) {
        if (RENEW_LOGGER.isDebugEnabled()) {
            RENEW_LOGGER.debug("renewDatum: connectId={}", (Object)connectId);
        }
        if (!this.enableDataRenewSnapshot) {
            return;
        }
        List<RenewDatumRequest> pendingRenewals = this.renewService.getRenewDatumRequests(connectId);
        if (pendingRenewals == null) {
            return;
        }
        pendingRenewals.forEach(renewal -> {
            WriteDataRequest renewRequest = new WriteDataRequest(){

                public Object getRequestBody() {
                    return renewal;
                }

                @Override
                public WriteDataRequest.WriteDataRequestType getRequestType() {
                    return WriteDataRequest.WriteDataRequestType.RENEW_DATUM;
                }

                @Override
                public String getConnectId() {
                    return connectId;
                }

                @Override
                public String getDataServerIP() {
                    return renewal.getDataServerIP();
                }
            };
            this.writeDataAcceptor.accept(renewRequest);
        });
    }

    /**
     * Submits a DATUM_SNAPSHOT write request for a connection, addressed to a specific data server.
     *
     * <p>The request uses the connection id as both its body and its connect id.
     *
     * @param connectId    the connection the snapshot is for
     * @param dataServerIP the data server IP reported by the request
     */
    @Override
    public void sendDatumSnapshot(final String connectId, final String dataServerIP) {
        if (RENEW_LOGGER.isDebugEnabled()) {
            RENEW_LOGGER.debug("sendDatumSnapshot: connectId={}, dataServerIP={}", (Object)connectId, (Object)dataServerIP);
        }
        WriteDataRequest snapshotRequest = new WriteDataRequest(){

            public Object getRequestBody() {
                return connectId;
            }

            @Override
            public WriteDataRequest.WriteDataRequestType getRequestType() {
                return WriteDataRequest.WriteDataRequestType.DATUM_SNAPSHOT;
            }

            @Override
            public String getConnectId() {
                return connectId;
            }

            @Override
            public String getDataServerIP() {
                return dataServerIP;
            }
        };
        this.writeDataAcceptor.accept(snapshotRequest);
    }

    /**
     * @return the injected {@code sessionInterests} store
     */
    protected Interests getSessionInterests() {
        return sessionInterests;
    }

    /**
     * @return the injected {@code sessionDataStore}
     */
    protected DataStore getSessionDataStore() {
        return sessionDataStore;
    }

    /**
     * @return the injected {@code taskListenerManager}
     */
    protected TaskListenerManager getTaskListenerManager() {
        return taskListenerManager;
    }

    /**
     * Sets the flag that {@code renewDatum} checks before building renew requests.
     *
     * @param enableDataRenewSnapshot {@code false} makes {@code renewDatum} return early
     */
    @Override
    public void setEnableDataRenewSnapshot(boolean enableDataRenewSnapshot) {
        this.enableDataRenewSnapshot = enableDataRenewSnapshot;
    }
}

