/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.registry.server.data.remoting.sessionserver.disconnect;

import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.change.event.ClientChangeEvent;
import com.alipay.sofa.registry.server.data.change.event.DataChangeEventCenter;
import com.alipay.sofa.registry.server.data.event.AfterWorkingProcess;
import com.alipay.sofa.registry.server.data.executor.ExecutorFactory;
import com.alipay.sofa.registry.server.data.node.DataNodeStatus;
import com.alipay.sofa.registry.server.data.remoting.sessionserver.SessionServerConnectionFactory;
import com.alipay.sofa.registry.server.data.remoting.sessionserver.disconnect.ClientDisconnectEvent;
import com.alipay.sofa.registry.server.data.remoting.sessionserver.disconnect.DisconnectEvent;
import com.alipay.sofa.registry.server.data.remoting.sessionserver.disconnect.DisconnectTypeEnum;
import com.alipay.sofa.registry.server.data.remoting.sessionserver.disconnect.SessionServerDisconnectEvent;
import com.alipay.sofa.registry.server.data.util.LocalServerStatusEnum;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Receives disconnect events and processes them on a dedicated worker thread.
 *
 * <p>Events arriving while the local data node is not in
 * {@link LocalServerStatusEnum#WORKING} state are parked in a separate queue and replayed by
 * {@link #afterWorkingProcess()}. All other events go into a {@link DelayQueue} that the worker
 * started in {@link #afterPropertiesSet()} consumes.
 */
public class DisconnectEventHandler
implements InitializingBean,
AfterWorkingProcess {
    private static final Logger LOGGER = LoggerFactory.getLogger(DisconnectEventHandler.class);
    private static final Logger LOGGER_START = LoggerFactory.getLogger((String)"DATA-START-LOGS");
    /** Events waiting for their delay to expire before the worker handles them. */
    private final DelayQueue<DisconnectEvent> EVENT_QUEUE = new DelayQueue<>();
    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;
    @Autowired
    private DataChangeEventCenter dataChangeEventCenter;
    @Autowired
    private DataServerConfig dataServerConfig;
    @Autowired
    private DataNodeStatus dataNodeStatus;
    /** Milliseconds {@link #afterWorkingProcess()} waits before replaying parked events. */
    private static final int BLOCK_FOR_ALL_SYNC = 5000;
    /** Events received while the data node was not in WORKING state. */
    private static final BlockingQueue<DisconnectEvent> noWorkQueue = new LinkedBlockingQueue<DisconnectEvent>();

    /**
     * Accepts a disconnect event.
     *
     * <p>The event is logged, then either parked (node not WORKING) or queued for the worker.
     *
     * @param event the disconnect event to accept
     */
    public void receive(DisconnectEvent event) {
        if (event.getType() == DisconnectTypeEnum.SESSION_SERVER) {
            SessionServerDisconnectEvent sessionServerDisconnectEvent = (SessionServerDisconnectEvent)event;
            LOGGER.info("receive session off event: sessionServerHost={}, processId={}", sessionServerDisconnectEvent.getSessionServerHost(), sessionServerDisconnectEvent.getProcessId());
        } else if (event.getType() == DisconnectTypeEnum.CLIENT) {
            ClientDisconnectEvent clientDisconnectEvent = (ClientDisconnectEvent)event;
            LOGGER.info("receive client off event: connectId={}", clientDisconnectEvent.getConnectId());
        }
        if (this.dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
            LOGGER.info("receive disconnect event,but data server not working!");
            noWorkQueue.add(event);
            return;
        }
        this.EVENT_QUEUE.add(event);
    }

    /**
     * Replays the events that were parked while the node was not working.
     *
     * <p>Sleeps {@link #BLOCK_FOR_ALL_SYNC} milliseconds first, then passes each parked event
     * back through {@link #receive(DisconnectEvent)}.
     */
    @Override
    public void afterWorkingProcess() {
        try {
            TimeUnit.MILLISECONDS.sleep(BLOCK_FOR_ALL_SYNC);
        }
        catch (InterruptedException e) {
            LOGGER.error("receive disconnect event after working interrupted!", e);
            // Restore the interrupt status so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
            return;
        }
        // Replay only the events parked at this moment. receive() puts an event back into
        // noWorkQueue when the node is still not WORKING; looping until the queue is empty
        // would then spin forever on the same events.
        int pending = noWorkQueue.size();
        for (int i = 0; i < pending; i++) {
            DisconnectEvent event = noWorkQueue.poll();
            if (event == null) {
                break;
            }
            this.receive(event);
        }
    }

    @Override
    public int getOrder() {
        return 0;
    }

    /**
     * Starts the single worker thread that consumes the event queue.
     */
    @Override
    public void afterPropertiesSet() {
        Executor executor = ExecutorFactory.newSingleThreadExecutor(DisconnectEventHandler.class.getSimpleName());
        executor.execute(this::processEvents);
        LOGGER_START.info("start DisconnectEventHandler success");
    }

    /**
     * Worker loop: takes events from the queue and handles them one at a time.
     *
     * <p>A failure while handling one event is logged and the loop continues with the next
     * event. An interrupt ends the loop.
     */
    private void processEvents() {
        while (true) {
            try {
                this.handle(this.EVENT_QUEUE.take());
            }
            catch (InterruptedException e) {
                LOGGER.error("disconnect event worker interrupted, stop handling events", e);
                // take() cleared the interrupt status; set it again before leaving the loop.
                Thread.currentThread().interrupt();
                return;
            }
            catch (Throwable e) {
                LOGGER.error("handle client disconnect event failed", e);
            }
        }
    }

    /**
     * Dispatches one event by type.
     *
     * <p>Every type other than SESSION_SERVER is handled as a client disconnect.
     *
     * @param disconnectEvent the event taken from the queue
     */
    private void handle(DisconnectEvent disconnectEvent) {
        if (disconnectEvent.getType() == DisconnectTypeEnum.SESSION_SERVER) {
            this.handleSessionServerOff((SessionServerDisconnectEvent)disconnectEvent);
            return;
        }
        ClientDisconnectEvent clientEvent = (ClientDisconnectEvent)disconnectEvent;
        this.unPub(clientEvent.getConnectId(), clientEvent.getRegisterTimestamp());
    }

    /**
     * Handles a session-server disconnect.
     *
     * <p>If the connection factory reports that the process/host pair was removed, every connect
     * id removed for that process is un-published. Otherwise the session-off is logged as
     * canceled.
     *
     * @param event the session-server disconnect event
     */
    private void handleSessionServerOff(SessionServerDisconnectEvent event) {
        String processId = event.getProcessId();
        String sessionServerHost = event.getSessionServerHost();
        if (!this.sessionServerConnectionFactory.removeProcessIfMatch(processId, sessionServerHost)) {
            LOGGER.info("session off is canceled: sessionServerHost={}, processId={}", sessionServerHost, processId);
            return;
        }
        Set<String> connectIds = this.sessionServerConnectionFactory.removeConnectIds(processId);
        LOGGER.info("session off is triggered: sessionServerHost={}, connectId={}, processId={}", new Object[]{sessionServerHost, connectIds, processId});
        if (connectIds == null || connectIds.isEmpty()) {
            return;
        }
        for (String connectId : connectIds) {
            this.unPub(connectId, event.getRegisterTimestamp());
        }
    }

    /**
     * Publishes a {@link ClientChangeEvent} for the given connection to the change event center.
     *
     * @param connectId the connect id of the disconnected client
     * @param registerTimestamp the register timestamp carried by the disconnect event
     */
    private void unPub(String connectId, long registerTimestamp) {
        this.dataChangeEventCenter.onChange(new ClientChangeEvent(connectId, this.dataServerConfig.getLocalDataCenter(), registerTimestamp));
    }
}

