/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.registry.server.data.datasync.sync;

import com.alipay.remoting.Connection;
import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.dataserver.NotifyDataSyncRequest;
import com.alipay.sofa.registry.common.model.dataserver.SyncData;
import com.alipay.sofa.registry.common.model.dataserver.SyncDataRequest;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.remoting.Server;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.datasync.AcceptorStore;
import com.alipay.sofa.registry.server.data.datasync.Operator;
import com.alipay.sofa.registry.server.data.datasync.sync.Acceptor;
import com.alipay.sofa.registry.server.data.remoting.dataserver.DataServerConnectionFactory;
import com.alipay.sofa.registry.server.data.remoting.metaserver.IMetaServerService;
import com.alipay.sofa.registry.server.data.util.DelayItem;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import org.springframework.beans.factory.annotation.Autowired;

public abstract class AbstractAcceptorStore
implements AcceptorStore {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAcceptorStore.class, (String)"[SyncDataService]");
    private static final int DEFAULT_MAX_BUFFER_SIZE = 30;
    private static final int DEFAULT_DELAY_TIMEOUT = 3000;
    private static final int NOTIFY_RETRY = 3;
    @Autowired
    protected IMetaServerService metaServerService;
    @Autowired
    private Exchange boltExchange;
    @Autowired
    private DataServerConfig dataServerBootstrapConfig;
    @Autowired
    private DataServerConnectionFactory dataServerConnectionFactory;
    private Map<String, Map<String, Acceptor>> acceptors = new ConcurrentHashMap<String, Map<String, Acceptor>>();
    private Map<String, Map<String, Acceptor>> notifyAcceptorsCache = new ConcurrentHashMap<String, Map<String, Acceptor>>();
    private DelayQueue<DelayItem<Acceptor>> delayQueue = new DelayQueue();

    @Override
    public void checkAcceptorsChangAndExpired() {
        this.acceptors.forEach((dataCenter, acceptorMap) -> {
            if (acceptorMap != null && !acceptorMap.isEmpty()) {
                acceptorMap.forEach((dataInfoId, acceptor) -> acceptor.checkExpired(0));
            }
        });
    }

    private String getLogByClass(String msg) {
        StringBuilder sb = new StringBuilder();
        sb.append(" [").append(this.getClass().getSimpleName()).append("] ").append(msg);
        return sb.toString();
    }

    @Override
    public void addOperator(Operator operator) {
        Datum datum = operator.getDatum();
        String dataCenter = datum.getDataCenter();
        String dataInfoId = datum.getDataInfoId();
        try {
            Acceptor newAcceptor;
            Acceptor existAcceptor;
            ConcurrentHashMap<String, Acceptor> newMap;
            ConcurrentHashMap<String, Acceptor> acceptorMap = this.acceptors.get(dataCenter);
            if (acceptorMap == null && (acceptorMap = (ConcurrentHashMap<String, Acceptor>)this.acceptors.putIfAbsent(dataCenter, newMap = new ConcurrentHashMap<String, Acceptor>())) == null) {
                acceptorMap = newMap;
            }
            if ((existAcceptor = acceptorMap.get(dataInfoId)) == null && (existAcceptor = acceptorMap.putIfAbsent(dataInfoId, newAcceptor = new Acceptor(30, dataInfoId, dataCenter))) == null) {
                existAcceptor = newAcceptor;
            }
            existAcceptor.appendOperator(operator);
            this.putCache(existAcceptor);
        }
        catch (Exception e) {
            LOGGER.error(this.getLogByClass("Append Operator error!"), (Throwable)e);
            throw new RuntimeException("Append Operator error!", e);
        }
    }

    private void putCache(Acceptor acceptor) {
        String dataCenter = acceptor.getDataCenter();
        String dataInfoId = acceptor.getDataInfoId();
        try {
            Acceptor existAcceptor;
            ConcurrentHashMap<String, Acceptor> newMap;
            ConcurrentHashMap<String, Acceptor> acceptorMap = this.notifyAcceptorsCache.get(dataCenter);
            if (acceptorMap == null && (acceptorMap = (ConcurrentHashMap<String, Acceptor>)this.notifyAcceptorsCache.putIfAbsent(dataCenter, newMap = new ConcurrentHashMap<String, Acceptor>())) == null) {
                acceptorMap = newMap;
            }
            if ((existAcceptor = acceptorMap.putIfAbsent(dataInfoId, acceptor)) == null) {
                this.addQueue(acceptor);
            }
        }
        catch (Exception e) {
            LOGGER.error(this.getLogByClass("Operator push to delay cache error!"), (Throwable)e);
            throw new RuntimeException("Operator push to delay cache error!", e);
        }
    }

    private void removeCache(Acceptor acceptor) {
        String dataCenter = acceptor.getDataCenter();
        String dataInfoId = acceptor.getDataInfoId();
        try {
            boolean result;
            Map<String, Acceptor> acceptorMap = this.notifyAcceptorsCache.get(dataCenter);
            if (acceptorMap != null && (result = acceptorMap.remove(dataInfoId, acceptor))) {
                this.notifyChange(acceptor);
            }
        }
        catch (Exception e) {
            LOGGER.error(this.getLogByClass("Operator remove from delay cache error!"), (Throwable)e);
            throw new RuntimeException("Operator remove from delay cache error!", e);
        }
    }

    private void addQueue(Acceptor acceptor) {
        this.delayQueue.put(new DelayItem<Acceptor>(acceptor, 3000L));
    }

    private void notifyChange(Acceptor acceptor) {
        Long lastVersion = acceptor.getLastVersion();
        if (lastVersion == null) {
            LOGGER.warn(this.getLogByClass("There is not data in acceptor queue!maybe has been expired!"));
            lastVersion = 0L;
        }
        if (LOGGER.isDebugEnabled()) {
            acceptor.printInfo();
        }
        NotifyDataSyncRequest request = new NotifyDataSyncRequest(acceptor.getDataInfoId(), acceptor.getDataCenter(), lastVersion.longValue(), this.getType());
        List<String> targetDataIps = this.getTargetDataIp(acceptor.getDataInfoId());
        block2: for (String targetDataIp : targetDataIps) {
            if (DataServerConfig.IP.equals(targetDataIp)) continue;
            Connection connection = this.dataServerConnectionFactory.getConnection(targetDataIp);
            if (connection == null) {
                LOGGER.error(this.getLogByClass(String.format("Can not get notify data server connection!ip: %s", targetDataIp)));
                continue;
            }
            LOGGER.info(this.getLogByClass("Notify data server {} change data {} to sync"), (Object)connection.getRemoteIP(), (Object)request);
            for (int tryCount = 0; tryCount < 3; ++tryCount) {
                try {
                    Server syncServer = this.boltExchange.getServer(Integer.valueOf(this.dataServerBootstrapConfig.getSyncDataPort()));
                    syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()), (Object)request, 1000);
                    continue block2;
                }
                catch (Exception e) {
                    LOGGER.error(this.getLogByClass(String.format("Notify data server %s failed, NotifyDataSyncRequest:%s", targetDataIp, request)), (Throwable)e);
                    continue;
                }
            }
        }
    }

    public abstract List<String> getTargetDataIp(String var1);

    @Override
    public void changeDataCheck() {
        try {
            while (true) {
                DelayItem delayItem = (DelayItem)this.delayQueue.take();
                Acceptor acceptor = (Acceptor)delayItem.getItem();
                this.removeCache(acceptor);
            }
        }
        catch (InterruptedException e) {
            return;
        }
    }

    @Override
    public SyncData getSyncData(SyncDataRequest syncDataRequest) {
        String dataCenter = syncDataRequest.getDataCenter();
        String dataInfoId = syncDataRequest.getDataInfoId();
        Long currentVersion = syncDataRequest.getVersion();
        try {
            Map<String, Acceptor> acceptorMap = this.acceptors.get(dataCenter);
            if (acceptorMap == null) {
                LOGGER.error(this.getLogByClass("Can not find Sync Data acceptor instance,dataCenter:{}"), (Object)dataCenter);
                throw new RuntimeException("Can not find Sync Data acceptor instance!");
            }
            Acceptor existAcceptor = acceptorMap.get(dataInfoId);
            if (existAcceptor == null) {
                LOGGER.error(this.getLogByClass("Can not find Sync Data acceptor instance,dataInfoId:{}"), (Object)dataInfoId);
                throw new RuntimeException("Can not find Sync Data acceptor instance!");
            }
            return existAcceptor.process(currentVersion);
        }
        catch (Exception e) {
            LOGGER.error(this.getLogByClass("Get change SyncData error!"), (Throwable)e);
            throw new RuntimeException("Get change SyncData error!", e);
        }
    }

    public DataServerConfig getDataServerConfig() {
        return this.dataServerBootstrapConfig;
    }
}

