/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.registry.server.data.change.notify;

import com.alipay.remoting.Connection;
import com.alipay.sofa.registry.common.model.CommonResponse;
import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.sessionserver.DataChangeRequest;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.remoting.CallbackHandler;
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.remoting.Server;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.cache.DatumCache;
import com.alipay.sofa.registry.server.data.change.DataSourceTypeEnum;
import com.alipay.sofa.registry.server.data.change.notify.IDataChangeNotifier;
import com.alipay.sofa.registry.server.data.executor.ExecutorFactory;
import com.alipay.sofa.registry.server.data.remoting.sessionserver.SessionServerConnectionFactory;
import com.alipay.sofa.registry.timer.AsyncHashedWheelTimer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Notifies connected session servers that a datum has changed.
 *
 * <p>A {@link DataChangeRequest} is sent to every session connection returned by the
 * {@link SessionServerConnectionFactory}. When a send fails (exception, error callback, or an
 * unsuccessful response) the notification is re-scheduled on an {@link AsyncHashedWheelTimer},
 * up to the configured number of retries, and only while the version carried by the request is
 * still the version held in the {@link DatumCache}.
 */
public class SessionServerNotifier
implements IDataChangeNotifier {
    private static final Logger LOGGER = LoggerFactory.getLogger(SessionServerNotifier.class);

    /** Timer used to schedule delayed retries; created in {@link #init()}. */
    private AsyncHashedWheelTimer asyncHashedWheelTimer;
    @Autowired
    private DataServerConfig dataServerConfig;
    @Autowired
    private Exchange boltExchange;
    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;
    @Autowired
    private DatumCache datumCache;

    /**
     * Builds the retry timer once dependencies are injected. The timer and its executor use
     * daemon threads; rejected or failed retry tasks are only logged.
     */
    @PostConstruct
    public void init() {
        ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
        threadFactoryBuilder.setDaemon(true);
        this.asyncHashedWheelTimer = new AsyncHashedWheelTimer(
                threadFactoryBuilder.setNameFormat("Registry-SessionServerNotifier-WheelTimer").build(),
                500L,
                TimeUnit.MILLISECONDS,
                1024,
                this.dataServerConfig.getSessionServerNotifierRetryExecutorThreadSize(),
                this.dataServerConfig.getSessionServerNotifierRetryExecutorQueueSize(),
                threadFactoryBuilder.setNameFormat("Registry-SessionServerNotifier-WheelExecutor-%d").build(),
                new AsyncHashedWheelTimer.TaskFailedCallback(){

            public void executionRejected(Throwable e) {
                LOGGER.error("executionRejected: " + e.getMessage(), e);
            }

            public void executionFailed(Throwable e) {
                LOGGER.error("executionFailed: " + e.getMessage(), e);
            }
        });
    }

    /**
     * Returns the data source types this notifier reacts to: PUB, SYNC and SNAPSHOT.
     *
     * @return a new mutable set containing the supported source types
     */
    @Override
    public Set<DataSourceTypeEnum> getSuitableSource() {
        Set<DataSourceTypeEnum> sources = new HashSet<>();
        sources.add(DataSourceTypeEnum.PUB);
        sources.add(DataSourceTypeEnum.SYNC);
        sources.add(DataSourceTypeEnum.SNAPSHOT);
        return sources;
    }

    /**
     * Sends a change notification for {@code datum} to every current session connection.
     *
     * @param datum       the changed datum; its dataInfoId, data center and version are sent
     * @param lastVersion not used by this notifier
     */
    @Override
    public void notify(Datum datum, Long lastVersion) {
        DataChangeRequest request = new DataChangeRequest(datum.getDataInfoId(), datum.getDataCenter(), datum.getVersion());
        List<Connection> connections = this.sessionServerConnectionFactory.getSessionConnections();
        for (Connection connection : connections) {
            // One callback per connection so each keeps its own retry counter.
            this.doNotify(new NotifyCallback(connection, request));
        }
    }

    /**
     * Performs one send attempt. Connections that are not fine are skipped without retry; any
     * exception raised while sending is logged and routed to {@link #onFailed(NotifyCallback)}.
     */
    private void doNotify(NotifyCallback notifyCallback) {
        Connection connection = notifyCallback.connection;
        DataChangeRequest request = notifyCallback.request;
        try {
            if (!connection.isFine()) {
                LOGGER.info(String.format("connection from sessionServer(%s) is not fine, so ignore notify, retryTimes=%s,request=%s", connection.getRemoteAddress(), notifyCallback.retryTimes, request));
                return;
            }
            Server sessionServer = this.boltExchange.getServer(Integer.valueOf(this.dataServerConfig.getPort()));
            sessionServer.sendCallback(sessionServer.getChannel(connection.getRemoteAddress()), (Object)request, (CallbackHandler)notifyCallback, this.dataServerConfig.getRpcTimeout());
        }
        catch (Exception e) {
            LOGGER.error(String.format("invokeWithCallback failed: sessionServer(%s),retryTimes=%s, request=%s", connection.getRemoteAddress(), notifyCallback.retryTimes, request), (Throwable)e);
            this.onFailed(notifyCallback);
        }
    }

    /**
     * Handles a failed attempt: bumps the retry counter and, if the cached datum still has the
     * version being notified and the retry budget is not exhausted, schedules another attempt.
     *
     * <p>The cache lookup is null-checked. Previously a missing datum caused a
     * {@link NullPointerException} here, which, when reached from the catch block of
     * {@link #doNotify(NotifyCallback)}, escaped into {@link #notify(Datum, Long)} and prevented
     * the remaining connections from being notified.
     */
    private void onFailed(NotifyCallback notifyCallback) {
        DataChangeRequest request = notifyCallback.request;
        Connection connection = notifyCallback.connection;
        notifyCallback.retryTimes++;

        Datum cached = this.findCachedDatum(request);
        if (cached == null) {
            // Nothing left to notify about; treat like a version change and give up.
            LOGGER.info(String.format("datum not found in cache, stop before retry! retryTimes=%s, request=%s", notifyCallback.retryTimes, request));
            return;
        }
        long _currentVersion = cached.getVersion();
        if (request.getVersion() != _currentVersion) {
            LOGGER.info(String.format("current version change %s, retry version is %s, stop before retry! retryTimes=%s, request=%s", _currentVersion, request.getVersion(), notifyCallback.retryTimes, request));
            return;
        }

        if (notifyCallback.retryTimes > this.dataServerConfig.getNotifySessionRetryTimes()) {
            LOGGER.error(String.format("retryTimes have exceeded! stop retry! retryTimes=%s, sessionServer(%s), request=%s", notifyCallback.retryTimes, connection.getRemoteAddress(), request));
            return;
        }

        this.asyncHashedWheelTimer.newTimeout(timeout -> {
            LOGGER.info(String.format("retrying notify sessionServer(%s), retryTimes=%s, request=%s", connection.getRemoteAddress(), notifyCallback.retryTimes, request));
            // Re-check at fire time: the datum may have changed or vanished during the delay.
            Datum latest = this.findCachedDatum(request);
            if (latest == null) {
                LOGGER.info(String.format("datum not found in cache, stop retry! retryTimes=%s, request=%s", notifyCallback.retryTimes, request));
                return;
            }
            long currentVersion = latest.getVersion();
            if (request.getVersion() == currentVersion) {
                this.doNotify(notifyCallback);
            } else {
                LOGGER.info(String.format("current version change %s, retry version is %s, stop retry! retryTimes=%s, request=%s", currentVersion, request.getVersion(), notifyCallback.retryTimes, request));
            }
        }, this.getDelayTimeForRetry(notifyCallback.retryTimes), TimeUnit.MILLISECONDS);
    }

    /**
     * Looks up the datum the request refers to in the datum cache.
     *
     * @return the cached datum, or {@code null} if the cache returns none
     */
    private Datum findCachedDatum(DataChangeRequest request) {
        return this.datumCache.get(request.getDataCenter(), request.getDataInfoId());
    }

    /**
     * Computes the delay before the given retry: the configured first delay plus the configured
     * increment for each retry after the first, never negative.
     *
     * @param retryTimes 1-based number of the retry about to be scheduled
     * @return delay in milliseconds, clamped to be at least 0
     */
    private long getDelayTimeForRetry(int retryTimes) {
        long initialSleepTime = TimeUnit.MILLISECONDS.toMillis(this.dataServerConfig.getNotifySessionRetryFirstDelay());
        long increment = TimeUnit.MILLISECONDS.toMillis(this.dataServerConfig.getNotifySessionRetryIncrementDelay());
        long result = initialSleepTime + increment * (long)(retryTimes - 1);
        return Math.max(result, 0L);
    }

    /**
     * Callback bound to one (connection, request) pair; it also carries the retry counter for
     * that pair across attempts.
     */
    private class NotifyCallback
    implements CallbackHandler {
        // NOTE(review): incremented in onFailed and read from timer tasks; it may be touched
        // from more than one thread — confirm whether visibility guarantees are needed.
        private int retryTimes = 0;
        private final Connection connection;
        private final DataChangeRequest request;

        public NotifyCallback(Connection connection, DataChangeRequest request) {
            this.connection = connection;
            this.request = request;
        }

        /**
         * Treats a non-null, unsuccessful response as a failure and triggers the retry path.
         * A null response is not treated as a failure.
         */
        public void onCallback(Channel channel, Object message) {
            CommonResponse result = (CommonResponse)message;
            if (result != null && !result.isSuccess()) {
                LOGGER.error(String.format("response not success when notify sessionServer(%s), retryTimes=%s, request=%s, response=%s", this.connection.getRemoteAddress(), this.retryTimes, this.request, result));
                SessionServerNotifier.this.onFailed(this);
            }
        }

        /** Logs the error reported for this send and triggers the retry path. */
        public void onException(Channel channel, Throwable e) {
            LOGGER.error(String.format("exception when notify sessionServer(%s), retryTimes=%s, request=%s", this.connection.getRemoteAddress(), this.retryTimes, this.request), e);
            SessionServerNotifier.this.onFailed(this);
        }

        /** Returns the shared executor on which these callbacks are run. */
        public Executor getExecutor() {
            return ExecutorFactory.NOTIFY_SESSION_CALLBACK_EXECUTOR;
        }
    }
}

