/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.registry.task.batcher;

import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.task.batcher.TaskHolder;
import com.alipay.sofa.registry.task.batcher.TaskProcessor;
import com.alipay.sofa.registry.task.batcher.TrafficShaper;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class AcceptorExecutor<ID, T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AcceptorExecutor.class);
    private final int maxBufferSize;
    private final String name;
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<TaskHolder<ID, T>>();
    private final BlockingDeque<TaskHolder<ID, T>> reprocessQueue = new LinkedBlockingDeque<TaskHolder<ID, T>>();
    private final Thread acceptorThread;
    private final Map<ID, TaskHolder<ID, T>> pendingTasks = new HashMap<ID, TaskHolder<ID, T>>();
    private final Deque<ID> processingOrder = new LinkedList<ID>();
    private final Semaphore workSemaphore = new Semaphore(0);
    private final BlockingQueue<TaskHolder<ID, T>> itemWorkQueue = new LinkedBlockingQueue<TaskHolder<ID, T>>();
    private final Semaphore batchWorkRequests = new Semaphore(0);
    private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<List<TaskHolder<ID, T>>>();
    private final TrafficShaper trafficShaper;
    private AtomicLong acceptedTasks = new AtomicLong();
    private AtomicLong replayedTasks = new AtomicLong();
    private AtomicLong expiredTasks = new AtomicLong();
    private AtomicLong overriddenTasks = new AtomicLong();
    private AtomicLong queueOverflows = new AtomicLong();

    AcceptorExecutor(String id, int maxBufferSize, long congestionRetryDelayMs, long networkFailureRetryMs) {
        this.name = "TaskAcceptor-" + id;
        this.maxBufferSize = maxBufferSize;
        this.trafficShaper = new TrafficShaper(congestionRetryDelayMs, networkFailureRetryMs);
        ThreadGroup threadGroup = new ThreadGroup("serverTaskExecutors");
        this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
        this.acceptorThread.setDaemon(true);
        this.acceptorThread.start();
    }

    void process(ID id, T task, long expiryTime) {
        this.acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
        this.acceptedTasks.incrementAndGet();
    }

    void reprocess(TaskHolder<ID, T> taskHolder, TaskProcessor.ProcessingResult processingResult) {
        this.reprocessQueue.add(taskHolder);
        this.replayedTasks.incrementAndGet();
        this.trafficShaper.registerFailure(processingResult);
    }

    BlockingQueue<TaskHolder<ID, T>> requestWorkItem() {
        this.workSemaphore.release();
        return this.itemWorkQueue;
    }

    BlockingQueue<List<TaskHolder<ID, T>>> requestWorkItems() {
        this.batchWorkRequests.release();
        return this.batchWorkQueue;
    }

    void shutdown() {
        if (this.isShutdown.compareAndSet(false, true)) {
            this.acceptorThread.interrupt();
        }
    }

    public int getMaxBufferSize() {
        return this.maxBufferSize;
    }

    public AtomicLong getAcceptedTasks() {
        return this.acceptedTasks;
    }

    public AtomicLong getReplayedTasks() {
        return this.replayedTasks;
    }

    public AtomicLong getExpiredTasks() {
        return this.expiredTasks;
    }

    public AtomicLong getOverriddenTasks() {
        return this.overriddenTasks;
    }

    public AtomicLong getQueueOverflows() {
        return this.queueOverflows;
    }

    public int getPendingTaskSize() {
        return this.pendingTasks.size();
    }

    class AcceptorRunner
    implements Runnable {
        AcceptorRunner() {
        }

        @Override
        public void run() {
            long scheduleTime = 0L;
            while (!AcceptorExecutor.this.isShutdown.get()) {
                try {
                    this.drainInputQueues();
                    int totalItems = AcceptorExecutor.this.processingOrder.size();
                    long now = System.currentTimeMillis();
                    if (scheduleTime < now) {
                        scheduleTime = now + AcceptorExecutor.this.trafficShaper.transmissionDelay();
                    }
                    if (scheduleTime <= now) {
                        this.assignItemWork();
                    }
                    if (totalItems != AcceptorExecutor.this.processingOrder.size()) continue;
                    Thread.sleep(10L);
                }
                catch (InterruptedException totalItems) {
                }
                catch (Throwable e) {
                    LOGGER.warn("AcceptorThread error", e);
                }
            }
        }

        private boolean isFull() {
            return AcceptorExecutor.this.pendingTasks.size() >= AcceptorExecutor.this.maxBufferSize;
        }

        private void drainInputQueues() throws InterruptedException {
            do {
                TaskHolder taskHolder;
                this.drainReprocessQueue();
                this.drainAcceptorQueue();
                if (AcceptorExecutor.this.isShutdown.get() || !AcceptorExecutor.this.reprocessQueue.isEmpty() || !AcceptorExecutor.this.acceptorQueue.isEmpty() || !AcceptorExecutor.this.pendingTasks.isEmpty() || (taskHolder = (TaskHolder)AcceptorExecutor.this.acceptorQueue.poll(10L, TimeUnit.MILLISECONDS)) == null) continue;
                this.appendTaskHolder(taskHolder);
            } while (!AcceptorExecutor.this.reprocessQueue.isEmpty() || !AcceptorExecutor.this.acceptorQueue.isEmpty() || AcceptorExecutor.this.pendingTasks.isEmpty());
        }

        private void drainAcceptorQueue() {
            while (!AcceptorExecutor.this.acceptorQueue.isEmpty()) {
                this.appendTaskHolder((TaskHolder)AcceptorExecutor.this.acceptorQueue.poll());
            }
        }

        private void drainReprocessQueue() {
            long now = System.currentTimeMillis();
            while (!AcceptorExecutor.this.reprocessQueue.isEmpty() && !this.isFull()) {
                TaskHolder taskHolder = (TaskHolder)AcceptorExecutor.this.reprocessQueue.pollLast();
                Object id = taskHolder.getId();
                if (taskHolder.getExpiryTime() > 0L && taskHolder.getExpiryTime() <= now) {
                    AcceptorExecutor.this.expiredTasks.incrementAndGet();
                    continue;
                }
                if (AcceptorExecutor.this.pendingTasks.containsKey(id)) {
                    AcceptorExecutor.this.overriddenTasks.incrementAndGet();
                    continue;
                }
                AcceptorExecutor.this.pendingTasks.put(id, taskHolder);
                AcceptorExecutor.this.processingOrder.addFirst(id);
            }
            if (this.isFull()) {
                LOGGER.error("Now pending task full,it will clear reprocessQueue in to add new task,reprocessQueue size={},queueOverflows={},name={}", AcceptorExecutor.this.reprocessQueue.size(), AcceptorExecutor.this.queueOverflows, AcceptorExecutor.this.name);
                AcceptorExecutor.this.queueOverflows.addAndGet(AcceptorExecutor.this.reprocessQueue.size());
                AcceptorExecutor.this.reprocessQueue.clear();
            }
        }

        private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
            TaskHolder previousTask;
            if (this.isFull()) {
                LOGGER.error("Now pending task full,it will remove first one to add task={},queueOverflows={},name={}", taskHolder.getId(), AcceptorExecutor.this.queueOverflows, AcceptorExecutor.this.name);
                AcceptorExecutor.this.pendingTasks.remove(AcceptorExecutor.this.processingOrder.poll());
                AcceptorExecutor.this.queueOverflows.incrementAndGet();
            }
            if ((previousTask = AcceptorExecutor.this.pendingTasks.put(taskHolder.getId(), taskHolder)) == null) {
                AcceptorExecutor.this.processingOrder.add(taskHolder.getId());
            } else {
                AcceptorExecutor.this.overriddenTasks.incrementAndGet();
            }
        }

        void assignItemWork() {
            if (!AcceptorExecutor.this.processingOrder.isEmpty() && AcceptorExecutor.this.workSemaphore.tryAcquire(1)) {
                long now = System.currentTimeMillis();
                while (!AcceptorExecutor.this.processingOrder.isEmpty()) {
                    Object id = AcceptorExecutor.this.processingOrder.poll();
                    TaskHolder holder = (TaskHolder)AcceptorExecutor.this.pendingTasks.remove(id);
                    if (holder.getExpiryTime() < 0L || holder.getExpiryTime() > now) {
                        AcceptorExecutor.this.itemWorkQueue.add(holder);
                        return;
                    }
                    AcceptorExecutor.this.expiredTasks.incrementAndGet();
                }
                AcceptorExecutor.this.workSemaphore.release();
            }
        }
    }
}

