/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.turbine.monitor.cluster;

import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.turbine.data.AggDataFromCluster;
import com.netflix.turbine.data.DataFromSingleInstance;
import com.netflix.turbine.data.TurbineData;
import com.netflix.turbine.data.meta.AggDataMetaInfoAdaptor;
import com.netflix.turbine.data.meta.MetaInformation;
import com.netflix.turbine.discovery.Instance;
import com.netflix.turbine.handler.PerformanceCriteria;
import com.netflix.turbine.handler.TurbineDataDispatcher;
import com.netflix.turbine.handler.TurbineDataHandler;
import com.netflix.turbine.monitor.MonitorConsole;
import com.netflix.turbine.monitor.TurbineDataMonitor;
import com.netflix.turbine.monitor.cluster.ClusterMonitor;
import com.netflix.turbine.monitor.cluster.ObservationCriteria;
import com.netflix.turbine.monitor.cluster.TimeBoundCache;
import com.netflix.turbine.monitor.instance.InstanceUrlClosure;
import com.netflix.turbine.monitor.instance.StaleConnectionMonitorReaper;
import com.netflix.turbine.utils.EventThrottle;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AggregateClusterMonitor
extends ClusterMonitor<AggDataFromCluster> {
    private static final Logger logger = LoggerFactory.getLogger(AggregateClusterMonitor.class);
    private final ConcurrentHashMap<TurbineData.Key, AggDataFromCluster> aggregateData = new ConcurrentHashMap();
    private final TurbineDataHandler<DataFromSingleInstance> eventHandler;
    private final ObservationCriteria observationCriteria;
    private final PerformanceCriteria performanceCriteria;
    private final TimeBoundCache<String> timedCache;
    public static MonitorConsole<AggDataFromCluster> AggregatorClusterMonitorConsole = new MonitorConsole();
    public static TurbineDataDispatcher<DataFromSingleInstance> InstanceMonitorDispatcher = new TurbineDataDispatcher("ALL_INSTANCE_MONITOR_DISPATCHER");
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final MetaInformation<AggDataFromCluster> metaInfo;

    public static TurbineDataMonitor<AggDataFromCluster> findOrRegisterAggregateMonitor(String clusterName) {
        TurbineDataMonitor clusterMonitor = AggregatorClusterMonitorConsole.findMonitor(clusterName + "_agg");
        if (clusterMonitor == null) {
            logger.info("Could not find monitors: " + AggregatorClusterMonitorConsole.toString());
            clusterMonitor = new AggregateClusterMonitor(clusterName + "_agg", clusterName);
            clusterMonitor = AggregatorClusterMonitorConsole.findOrRegisterMonitor(clusterMonitor);
        }
        return clusterMonitor;
    }

    public AggregateClusterMonitor(String name, String clusterName) {
        this(name, new ObservationCriteria.ClusterBasedObservationCriteria(clusterName), new PerformanceCriteria.AggClusterPerformanceCriteria(clusterName), new MonitorConsole<DataFromSingleInstance>(), InstanceMonitorDispatcher, InstanceUrlClosure.ClusterConfigBasedUrlClosure);
    }

    public AggregateClusterMonitor(String name, ObservationCriteria observeCriteria, PerformanceCriteria perfCriteria, MonitorConsole<DataFromSingleInstance> instanceMonitorConsole, TurbineDataDispatcher<DataFromSingleInstance> instanceMonitorDispatcher, InstanceUrlClosure urlClosure) {
        super(name, new TurbineDataDispatcher("AGG_CLUSTER_MONITOR_" + name), AggregatorClusterMonitorConsole, instanceMonitorDispatcher, instanceMonitorConsole, urlClosure);
        this.eventHandler = new AggStatsEventHandler(this);
        this.observationCriteria = observeCriteria;
        this.performanceCriteria = perfCriteria;
        this.timedCache = new TimeBoundCache(name);
        this.metaInfo = new MetaInformation<AggDataFromCluster>(this, new AggDataMetaInfoAdaptor(this));
    }

    @Override
    public void startMonitor() throws Exception {
        super.startMonitor();
        this.timedCache.startCache();
        boolean success = this.started.compareAndSet(false, true);
        if (!success) {
            return;
        }
        StaleConnectionMonitorReaper.Instance.addMonitorConsole(this.getInstanceMonitors());
        StaleConnectionMonitorReaper.Instance.start();
    }

    @Override
    public void stopMonitor() {
        super.stopMonitor();
        this.timedCache.stopCache();
        StaleConnectionMonitorReaper.Instance.removeMonitorConsole(this.getInstanceMonitors());
    }

    @Override
    public TurbineDataHandler<DataFromSingleInstance> getEventHandler() {
        return this.eventHandler;
    }

    @Override
    public ObservationCriteria getObservationCriteria() {
        return this.observationCriteria;
    }

    @Override
    protected MetaInformation<AggDataFromCluster> getMetaInformation() {
        return this.metaInfo;
    }

    private boolean stopped() {
        return this.stopped;
    }

    public String getReportingDataDebug(String typeString, String nameString) {
        StringBuilder sb = new StringBuilder();
        for (TurbineData.Key key : this.aggregateData.keySet()) {
            if (!key.getType().equals(typeString) || nameString != null && !nameString.equals(key.getName())) continue;
            sb.append(key.getName());
            AggDataFromCluster data = this.aggregateData.get(key);
            sb.append(" -> " + data.getReportingDataDebug());
            sb.append("\n");
        }
        return sb.toString();
    }

    public void removeKey(String type, String name) {
        TurbineData.Key key = new TurbineData.Key(type, name);
        this.aggregateData.remove(key);
    }

    public void removeAllKeys() {
        this.aggregateData.clear();
    }

    public static class AggStatsEventHandler
    implements TurbineDataHandler<DataFromSingleInstance> {
        private AggregateClusterMonitor monitor;
        private final AtomicLong lastFlushTime = new AtomicLong(0L);
        final DynamicIntProperty eventFlushThreshold = DynamicPropertyFactory.getInstance().getIntProperty("turbine.aggregator.throttle.eventFlushThreshold", 500);
        final DynamicIntProperty eventFlushDelayMillis = DynamicPropertyFactory.getInstance().getIntProperty("turbine.aggregator.throttle.eventFlushDelay", 3000);
        final EventThrottle<DataFromSingleInstance> throttleCheck = new EventThrottle(this.eventFlushThreshold, this.eventFlushDelayMillis);

        public AggStatsEventHandler(AggregateClusterMonitor monitor) {
            this.monitor = monitor;
        }

        @Override
        public String getName() {
            return this.monitor.getName() + "_aggClusterEventHandler";
        }

        @Override
        public void handleData(Collection<DataFromSingleInstance> statsData) {
            if (this.monitor.stopped()) {
                return;
            }
            for (DataFromSingleInstance data : statsData) {
                AggDataFromCluster clusterData;
                if (this.monitor.timedCache.lookup(data.getHost().getHostname())) continue;
                TurbineData.Key dataKey = data.getKey();
                if (logger.isDebugEnabled() && data.getNumericAttributes() != null && data.getNumericAttributes().containsKey("currentTime")) {
                    long timeFromHost = data.getNumericAttributes().get("currentTime");
                    logger.debug("ClusterMonitor data from SingleInstance => Latency: " + (System.currentTimeMillis() - timeFromHost) + "  for [" + data.getName() + "] from " + data.getHost().getHostname());
                }
                if ((clusterData = (AggDataFromCluster)this.monitor.aggregateData.get(dataKey)) == null) {
                    this.monitor.aggregateData.putIfAbsent(dataKey, new AggDataFromCluster(this.monitor, data.getType(), data.getName()));
                }
                clusterData = (AggDataFromCluster)this.monitor.aggregateData.get(dataKey);
                clusterData.addStatsDataFromSingleServer(data);
                AggDataFromCluster dataToSend = (AggDataFromCluster)this.monitor.aggregateData.get(dataKey);
                if (dataToSend == null || this.throttleCheck.throttle(data)) continue;
                dataToSend.performPostProcessing();
                this.monitor.clusterDispatcher.pushData(this.monitor.getStatsInstance(), dataToSend);
            }
            long now = System.currentTimeMillis();
            if (this.lastFlushTime.get() == 0L || now - this.lastFlushTime.get() > 2000L) {
                this.performPostProcessing();
                boolean continueRunning = this.monitor.clusterDispatcher.pushData(this.monitor.getStatsInstance(), this.monitor.aggregateData.values());
                this.lastFlushTime.set(now);
                if (!continueRunning) {
                    logger.info("No more listeners to the cluster monitor, stopping monitor");
                    this.monitor.stopMonitor();
                }
            }
        }

        private void performPostProcessing() {
            for (AggDataFromCluster data : this.monitor.aggregateData.values()) {
                data.performPostProcessing();
            }
        }

        @Override
        public void handleHostLost(Instance host) {
            logger.info("Host lost: " + host.getHostname() + ", adding to time based cache\n");
            this.monitor.timedCache.put(host.getHostname());
            for (TurbineData.Key key : this.monitor.aggregateData.keySet()) {
                AggDataFromCluster dataFromCluster = (AggDataFromCluster)this.monitor.aggregateData.get(key);
                dataFromCluster.removeDataForHost(host);
            }
        }

        @Override
        public PerformanceCriteria getCriteria() {
            return this.monitor.performanceCriteria;
        }

        public String toString() {
            return this.getName();
        }
    }
}

