/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.registry.server.data.change;

import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.store.Publisher;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.cache.DatumCache;
import com.alipay.sofa.registry.server.data.cache.MergeResult;
import com.alipay.sofa.registry.server.data.change.ChangeData;
import com.alipay.sofa.registry.server.data.change.DataChangeTypeEnum;
import com.alipay.sofa.registry.server.data.change.DataSourceTypeEnum;
import com.alipay.sofa.registry.server.data.change.SnapshotData;
import com.alipay.sofa.registry.server.data.change.event.DataChangeEventCenter;
import com.alipay.sofa.registry.server.data.change.event.DataChangeEventQueue;
import com.alipay.sofa.registry.server.data.change.notify.IDataChangeNotifier;
import com.alipay.sofa.registry.server.data.executor.ExecutorFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Consumes {@link ChangeData} events from every {@link DataChangeEventQueue} exposed by the
 * {@link DataChangeEventCenter}, applies them to the {@link DatumCache} and forwards the
 * resulting datum to the injected {@link IDataChangeNotifier}s.
 *
 * <p>{@link #start()} runs after dependency injection ({@code @PostConstruct}) and spawns one
 * long-running drain task per queue; the actual handling of each event is delegated to a
 * second executor so a slow notification does not block the queue consumer.
 */
public class DataChangeHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataChangeHandler.class);
    private static final Logger LOGGER_START = LoggerFactory.getLogger("DATA-START-LOGS");
    @Autowired
    private DataServerConfig dataServerConfig;
    @Autowired
    private DataChangeEventCenter dataChangeEventCenter;
    @Autowired
    private DatumCache datumCache;
    /** All notifiers available; each one is consulted for every handled change. */
    @Resource
    private List<IDataChangeNotifier> dataChangeNotifiers;

    /**
     * Starts one drain task per event queue.
     *
     * <p>The drain pool is sized to the number of queues (each task never returns under normal
     * operation); the notify pool is sized to {@code dataServerConfig.getQueueCount() * 5}.
     */
    @PostConstruct
    public void start() {
        DataChangeEventQueue[] queues = this.dataChangeEventCenter.getQueues();
        int queueCount = queues.length;
        Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
        Executor notifyExecutor = ExecutorFactory.newFixedThreadPool(this.dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
        for (DataChangeEventQueue queue : queues) {
            String name = queue.getName();
            executor.execute(() -> this.drain(queue, name, notifyExecutor));
            LOGGER_START.info("[DataChangeHandler] notify datum in queue:{} success", (Object)name);
        }
    }

    /**
     * Takes events from {@code queue} forever and hands each one to {@code notifyExecutor}.
     *
     * <p>Any failure is logged and the loop keeps going, except for an interrupt: in that case
     * the interrupt flag is restored and the task ends, so the worker can actually be stopped
     * instead of silently swallowing the request and blocking on the queue again.
     *
     * @param queue          queue to consume
     * @param name           queue name, used as a log tag
     * @param notifyExecutor executor that runs the {@link ChangeNotifier} for each event
     */
    private void drain(DataChangeEventQueue queue, String name, Executor notifyExecutor) {
        while (true) {
            try {
                ChangeData changeData = queue.take();
                notifyExecutor.execute(new ChangeNotifier(changeData, name));
            }
            catch (Throwable e) {
                LOGGER.error("[DataChangeHandler][{}] notify scheduler error", (Object)name, (Object)e);
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt for whoever owns this thread and stop consuming.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Handles a single {@link ChangeData}: updates the cache and notifies the matching
     * {@link IDataChangeNotifier}s. Kept as an inner (non-static) class because it reads the
     * enclosing handler's cache, config and notifier list.
     */
    private class ChangeNotifier
    implements Runnable {
        private final ChangeData changeData;
        /** Name of the queue the event came from; only used as a log tag. */
        private final String name;

        public ChangeNotifier(ChangeData changeData, String name) {
            this.changeData = changeData;
            this.name = name;
        }

        /** Dispatches to snapshot handling or regular datum handling based on the event type. */
        @Override
        public void run() {
            if (this.changeData instanceof SnapshotData) {
                this.handleSnapshot((SnapshotData)this.changeData);
            } else {
                this.handleDatum();
            }
        }

        /**
         * Writes a snapshot into the cache for the local data center and notifies with a
         * {@code null} last version.
         *
         * @param snapshotData the snapshot event to apply
         */
        private void handleSnapshot(SnapshotData snapshotData) {
            String dataInfoId = snapshotData.getDataInfoId();
            Map<String, Publisher> toBeDeletedPubMap = snapshotData.getToBeDeletedPubMap();
            Map<String, Publisher> snapshotPubMap = snapshotData.getSnapshotPubMap();
            // Read the previous version before the snapshot overwrites the cache entry.
            Datum oldDatum = DataChangeHandler.this.datumCache.get(DataChangeHandler.this.dataServerConfig.getLocalDataCenter(), dataInfoId);
            long lastVersion = oldDatum != null ? oldDatum.getVersion() : 0L;
            Datum datum = DataChangeHandler.this.datumCache.putSnapshot(dataInfoId, toBeDeletedPubMap, snapshotPubMap);
            long version = datum != null ? datum.getVersion() : 0L;
            LOGGER.info("[DataChangeHandler][{}] snapshot handle,dataInfoId={}, version={}, lastVersion={}", new Object[]{this.name, dataInfoId, version, lastVersion});
            if (datum == null) {
                // The cache produced no datum (already logged above as version=0); there is
                // nothing to hand to the notifiers, and passing null on would only fail there.
                return;
            }
            this.notify(datum, this.changeData.getSourceType(), null);
        }

        /**
         * Applies a non-snapshot change: cleans, temp-publishes, or merges/puts the datum
         * depending on the source type, notifying when the cache reports an effective change.
         * Exceptions raised while touching the cache or notifying are logged, not rethrown.
         */
        private void handleDatum() {
            Datum datum = this.changeData.getDatum();
            String dataCenter = datum.getDataCenter();
            String dataInfoId = datum.getDataInfoId();
            DataSourceTypeEnum sourceType = this.changeData.getSourceType();
            DataChangeTypeEnum changeType = this.changeData.getChangeType();
            // Only merges that do not originate from backup/sync get a fresh version here.
            boolean replicated = sourceType == DataSourceTypeEnum.BACKUP || sourceType == DataSourceTypeEnum.SYNC;
            if (changeType == DataChangeTypeEnum.MERGE && !replicated) {
                datum.updateVersion();
            }
            long version = datum.getVersion();
            try {
                if (sourceType == DataSourceTypeEnum.CLEAN) {
                    if (DataChangeHandler.this.datumCache.cleanDatum(dataCenter, dataInfoId)) {
                        LOGGER.info("[DataChangeHandler][{}] clean datum, dataCenter={}, dataInfoId={}, version={},sourceType={}, changeType={}", new Object[]{this.name, dataCenter, dataInfoId, version, sourceType, changeType});
                    }
                    return;
                }
                if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
                    this.notifyTempPub(datum, sourceType, changeType);
                    return;
                }
                MergeResult mergeResult = DataChangeHandler.this.datumCache.putDatum(changeType, datum);
                Long lastVersion = mergeResult.getLastVersion();
                // NOTE(review): -2 looks like an error sentinel produced by DatumCache for a
                // first-time unPub datum (per the log text below) - confirm against DatumCache.
                if (lastVersion != null && lastVersion.longValue() == -2L) {
                    LOGGER.error("[DataChangeHandler][{}] first put unPub datum into cache error, dataCenter={}, dataInfoId={}, version={}, sourceType={},isContainsUnPub={}", new Object[]{this.name, dataCenter, dataInfoId, version, sourceType, datum.isContainsUnPub()});
                    return;
                }
                LOGGER.info("[DataChangeHandler][{}] datum handle,datum={},dataCenter={}, dataInfoId={}, version={}, lastVersion={}, sourceType={}, changeType={},changeFlag={},isContainsUnPub={}", new Object[]{this.name, datum.hashCode(), dataCenter, dataInfoId, version, lastVersion, sourceType, changeType, mergeResult.isChangeFlag(), datum.isContainsUnPub()});
                // Notify only when the version moved (or there was none before) and the cache
                // reports that the content actually changed.
                boolean versionMoved = lastVersion == null || version != lastVersion.longValue();
                if (versionMoved && mergeResult.isChangeFlag()) {
                    this.notify(datum, sourceType, lastVersion);
                }
            }
            catch (Exception e) {
                LOGGER.error("[DataChangeHandler][{}] put datum into cache error, dataCenter={}, dataInfoId={}, version={}, sourceType={},isContainsUnPub={}", new Object[]{this.name, dataCenter, dataInfoId, version, sourceType, datum.isContainsUnPub(), e});
            }
        }

        /**
         * Notifies a temporary publish without storing it in the cache.
         *
         * <p>Publishers already cached for the same data center / dataInfoId are copied into
         * the incoming datum's publisher map first ({@code putAll}, so a cached entry replaces
         * an incoming entry with the same key).
         *
         * @param datum      incoming datum; its publisher map is mutated
         * @param sourceType source type used to select notifiers
         * @param changeType change type, logged only
         */
        private void notifyTempPub(Datum datum, DataSourceTypeEnum sourceType, DataChangeTypeEnum changeType) {
            String dataCenter = datum.getDataCenter();
            String dataInfoId = datum.getDataInfoId();
            long version = datum.getVersion();
            Datum existDatum = DataChangeHandler.this.datumCache.get(dataCenter, dataInfoId);
            if (existDatum != null) {
                Map<String, Publisher> cachePubMap = existDatum.getPubMap();
                if (cachePubMap != null && !cachePubMap.isEmpty()) {
                    datum.getPubMap().putAll(cachePubMap);
                }
            }
            LOGGER.info("[DataChangeHandler][{}] datum handle temp pub,datum={},dataCenter={}, dataInfoId={}, version={}, sourceType={}, changeType={}", new Object[]{this.name, datum.hashCode(), dataCenter, dataInfoId, version, sourceType, changeType});
            this.notify(datum, sourceType, null);
        }

        /**
         * Calls every notifier whose suitable-source set contains {@code sourceType}.
         *
         * @param datum       datum to publish
         * @param sourceType  origin of the change, used to filter notifiers
         * @param lastVersion previous cached version, or {@code null} when not applicable
         */
        private void notify(Datum datum, DataSourceTypeEnum sourceType, Long lastVersion) {
            for (IDataChangeNotifier notifier : DataChangeHandler.this.dataChangeNotifiers) {
                if (notifier.getSuitableSource().contains(sourceType)) {
                    notifier.notify(datum, lastVersion);
                }
            }
        }
    }
}

