/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.registry.server.data.remoting.dataserver.handler;

import com.alipay.remoting.Connection;
import com.alipay.sofa.registry.common.model.CommonResponse;
import com.alipay.sofa.registry.common.model.Node;
import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.dataserver.NotifyDataSyncRequest;
import com.alipay.sofa.registry.common.model.dataserver.SyncDataRequest;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.remoting.ChannelHandler;
import com.alipay.sofa.registry.remoting.bolt.BoltChannel;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.cache.DatumCache;
import com.alipay.sofa.registry.server.data.change.event.DataChangeEventCenter;
import com.alipay.sofa.registry.server.data.event.AfterWorkingProcess;
import com.alipay.sofa.registry.server.data.executor.ExecutorFactory;
import com.alipay.sofa.registry.server.data.node.DataNodeStatus;
import com.alipay.sofa.registry.server.data.remoting.dataserver.GetSyncDataHandler;
import com.alipay.sofa.registry.server.data.remoting.dataserver.SyncDataCallback;
import com.alipay.sofa.registry.server.data.remoting.handler.AbstractClientHandler;
import com.alipay.sofa.registry.server.data.util.LocalServerStatusEnum;
import com.alipay.sofa.registry.server.data.util.ThreadPoolExecutorDataServer;
import com.alipay.sofa.registry.util.NamedThreadFactory;
import com.alipay.sofa.registry.util.ParaCheckUtil;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Handles {@link NotifyDataSyncRequest} messages received from another data node.
 *
 * <p>While the local node status is not {@link LocalServerStatusEnum#WORKING} the request is
 * parked in a queue and replayed by {@link #afterWorkingProcess()}. Otherwise the request is
 * handed to an internal executor, which compares the locally cached datum version with the
 * version carried by the request and, when needed, asks {@link GetSyncDataHandler} to sync.
 */
public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest>
        implements AfterWorkingProcess {
    private static final Logger LOGGER = LoggerFactory.getLogger(NotifyDataSyncHandler.class);

    /**
     * Requests received while the node was not in WORKING status. The field is static, so the
     * queue is shared by every instance of this handler.
     */
    private static final BlockingQueue<SyncDataRequestForWorking> noWorkQueue = new LinkedBlockingQueue<>();

    @Autowired
    private DataServerConfig dataServerConfig;
    @Autowired
    private GetSyncDataHandler getSyncDataHandler;
    @Autowired
    private DataChangeEventCenter dataChangeEventCenter;
    /** Runs {@link #fetchSyncData} off the thread that delivered the request. */
    private Executor executor = ExecutorFactory.newFixedThreadPool(10, NotifyDataSyncHandler.class.getSimpleName());
    /** Lazily created by {@link #getExecutor()}. */
    private ThreadPoolExecutor notifyExecutor;
    @Autowired
    private DataNodeStatus dataNodeStatus;
    @Autowired
    private DatumCache datumCache;

    /**
     * Validates that the request carries a non-blank dataInfoId.
     *
     * @param request the incoming request
     * @throws RuntimeException if the check performed by {@link ParaCheckUtil} fails
     */
    @Override
    public void checkParam(NotifyDataSyncRequest request) throws RuntimeException {
        ParaCheckUtil.checkNotBlank(request.getDataInfoId(), "request.dataInfoId");
    }

    /**
     * Accepts a notify request. The request is either queued (node not WORKING) or scheduled
     * for asynchronous processing; in both cases a success response is returned immediately.
     *
     * @param channel the channel the request arrived on; it is cast to {@link BoltChannel}
     * @param request the incoming request
     * @return a success {@link CommonResponse}
     */
    @Override
    public Object doHandle(Channel channel, NotifyDataSyncRequest request) {
        Connection connection = ((BoltChannel) channel).getConnection();
        if (this.dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
            LOGGER.info("receive notifyDataSync request,but data server not working!");
            // Park the request; afterWorkingProcess() replays it later.
            noWorkQueue.add(new SyncDataRequestForWorking(connection, request));
            return CommonResponse.buildSuccessResponse();
        }
        this.executorRequest(connection, request);
        return CommonResponse.buildSuccessResponse();
    }

    /** Schedules {@link #fetchSyncData} for the given request on the internal executor. */
    private void executorRequest(Connection connection, NotifyDataSyncRequest request) {
        this.executor.execute(() -> this.fetchSyncData(connection, request));
    }

    /**
     * Compares the cached datum version with the version in the request and triggers a sync
     * through {@link GetSyncDataHandler#syncData} when the local data is missing or older, or
     * when the request version is {@code 0} or absent.
     *
     * @param connection connection passed on to the {@link SyncDataCallback}
     * @param request    the notify request being processed
     */
    protected void fetchSyncData(Connection connection, NotifyDataSyncRequest request) {
        String dataInfoId = request.getDataInfoId();
        String dataCenter = request.getDataCenter();
        Datum datum = this.datumCache.get(dataCenter, dataInfoId);
        Long version = datum == null ? null : Long.valueOf(datum.getVersion());
        Long requestVersion = request.getVersion();
        // A request without a version cannot be compared; treat it like version 0 (always sync)
        // instead of failing with a NullPointerException on unboxing inside the worker thread.
        boolean needSync = version == null
                || requestVersion == null
                || requestVersion == 0L
                || version < requestVersion;
        if (needSync) {
            LOGGER.info("[NotifyDataSyncProcessor] begin get sync data, currentVersion={},request={}", version, request);
            SyncDataRequest syncDataRequest =
                    new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType());
            this.getSyncDataHandler.syncData(
                    new SyncDataCallback(this.getSyncDataHandler, connection, syncDataRequest, this.dataChangeEventCenter));
        } else {
            LOGGER.info("[NotifyDataSyncHandler] not need to sync data, currentVersion={},request={}", version, request);
        }
    }

    /**
     * Replays every request that was queued while the node was not in WORKING status.
     * If the calling thread is interrupted while polling, draining stops and the interrupt
     * status is restored so the caller can observe it.
     */
    @Override
    public void afterWorkingProcess() {
        try {
            while (!noWorkQueue.isEmpty()) {
                SyncDataRequestForWorking event = noWorkQueue.poll(1L, TimeUnit.SECONDS);
                if (event == null) {
                    // Another consumer emptied the queue between isEmpty() and poll().
                    continue;
                }
                this.executorRequest(event.getConnection(), event.getRequest());
            }
        } catch (InterruptedException e) {
            // Restore the flag: catching InterruptedException clears the interrupt status.
            Thread.currentThread().interrupt();
            LOGGER.error("process notifyDataSync request after working interrupted!", e);
        }
    }

    /**
     * @return the order value of this {@link AfterWorkingProcess}, always {@code 1}
     */
    @Override
    public int getOrder() {
        return 1;
    }

    /**
     * Builds a failed response carrying the given message.
     *
     * @param msg the failure message
     * @return a failed {@link CommonResponse}
     */
    public CommonResponse buildFailedResponse(String msg) {
        return CommonResponse.buildFailedResponse(msg);
    }

    /**
     * @return the request class this handler is interested in, {@link NotifyDataSyncRequest}
     */
    @Override
    public Class interest() {
        return NotifyDataSyncRequest.class;
    }

    /**
     * Returns the executor configured from {@link DataServerConfig}, creating it on first use.
     *
     * <p>NOTE(review): the lazy initialization is not synchronized; concurrent first calls
     * could each create a pool. Confirm that callers invoke this from a single thread.
     *
     * @return the notify-data-sync thread pool
     */
    public Executor getExecutor() {
        if (this.notifyExecutor == null) {
            this.notifyExecutor = new ThreadPoolExecutorDataServer(
                    "NotifyDataSyncProcessorExecutor",
                    this.dataServerConfig.getNotifyDataSyncExecutorMinPoolSize(),
                    this.dataServerConfig.getNotifyDataSyncExecutorMaxPoolSize(),
                    this.dataServerConfig.getNotifyDataSyncExecutorKeepAliveTime(),
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(this.dataServerConfig.getNotifyDataSyncExecutorQueueSize()),
                    (ThreadFactory) new NamedThreadFactory("DataServer-NotifyDataSyncProcessor-executor", true));
        }
        return this.notifyExecutor;
    }

    /**
     * @return the handler type, {@link ChannelHandler.HandlerType#PROCESSER}
     */
    public ChannelHandler.HandlerType getType() {
        return ChannelHandler.HandlerType.PROCESSER;
    }

    /**
     * @return the node type of the connected peer, {@link Node.NodeType#DATA}
     */
    @Override
    protected Node.NodeType getConnectNodeType() {
        return Node.NodeType.DATA;
    }

    /**
     * Immutable pair of a connection and the request received on it, kept while the node is
     * not yet WORKING. Declared static so queued instances held by the static queue do not
     * retain a reference to the enclosing handler.
     */
    private static final class SyncDataRequestForWorking {
        private final Connection connection;
        private final NotifyDataSyncRequest request;

        SyncDataRequestForWorking(Connection connection, NotifyDataSyncRequest request) {
            this.connection = connection;
            this.request = request;
        }

        Connection getConnection() {
            return this.connection;
        }

        NotifyDataSyncRequest getRequest() {
            return this.request;
        }
    }
}

