/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.registry.server.data.change.event;

import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.store.Publisher;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.cache.DatumCache;
import com.alipay.sofa.registry.server.data.cache.UnPublisher;
import com.alipay.sofa.registry.server.data.change.ChangeData;
import com.alipay.sofa.registry.server.data.change.DataChangeTypeEnum;
import com.alipay.sofa.registry.server.data.change.DataSourceTypeEnum;
import com.alipay.sofa.registry.server.data.change.SnapshotData;
import com.alipay.sofa.registry.server.data.change.event.ClientChangeEvent;
import com.alipay.sofa.registry.server.data.change.event.DataChangeEvent;
import com.alipay.sofa.registry.server.data.change.event.DataChangeEventCenter;
import com.alipay.sofa.registry.server.data.change.event.DataChangeScopeEnum;
import com.alipay.sofa.registry.server.data.change.event.DatumSnapshotEvent;
import com.alipay.sofa.registry.server.data.change.event.IDataChangeEvent;
import com.alipay.sofa.registry.server.data.executor.ExecutorFactory;
import com.alipay.sofa.registry.server.data.node.DataServerNode;
import com.alipay.sofa.registry.server.data.remoting.dataserver.DataServerNodeFactory;
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A queue of {@link IDataChangeEvent}s that are consumed by one worker task started via
 * {@link #start()}.
 *
 * <p>Incoming events are turned into {@link ChangeData} entries. Non-temporary datum changes for
 * the same data center / dataInfoId are merged into a single pending {@link ChangeData} until it
 * is handed out by {@link #take()}; temporary publications and snapshots are queued directly.
 */
public class DataChangeEventQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataChangeEventQueue.class);
    private static final Logger LOGGER_START = LoggerFactory.getLogger("DATA-START-LOGS");
    private static final Logger RENEW_LOGGER = LoggerFactory.getLogger("RENEW-LOGGER", "[DataChangeEventQueue]");

    /**
     * Canonicalizes connectIds so that equal ids map to the same monitor object.
     *
     * <p>The interner is shared by all queue instances: creating a fresh interner for every call
     * would hand back an unrelated object each time and the {@code synchronized} blocks below
     * would never exclude each other.
     */
    private static final Interner<String> CONNECT_ID_INTERNER = Interners.newWeakInterner();

    private final String name;

    /** Raw events waiting to be processed by the worker task. */
    private final BlockingQueue<IDataChangeEvent> eventQueue;

    /** Pending, still mergeable change data: dataCenter -> (dataInfoId -> change data). */
    private final Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>();

    /** Change data ready to be handed out by {@link #take()} once its delay has expired. */
    private final DelayQueue<ChangeData> CHANGE_QUEUE = new DelayQueue<>();

    private final int notifyIntervalMs;
    private final int notifyTempDataIntervalMs;

    /** Guards merging in {@link #handleDatum} against removal in {@link #take()}. */
    private final ReentrantLock lock = new ReentrantLock();

    private final int queueIdx;
    private final DataServerConfig dataServerConfig;
    private final DataChangeEventCenter dataChangeEventCenter;
    private final DatumCache datumCache;

    /**
     * Creates a queue.
     *
     * @param queueIdx              index of this queue; used for the queue name and to decide
     *                              which dataInfoIds belong to this queue
     * @param dataServerConfig      source of the queue size and the notify intervals
     * @param dataChangeEventCenter used to hash a dataInfoId to a queue index
     * @param datumCache            used to look up the publishers of a connection
     */
    public DataChangeEventQueue(int queueIdx, DataServerConfig dataServerConfig, DataChangeEventCenter dataChangeEventCenter, DatumCache datumCache) {
        this.queueIdx = queueIdx;
        this.name = String.format("%s_%s", DataChangeEventQueue.class.getSimpleName(), queueIdx);
        this.dataServerConfig = dataServerConfig;
        int queueSize = dataServerConfig.getQueueSize();
        // A non-positive size means "no explicit capacity".
        if (queueSize <= 0) {
            this.eventQueue = new LinkedBlockingDeque<IDataChangeEvent>();
        } else {
            this.eventQueue = new LinkedBlockingDeque<IDataChangeEvent>(queueSize);
        }
        this.notifyIntervalMs = dataServerConfig.getNotifyIntervalMs();
        this.notifyTempDataIntervalMs = dataServerConfig.getNotifyTempDataIntervalMs();
        this.dataChangeEventCenter = dataChangeEventCenter;
        this.datumCache = datumCache;
    }

    /**
     * Enqueues an event for asynchronous processing.
     *
     * <p>Uses {@link BlockingQueue#add}, so a full bounded queue results in an exception, which
     * is logged and rethrown to the caller.
     *
     * @param event the event to enqueue
     */
    public void onChange(IDataChangeEvent event) {
        try {
            this.eventQueue.add(event);
        } catch (Throwable e) {
            LOGGER.error("Error onChange: " + e.getMessage(), e);
            throw e;
        }
    }

    /**
     * @return the name of this queue, built from the class simple name and the queue index
     */
    public String getName() {
        return this.name;
    }

    /**
     * Blocks until a {@link ChangeData} is available, then removes it from the merge map so that
     * later events for the same key start a new entry.
     *
     * @return the next change data
     * @throws InterruptedException if interrupted while waiting
     */
    public ChangeData take() throws InterruptedException {
        ChangeData changeData = this.CHANGE_QUEUE.take();
        this.lock.lock();
        try {
            this.removeMapForMerge(changeData);
            return changeData;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Removes the merge-map entry that corresponds to the given change data.
     *
     * <p>Nothing is removed for {@code PUB_TEMP} data or for change data without a datum, because
     * those are never registered in the merge map by this class.
     */
    private void removeMapForMerge(ChangeData changeData) {
        Datum datum = changeData.getDatum();
        if (changeData.getSourceType() == DataSourceTypeEnum.PUB_TEMP || datum == null) {
            return;
        }
        Map<String, ChangeData> perDataCenter = this.CHANGE_DATA_MAP_FOR_MERGE.get(datum.getDataCenter());
        // Guard against a data center that has no merge map instead of failing with an NPE.
        if (perDataCenter != null) {
            perDataCenter.remove(datum.getDataInfoId());
        }
    }

    /**
     * Returns the pending change data for the given key, creating and queueing a new (datum-less)
     * one when none is pending.
     *
     * @param dataCenter data center of the datum
     * @param dataInfoId id of the datum
     * @param sourceType source type stored in a newly created change data
     * @param changeType change type stored in a newly created change data
     * @return the pending change data for the key
     */
    private ChangeData getChangeData(String dataCenter, String dataInfoId, DataSourceTypeEnum sourceType, DataChangeTypeEnum changeType) {
        Map<String, ChangeData> perDataCenter =
                this.CHANGE_DATA_MAP_FOR_MERGE.computeIfAbsent(dataCenter, key -> new ConcurrentHashMap<>());
        ChangeData pending = perDataCenter.get(dataInfoId);
        if (pending != null) {
            return pending;
        }
        ChangeData created = new ChangeData(null, this.notifyIntervalMs, sourceType, changeType);
        ChangeData raced = perDataCenter.putIfAbsent(dataInfoId, created);
        if (raced != null) {
            // Somebody else registered an entry first; whoever inserted it is responsible for
            // queueing it, so it must not be queued a second time here.
            return raced;
        }
        this.CHANGE_QUEUE.put(created);
        return created;
    }

    /**
     * Starts the worker task that drains the event queue and dispatches every event.
     *
     * <p>The loop never terminates on its own: any {@link Throwable} raised while taking or
     * handling an event (including an {@link InterruptedException} from the blocking take) is
     * logged and the loop continues with the next event.
     */
    public void start() {
        Executor executor = ExecutorFactory.newSingleThreadExecutor(String.format("%s_%s", DataChangeEventQueue.class.getSimpleName(), this.getName()));
        executor.execute(() -> {
            while (true) {
                try {
                    this.dispatch(this.eventQueue.take());
                } catch (Throwable e) {
                    LOGGER.error("[{}] handle change event failed", this.getName(), e);
                }
            }
        });
        LOGGER_START.info("[{}] start DataChangeEventQueue success", this.getName());
    }

    /** Routes one event to its handler based on the event scope; unknown scopes are ignored. */
    private void dispatch(IDataChangeEvent event) {
        DataChangeScopeEnum scope = event.getScope();
        if (scope == DataChangeScopeEnum.DATUM) {
            DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
            if (dataChangeEvent.getSourceType() == DataSourceTypeEnum.PUB_TEMP) {
                // Temporary data is never merged; it gets its own change data.
                this.addTempChangeData(dataChangeEvent.getDatum(), dataChangeEvent.getChangeType(), dataChangeEvent.getSourceType());
            } else {
                this.handleDatum(dataChangeEvent.getChangeType(), dataChangeEvent.getSourceType(), dataChangeEvent.getDatum());
            }
        } else if (scope == DataChangeScopeEnum.CLIENT) {
            this.handleClientOff((ClientChangeEvent) event);
        } else if (scope == DataChangeScopeEnum.SNAPSHOT) {
            this.handleSnapshot((DatumSnapshotEvent) event);
        }
    }

    /**
     * Handles a client-off event: for every cached publisher of the connection that belongs to
     * this queue and whose computed data server node has the local IP, an {@link UnPublisher}
     * datum is merged into the pending change data.
     */
    private void handleClientOff(ClientChangeEvent event) {
        String connectId = event.getHost();
        Object connectLock = CONNECT_ID_INTERNER.intern(connectId);
        synchronized (connectLock) {
            Map<String, Publisher> pubMap = this.datumCache.getByConnectId(connectId);
            if (pubMap == null || pubMap.isEmpty()) {
                LOGGER.info("[{}] no datum to handle, connectId={}", this.getName(), connectId);
                return;
            }
            LOGGER.info("[{}] client off begin, connectId={}, occurTimestamp={}, all pubSize={}",
                    this.getName(), connectId, event.getOccurredTimestamp(), pubMap.size());
            int handled = 0;
            for (Publisher publisher : pubMap.values()) {
                String dataInfoId = publisher.getDataInfoId();
                if (!this.belongTo(dataInfoId)) {
                    continue;
                }
                DataServerNode dataServerNode = DataServerNodeFactory.computeDataServerNode(this.dataServerConfig.getLocalDataCenter(), dataInfoId);
                if (!DataServerConfig.IP.equals(dataServerNode.getIp())) {
                    continue;
                }
                Publisher unPublisher = new UnPublisher(dataInfoId, publisher.getRegisterId(), event.getOccurredTimestamp());
                Datum datum = new Datum(unPublisher, event.getDataCenter(), event.getVersion());
                datum.setContainsUnPub(true);
                this.handleDatum(DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB, datum);
                ++handled;
            }
            LOGGER.info("[{}] client off handle, connectId={}, occurTimestamp={}, version={}, handle pubSize={}",
                    this.getName(), connectId, event.getOccurredTimestamp(), event.getVersion(), handled);
        }
    }

    /**
     * Applies a datum change to the pending change data of its key.
     *
     * <p>For {@code COVER}, or when nothing is pending yet, the target datum replaces the pending
     * datum. Otherwise the target publishers are merged one by one into the pending datum, and
     * the pending datum takes over the target version whenever a publisher is merged.
     *
     * @param changeType  how the target datum is applied
     * @param sourceType  source type used when a new change data has to be created
     * @param targetDatum the incoming datum
     */
    private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum targetDatum) {
        this.lock.lock();
        try {
            ChangeData changeData = this.getChangeData(targetDatum.getDataCenter(), targetDatum.getDataInfoId(), sourceType, changeType);
            Datum cacheDatum = changeData.getDatum();
            if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) {
                changeData.setDatum(targetDatum);
                return;
            }
            Map<String, Publisher> targetPubMap = targetDatum.getPubMap();
            Map<String, Publisher> cachePubMap = cacheDatum.getPubMap();
            for (Publisher pub : targetPubMap.values()) {
                String registerId = pub.getRegisterId();
                if (isOutdated(pub, cachePubMap.get(registerId))) {
                    continue;
                }
                cachePubMap.put(registerId, pub);
                cacheDatum.setVersion(targetDatum.getVersion());
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Tells whether an incoming publisher must be dropped in favour of the already pending one.
     *
     * <p>That is the case when a pending publisher exists and either the incoming register
     * timestamp is older, or both are regular publishers (no {@link UnPublisher}) from the same
     * source address and the pending version is greater.
     */
    private static boolean isOutdated(Publisher incoming, Publisher pending) {
        if (pending == null) {
            return false;
        }
        if (incoming.getRegisterTimestamp() < pending.getRegisterTimestamp()) {
            return true;
        }
        return !(incoming instanceof UnPublisher)
                && !(pending instanceof UnPublisher)
                && incoming.getSourceAddress().equals(pending.getSourceAddress())
                && pending.getVersion() > incoming.getVersion();
    }

    /**
     * Handles a snapshot event: groups the snapshot publishers and the cached publishers of the
     * connection by dataInfoId (only ids belonging to this queue) and queues one
     * {@link SnapshotData} per dataInfoId.
     */
    private void handleSnapshot(DatumSnapshotEvent event) {
        String connectId = event.getConnectId();
        Map<String, Publisher> cachePubMap = event.getCachePubMap();
        Map<String, Publisher> snapshotPubMap = event.getPubMap();
        Map<String, SnapshotData> dataInfoId2SnapshotData = new HashMap<String, SnapshotData>();
        Object connectLock = CONNECT_ID_INTERNER.intern(connectId);
        synchronized (connectLock) {
            for (Map.Entry<String, Publisher> entry : snapshotPubMap.entrySet()) {
                String dataInfoId = entry.getValue().getDataInfoId();
                if (this.belongTo(dataInfoId)) {
                    this.getOrCreateSnapshotData(dataInfoId2SnapshotData, dataInfoId)
                            .getSnapshotPubMap().put(entry.getKey(), entry.getValue());
                }
            }
            for (Map.Entry<String, Publisher> entry : cachePubMap.entrySet()) {
                String dataInfoId = entry.getValue().getDataInfoId();
                if (this.belongTo(dataInfoId)) {
                    this.getOrCreateSnapshotData(dataInfoId2SnapshotData, dataInfoId)
                            .getToBeDeletedPubMap().put(entry.getKey(), entry.getValue());
                }
            }
        }
        for (SnapshotData snapshotData : dataInfoId2SnapshotData.values()) {
            RENEW_LOGGER.info("SnapshotData: connectId={}, dataInfoId={}, cachePubSize={}, snapshotPubSize={}",
                    connectId, snapshotData.getDataInfoId(), snapshotData.getToBeDeletedPubMap().size(), snapshotData.getSnapshotPubMap().size());
            this.CHANGE_QUEUE.put(snapshotData);
        }
    }

    /** Returns the snapshot data registered for the id, registering an empty one if missing. */
    private SnapshotData getOrCreateSnapshotData(Map<String, SnapshotData> dataInfoId2SnapshotData, String dataInfoId) {
        return dataInfoId2SnapshotData.computeIfAbsent(dataInfoId,
                id -> new SnapshotData(id, new HashMap<String, Publisher>(), new HashMap<String, Publisher>()));
    }

    /** Queues temporary data as its own change data, using the temp-data notify interval. */
    private void addTempChangeData(Datum targetDatum, DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType) {
        ChangeData tempChangeData = new ChangeData(targetDatum, this.notifyTempDataIntervalMs, sourceType, changeType);
        this.CHANGE_QUEUE.put(tempChangeData);
    }

    /** @return {@code true} when the event center hashes the dataInfoId to this queue's index */
    private boolean belongTo(String dataInfoId) {
        return this.queueIdx == this.dataChangeEventCenter.hash(dataInfoId);
    }
}

