/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.turbine.handler;

import com.netflix.turbine.data.AggDataFromCluster;
import com.netflix.turbine.data.ConcurrentEventQueue;
import com.netflix.turbine.data.DataFromSingleInstance;
import com.netflix.turbine.data.EventQueue;
import com.netflix.turbine.data.StatsRollingNumber;
import com.netflix.turbine.data.TurbineData;
import com.netflix.turbine.discovery.Instance;
import com.netflix.turbine.handler.PerformanceCriteria;
import com.netflix.turbine.handler.TurbineDataHandler;
import com.netflix.turbine.utils.WorkerThread;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HandlerQueueTuple<K extends TurbineData> {
    private static final Logger logger = LoggerFactory.getLogger(AggDataFromCluster.class);
    private final TurbineDataHandler<K> eventHandler;
    private final EventQueue<K> queue;
    private final List<WorkerThread> workerThreads;
    private final int numThreads;
    private volatile boolean stopped = false;
    private StatsRollingNumber counter = new StatsRollingNumber(10000, 10);

    public HandlerQueueTuple(TurbineDataHandler<K> eventHandler) {
        this.eventHandler = eventHandler;
        this.queue = new ConcurrentEventQueue<K>(eventHandler.getCriteria().getMaxQueueSize());
        this.numThreads = eventHandler.getCriteria().numThreads();
        this.workerThreads = new ArrayList<WorkerThread>(this.numThreads);
        this.stopped = false;
    }

    public void start() throws Exception {
        if (this.stopped) {
            logger.info("\n\nTuple already stopped, will not start again, need to create new tuple");
            return;
        }
        for (int i = 0; i < this.numThreads; ++i) {
            PerHandlerDispatcher dispatcherForHandler = new PerHandlerDispatcher(this.queue, this.eventHandler, this.numThreads);
            WorkerThread thread = new WorkerThread(dispatcherForHandler, -1, false);
            this.workerThreads.add(thread);
            thread.start();
        }
    }

    public void stop() {
        if (this.stopped) {
            return;
        }
        this.stopped = true;
        for (WorkerThread thread : this.workerThreads) {
            thread.stopAndBlock();
        }
        this.workerThreads.clear();
        logger.info("\n\nRemoving tuple for : " + this.eventHandler.getName() + " tuple running: " + this.running());
    }

    public boolean previouslyStopped() {
        return this.stopped;
    }

    public boolean running() {
        for (WorkerThread thread : this.workerThreads) {
            if (!thread.isRunning()) continue;
            return true;
        }
        return false;
    }

    private boolean isCritical() {
        return this.eventHandler.getCriteria().isCritical();
    }

    public TurbineDataHandler<K> getHandler() {
        return this.eventHandler;
    }

    public EventQueue<K> getQueue() {
        return this.queue;
    }

    public void pushData(Collection<K> statsData) {
        if (this.stopped) {
            return;
        }
        for (TurbineData data : statsData) {
            this.pushData(data);
        }
    }

    public void pushData(K data) {
        if (this.stopped) {
            return;
        }
        boolean success = this.queue.writeEvent(data);
        if (this.isCritical()) {
            if (success) {
                this.counter.increment(StatsRollingNumber.Type.EVENT_PROCESSED);
            } else {
                this.counter.increment(StatsRollingNumber.Type.EVENT_DISCARDED);
            }
        }
    }

    public String toString() {
        return "HandlerQueueTuple [eventHandler=" + this.eventHandler + "]";
    }

    public static class UnitTest {
        @Test
        public void testProcessWithMultipleThreads() throws Exception {
            final AtomicInteger handlerCount = new AtomicInteger(0);
            TurbineDataHandler<TurbineData> testHandler = new TurbineDataHandler<TurbineData>(){

                @Override
                public String getName() {
                    return "testHandler";
                }

                @Override
                public void handleData(Collection<TurbineData> stats) {
                    handlerCount.addAndGet(stats.size());
                }

                @Override
                public void handleHostLost(Instance host) {
                }

                @Override
                public PerformanceCriteria getCriteria() {
                    return new PerformanceCriteria(){

                        @Override
                        public boolean isCritical() {
                            return false;
                        }

                        @Override
                        public int getMaxQueueSize() {
                            return 10000;
                        }

                        @Override
                        public int numThreads() {
                            return 1;
                        }
                    };
                }
            };
            final HashMap testAttrs = new HashMap();
            final Instance host = new Instance("host", "cluster", true);
            final HandlerQueueTuple<TurbineData> tuple = new HandlerQueueTuple<TurbineData>(testHandler);
            tuple.start();
            final AtomicBoolean stop = new AtomicBoolean(false);
            ExecutorService threadPool = Executors.newFixedThreadPool(10);
            ArrayList<Future<Integer>> futures = new ArrayList<Future<Integer>>();
            for (int i = 0; i < 10; ++i) {
                futures.add(threadPool.submit(new Callable<Integer>(){
                    final AtomicInteger count = new AtomicInteger(0);
                    final Random random = new Random();

                    @Override
                    public Integer call() throws Exception {
                        while (!stop.get()) {
                            Collection<TurbineData> data = this.getRandomData();
                            tuple.pushData(data);
                            this.count.addAndGet(data.size());
                            Thread.sleep(50L);
                        }
                        return this.count.get();
                    }

                    private Collection<TurbineData> getRandomData() {
                        int size = this.random.nextInt(10);
                        ArrayList<TurbineData> list = new ArrayList<TurbineData>();
                        for (int i = 0; i < size; ++i) {
                            list.add(new DataFromSingleInstance(null, "type", "name", host, testAttrs, 0L));
                        }
                        return list;
                    }
                }));
            }
            Thread.sleep(3000L);
            stop.set(true);
            int sum = 0;
            for (Future future : futures) {
                sum += ((Integer)future.get()).intValue();
            }
            threadPool.shutdownNow();
            Thread.sleep(2000L);
            tuple.stop();
            Thread.sleep(1000L);
            Assert.assertTrue((sum == handlerCount.get() ? 1 : 0) != 0);
        }
    }

    private class PerHandlerDispatcher
    implements WorkerThread.Worker {
        private final EventQueue<K> queue;
        private final TurbineDataHandler<K> eventHandler;

        private PerHandlerDispatcher(EventQueue<K> queue, TurbineDataHandler<K> handler) {
            this(queue, handler, 1);
        }

        private PerHandlerDispatcher(EventQueue<K> queue, TurbineDataHandler<K> handler, int divFactor) {
            this.queue = queue;
            this.eventHandler = handler;
        }

        @Override
        public void init() throws Exception {
            logger.info("Per handler dispacher started for: " + this.eventHandler.getName());
        }

        @Override
        public void doWork() throws Exception {
            block4: {
                ArrayList<TurbineData> statsData = new ArrayList<TurbineData>();
                int numMisses = 0;
                boolean stopPolling = false;
                do {
                    TurbineData data;
                    if ((data = (TurbineData)this.queue.readEvent()) == null) {
                        if (++numMisses <= 100) continue;
                        Thread.sleep(100L);
                        numMisses = 0;
                        continue;
                    }
                    statsData.add(data);
                    numMisses = 0;
                    stopPolling = true;
                } while (!stopPolling);
                try {
                    this.eventHandler.handleData(statsData);
                }
                catch (Exception e) {
                    if (!this.eventHandler.getCriteria().isCritical()) break block4;
                    logger.warn("Could not publish event to event handler for " + this.eventHandler.getName(), (Throwable)e);
                }
            }
        }

        @Override
        public void cleanup() throws Exception {
        }
    }
}

