/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.turbine.data;

import com.netflix.turbine.data.EventQueue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Bounded, non-blocking {@link EventQueue} backed by a {@link ConcurrentLinkedQueue}.
 *
 * <p>The underlying queue is unbounded, so the bound is enforced with a separate
 * {@link AtomicLong} counter. A writer reserves a slot in the counter with a
 * compare-and-set before it enqueues, which keeps the counter from ever exceeding
 * the configured capacity even when several writers race.
 *
 * @param <T> type of the events held by the queue
 */
public class ConcurrentEventQueue<T>
implements EventQueue<T> {
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<T>();
    private final long maxCapacity;
    private final AtomicLong count;

    /**
     * Creates an empty queue.
     *
     * @param capacity maximum number of events the queue accepts at any one time
     */
    public ConcurrentEventQueue(long capacity) {
        this.maxCapacity = capacity;
        this.count = new AtomicLong(0L);
    }

    /**
     * Removes and returns the event at the head of the queue.
     *
     * @return the head event, or {@code null} when the queue is empty
     */
    @Override
    public T readEvent() {
        T event = this.queue.poll();
        if (event != null) {
            // Release the slot only after an element was really removed.
            this.count.decrementAndGet();
        }
        return event;
    }

    /**
     * Appends an event if the queue has room for it.
     *
     * @param event the event to enqueue; must not be {@code null}
     * @return {@code true} if the event was enqueued, {@code false} if the queue is full
     * @throws NullPointerException if {@code event} is {@code null}
     */
    @Override
    public boolean writeEvent(T event) {
        // Reject null before touching the counter: ConcurrentLinkedQueue.add(null) throws,
        // and a slot reserved for an element that is never added would be lost for good.
        if (event == null) {
            throw new NullPointerException("event must not be null");
        }
        // Reserve a slot atomically. A separate check followed by an increment lets
        // concurrent writers all pass the check and overshoot the capacity.
        long current;
        do {
            current = this.count.get();
            if (current >= this.maxCapacity) {
                return false;
            }
        } while (!this.count.compareAndSet(current, current + 1L));
        this.queue.add(event);
        return true;
    }

    /**
     * Returns the number of slots currently reserved in the queue.
     *
     * <p>The counter is incremented before an element is added and decremented after one is
     * removed, so the value can briefly be higher than the number of elements actually queued.
     *
     * @return the current count, capped at {@link Integer#MAX_VALUE}
     */
    @Override
    public int getQueueSize() {
        long size = this.count.get();
        // The counter is a long; cap instead of letting a narrowing cast wrap around.
        return size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
    }

    /**
     * JUnit stress test: producers write random strings from a small cache into the queue while
     * consumers drain it, then the per-key totals on both sides are compared.
     */
    public static class UnitTest {
        private volatile List<String> stringCache;
        private ConcurrentHashMap<String, AtomicLong> globalMap;
        private volatile boolean shutdown = false;
        private volatile boolean producerShutdown = false;
        private ConcurrentEventQueue<String> eventQueue;
        private AtomicLong producerCount = new AtomicLong(0L);
        private AtomicLong consumerCount = new AtomicLong(0L);

        /** Builds the cache of candidate event strings and a fresh queue of capacity 100. */
        @Before
        public void before() {
            int cacheSize = 10;
            this.stringCache = new ArrayList<String>(cacheSize);
            for (int i = 0; i < cacheSize; ++i) {
                this.stringCache.add(UUID.randomUUID().toString());
            }
            this.globalMap = new ConcurrentHashMap<String, AtomicLong>();
            this.eventQueue = new ConcurrentEventQueue<String>(100L);
        }

        @Test(timeout=20000L)
        public void testSingleProducerSingleConsumer() throws Exception {
            this.testProcess(1, 1);
        }

        @Test(timeout=20000L)
        public void testSingleProducerMultiConsumer() throws Exception {
            this.testProcess(1, 10);
        }

        @Test(timeout=20000L)
        public void testMultiProducerSingleConsumer() throws Exception {
            this.testProcess(10, 1);
        }

        @Test(timeout=20000L)
        public void testMultiProducerMultiConsumer() throws Exception {
            this.testProcess(10, 10);
        }

        /**
         * Runs the given number of producers and consumers for about four seconds, stops the
         * producers, lets the consumers drain the queue, and then verifies that everything
         * written was read exactly once.
         *
         * @param numProducers number of producer threads
         * @param numConsumers number of consumer threads
         * @throws Exception if the test thread is interrupted or a worker task failed
         */
        public void testProcess(int numProducers, int numConsumers) throws Exception {
            this.shutdown = false;
            this.producerShutdown = false;
            ExecutorService producerPool = Executors.newFixedThreadPool(numProducers);
            ExecutorService consumerPool = Executors.newFixedThreadPool(numConsumers);
            List<Future<Map<String, Long>>> producerFutures =
                    new ArrayList<Future<Map<String, Long>>>(numProducers);
            List<Future<Void>> consumerFutures = new ArrayList<Future<Void>>(numConsumers);
            try {
                for (int i = 0; i < numProducers; ++i) {
                    producerFutures.add(producerPool.submit(new Producer()));
                }
                for (int i = 0; i < numConsumers; ++i) {
                    consumerFutures.add(consumerPool.submit(new Consumer()));
                }
                Thread.sleep(4000L);
                this.shutdown = true;
                producerPool.shutdown();
                while (!producerPool.isTerminated()) {
                    Thread.sleep(100L);
                }
                // Only now may consumers treat an empty queue as "finished".
                this.producerShutdown = true;
                consumerPool.shutdown();
                while (!consumerPool.isTerminated()) {
                    Thread.sleep(100L);
                }
            } finally {
                // If the test thread is interrupted (e.g. by the timeout) make sure the
                // spinning workers see their stop flags and the pools are torn down.
                this.shutdown = true;
                this.producerShutdown = true;
                producerPool.shutdownNow();
                consumerPool.shutdownNow();
            }
            // Surface any failure a consumer wrapped and rethrew; otherwise it stays hidden
            // inside the Future.
            for (Future<Void> consumerFuture : consumerFutures) {
                consumerFuture.get();
            }
            Map<String, Long> expected = new HashMap<String, Long>();
            for (Future<Map<String, Long>> producerFuture : producerFutures) {
                for (Map.Entry<String, Long> written : producerFuture.get().entrySet()) {
                    Long soFar = expected.get(written.getKey());
                    long base = soFar == null ? 0L : soFar.longValue();
                    expected.put(written.getKey(), Long.valueOf(base + written.getValue().longValue()));
                }
            }
            System.out.println("Producer count: " + this.producerCount.get());
            System.out.println("Consumer count: " + this.consumerCount.get());
            Assert.assertEquals(this.producerCount.get(), this.consumerCount.get());
            for (Map.Entry<String, Long> entry : expected.entrySet()) {
                AtomicLong consumed = this.globalMap.get(entry.getKey());
                Assert.assertNotNull(consumed);
                Assert.assertEquals(entry.getValue().longValue(), consumed.get());
            }
        }

        /** Drains the queue, counting every event it reads per key in {@code globalMap}. */
        private class Consumer
        implements Callable<Void> {
            @Override
            public Void call() throws Exception {
                boolean stop = false;
                while (!stop) {
                    try {
                        // Sample the flag BEFORE polling. If it was already set, no producer
                        // can add anything any more, so an empty poll really means "drained".
                        // Checking it after the poll could miss an event written in between.
                        boolean producersDone = UnitTest.this.producerShutdown;
                        String key = UnitTest.this.eventQueue.readEvent();
                        if (key == null) {
                            stop = producersDone;
                            continue;
                        }
                        UnitTest.this.consumerCount.incrementAndGet();
                        AtomicLong keyCount = UnitTest.this.globalMap.get(key);
                        if (keyCount == null) {
                            AtomicLong fresh = new AtomicLong(0L);
                            AtomicLong existing = UnitTest.this.globalMap.putIfAbsent(key, fresh);
                            // Another consumer may have installed a counter first; use that one.
                            keyCount = existing != null ? existing : fresh;
                        }
                        keyCount.incrementAndGet();
                    }
                    catch (Throwable t) {
                        System.out.println("Throwable caught: " + t.getMessage());
                        throw new RuntimeException(t);
                    }
                }
                return null;
            }
        }

        /**
         * Writes random strings from the cache until told to stop and returns how many times
         * each string was successfully written.
         */
        private class Producer
        implements Callable<Map<String, Long>> {
            private final Map<String, Long> result = new HashMap<String, Long>();
            private final Random random = new Random();
            private int failures = 0;

            @Override
            public Map<String, Long> call() throws Exception {
                while (!UnitTest.this.shutdown) {
                    try {
                        int index = this.random.nextInt(UnitTest.this.stringCache.size());
                        String randomString = UnitTest.this.stringCache.get(index);
                        if (UnitTest.this.eventQueue.writeEvent(randomString)) {
                            UnitTest.this.producerCount.incrementAndGet();
                            Long previous = this.result.get(randomString);
                            long base = previous == null ? 0L : previous.longValue();
                            this.result.put(randomString, Long.valueOf(base + 1L));
                            continue;
                        }
                        // Queue was full: once more than 100 writes have been rejected,
                        // pause after each further rejection.
                        ++this.failures;
                        if (this.failures > 100) {
                            Thread.sleep(100L);
                        }
                    }
                    catch (Throwable t) {
                        System.out.println("ttt" + t.getMessage());
                        throw new RuntimeException(t);
                    }
                }
                return this.result;
            }
        }
    }
}

