/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.registry.server.data.change.notify;

import com.alipay.remoting.Connection;
import com.alipay.remoting.InvokeCallback;
import com.alipay.sofa.registry.common.model.CommonResponse;
import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.sessionserver.DataPushRequest;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.remoting.CallbackHandler;
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.remoting.Server;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.change.DataSourceTypeEnum;
import com.alipay.sofa.registry.server.data.change.notify.IDataChangeNotifier;
import com.alipay.sofa.registry.server.data.executor.ExecutorFactory;
import com.alipay.sofa.registry.server.data.remoting.sessionserver.SessionServerConnectionFactory;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import org.springframework.beans.factory.annotation.Autowired;

public class TempPublisherNotifier
implements IDataChangeNotifier {
    private static final Logger LOGGER = LoggerFactory.getLogger(TempPublisherNotifier.class);
    private static final Executor EXECUTOR = ExecutorFactory.newFixedThreadPool(10, TempPublisherNotifier.class.getSimpleName());
    @Autowired
    private DataServerConfig dataServerConfig;
    @Autowired
    private Exchange boltExchange;
    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;

    @Override
    public Set<DataSourceTypeEnum> getSuitableSource() {
        HashSet<DataSourceTypeEnum> set = new HashSet<DataSourceTypeEnum>();
        set.add(DataSourceTypeEnum.PUB_TEMP);
        return set;
    }

    @Override
    public void notify(Datum datum, Long lastVersion) {
        DataPushRequest request = new DataPushRequest(datum);
        List<Connection> connections = this.sessionServerConnectionFactory.getSessionConnections();
        for (Connection connection : connections) {
            this.doNotify(new NotifyPushDataCallback(connection, request));
        }
    }

    private void doNotify(final NotifyPushDataCallback notifyPushdataCallback) {
        Connection connection = notifyPushdataCallback.getConnection();
        DataPushRequest request = notifyPushdataCallback.getRequest();
        try {
            Server sessionServer = this.boltExchange.getServer(Integer.valueOf(this.dataServerConfig.getPort()));
            sessionServer.sendCallback(sessionServer.getChannel(connection.getRemoteAddress()), (Object)request, new CallbackHandler(){

                public void onCallback(Channel channel, Object message) {
                    notifyPushdataCallback.onResponse(message);
                }

                public void onException(Channel channel, Throwable exception) {
                    notifyPushdataCallback.onException(exception);
                }

                public Executor getExecutor() {
                    return notifyPushdataCallback.getExecutor();
                }
            }, this.dataServerConfig.getRpcTimeout());
        }
        catch (Exception e) {
            LOGGER.error("[TempPublisherNotifier] notify sessionserver {} failed, {}", new Object[]{connection.getRemoteIP(), request, e});
        }
    }

    private static class NotifyPushDataCallback
    implements InvokeCallback {
        private Connection connection;
        private DataPushRequest request;

        public NotifyPushDataCallback(Connection connection, DataPushRequest request) {
            this.connection = connection;
            this.request = request;
        }

        public void onResponse(Object obj) {
            CommonResponse result = (CommonResponse)obj;
            if (result != null && !result.isSuccess()) {
                LOGGER.error("[TempPublisherNotifier] notify sessionserver {} not success, request={}, result={}", new Object[]{this.connection.getRemoteIP(), this.request, result.getMessage()});
            }
        }

        public void onException(Throwable e) {
            this.onResponse(CommonResponse.buildFailedResponse((String)e.getMessage()));
        }

        public Executor getExecutor() {
            return EXECUTOR;
        }

        public Connection getConnection() {
            return this.connection;
        }

        public DataPushRequest getRequest() {
            return this.request;
        }
    }
}

