/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.config.client;

import com.taobao.config.client.ConfigClientLogger;
import com.taobao.config.client.DefaultDataClient;
import com.taobao.config.client.FluentIterator;
import com.taobao.config.client.PublisherRegistrar;
import com.taobao.config.client.RunMode;
import com.taobao.config.client.ServerListManager;
import com.taobao.config.client.SubscriberRegistrar;
import com.taobao.config.client.exception.ExcEvent;
import com.taobao.config.client.exception.ExcEventDispatch;
import com.taobao.config.client.exception.ExcType;
import com.taobao.config.client.processor.ElementProcessorHub;
import com.taobao.config.common.protocol.ProtocolElement;
import com.taobao.config.common.protocol.ProtocolPackage;
import com.taobao.middleware.logger.Logger;
import com.taobao.remoting.Client;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Background worker bound to a single {@link ServerListManager}.
 *
 * <p>Each pass of {@link #run()} first handles every {@link ProtocolPackage}
 * queued in the mailbox, then walks the registered publishers and subscribers
 * that belong to this worker's server list manager and sends the ones that are
 * not yet synchronized to the server in batches.
 *
 * <p>Between passes the worker rests for up to three seconds; {@link #signal()}
 * cuts that rest short.
 */
public class ConfigClientWorker implements Runnable {
    private static final Logger log = ConfigClientLogger.getLogger(ConfigClientWorker.class);
    private static final ElementProcessorHub processorManager = new ElementProcessorHub();

    /** Capacity of the inbound mailbox. */
    private static final int MAX_MAILBOX_SIZE = 2048;

    /** Maximum number of data clients examined while filling one outgoing package. */
    private static final int SEND_BATCH = 32;

    /**
     * Orders disabled clients before enabled ones and treats clients with the
     * same disabled state as equal, so the ordering is symmetric and the
     * (stable) sort keeps their relative order.
     */
    private static final Comparator<DefaultDataClient> DISABLED_FIRST = new Comparator<DefaultDataClient>() {

        @Override
        public int compare(DefaultDataClient left, DefaultDataClient right) {
            boolean leftDisabled = left.isDisable();
            boolean rightDisabled = right.isDisable();
            if (leftDisabled == rightDisabled) {
                return 0;
            }
            return leftDisabled ? -1 : 1;
        }
    };

    private final BlockingQueue<ProtocolPackage> mailbox = new LinkedBlockingQueue<ProtocolPackage>(MAX_MAILBOX_SIZE);
    private final ServerListManager serverListManager;

    // NOTE(review): nothing in this class ever assigns this field, so
    // closeConnection() is currently a no-op unless it is set by other means.
    private volatile Client client = null;

    // NOTE(review): never set to false in this class, so run() has no
    // shutdown path here.
    private volatile boolean isContinue = true;

    /** One-slot queue used as a wake-up bell: signal() offers, rest() polls. */
    private final BlockingQueue<Object> bell = new ArrayBlockingQueue<Object>(1);
    private final Object bellItem = new Object();

    /**
     * @param serverListManager the server list manager whose data clients this
     *                          worker delivers
     */
    public ConfigClientWorker(ServerListManager serverListManager) {
        this.serverListManager = serverListManager;
    }

    /** @return the server list manager this worker was created with */
    public ServerListManager getServerListManager() {
        return this.serverListManager;
    }

    /** @return the live mailbox queue (not a copy) from which {@link #run()} takes packages */
    public BlockingQueue<ProtocolPackage> getMailbox() {
        return this.mailbox;
    }

    /**
     * Blocks while {@link RunMode#isFailoverMode()} is true, re-checking at
     * least every five seconds.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt
     * status is restored and the method returns early, possibly while failover
     * mode is still active.
     */
    public void waitUntilNormalMode() {
        long recheckMs = TimeUnit.SECONDS.toMillis(5L);
        while (RunMode.isFailoverMode()) {
            try {
                // Object.wait() requires the caller to own the monitor; without
                // this it throws IllegalMonitorStateException on every call.
                synchronized (this) {
                    this.wait(recheckMs);
                }
            }
            catch (InterruptedException e) {
                // Restore the flag for the caller and stop waiting; looping
                // again would make wait() fail immediately and spin.
                Thread.currentThread().interrupt();
                log.error("%s", "error when waiting for normal mode: " + e.toString(), e);
                return;
            }
        }
    }

    /**
     * Destroys the current client connection, if there is one. Failures are
     * logged and not propagated.
     */
    public void closeConnection() {
        // Read the volatile field once so the null check and the call see the
        // same reference.
        Client current = this.client;
        if (current == null) {
            return;
        }
        try {
            current.destroy();
        }
        catch (Throwable t) {
            log.error("%s", "[Network] Failed to close connection due to " + t, t);
        }
    }

    /**
     * Runs delivery passes until {@code isContinue} becomes false. Any
     * exception from a pass (including an interruption) is logged and the loop
     * continues after the rest period.
     */
    @Override
    public void run() {
        log.info("[Global] Deliverer thread is starting...");
        while (this.isContinue) {
            try {
                this.runOnce();
            }
            catch (Exception e) {
                log.error("%s", "[Internal] Unhandled exception in deliverer: ", e);
            }
            finally {
                try {
                    this.rest(3000L);
                }
                catch (Exception e) {
                    log.error("%s", "[Internal] rest is error Unhandled exception in deliverer: ", e);
                }
            }
        }
    }

    /** Wakes the worker if it is resting between passes; extra signals are dropped. */
    public void signal() {
        this.bell.offer(this.bellItem);
    }

    /**
     * Waits until {@link #signal()} is called or the timeout elapses.
     *
     * @param timeoutMs maximum time to wait, in milliseconds
     * @throws InterruptedException if interrupted while waiting
     */
    private void rest(long timeoutMs) throws InterruptedException {
        this.bell.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Performs one delivery pass: wait for normal mode, handle queued server
     * messages, then send unsynchronized data clients.
     *
     * @throws InterruptedException if the thread was interrupted while waiting
     *                              for normal mode or while sending
     */
    private void runOnce() throws InterruptedException {
        this.waitUntilNormalMode();
        // waitUntilNormalMode() reports interruption only through the thread's
        // interrupt flag; turn it into an exception so the pass does not go on
        // to send while failover mode may still be active.
        if (Thread.interrupted()) {
            throw new InterruptedException("interrupted while waiting for normal mode");
        }
        this.drainMailbox();
        List<DefaultDataClient> clients = this.collectOwnClients();
        Collections.sort(clients, DISABLED_FIRST);
        this.sendUnsynchronized(clients);
    }

    /** Handles every package currently queued in the mailbox. */
    private void drainMailbox() {
        ProtocolPackage message;
        while ((message = this.mailbox.poll()) != null) {
            this.handleServerMessage(message);
        }
    }

    /**
     * Collects the registered publishers and subscribers whose server list
     * manager is this worker's own instance (identity comparison).
     */
    private List<DefaultDataClient> collectOwnClients() {
        List<DefaultDataClient> own = new ArrayList<DefaultDataClient>();
        Iterator<?> registered = FluentIterator.asIterator(PublisherRegistrar.publisherIterator(), SubscriberRegistrar.subscriberIterator());
        while (registered.hasNext()) {
            DefaultDataClient dataClient = (DefaultDataClient)registered.next();
            if (this.serverListManager == dataClient.serverMgr) {
                own.add(dataClient);
            }
        }
        return own;
    }

    /**
     * Walks the clients in batches of at most {@link #SEND_BATCH}, lets every
     * client that is not synchronized add itself to the batch package, and
     * sends each package that has content. Stops early when no connection can
     * be established; a failed send is logged, reported as an {@link ExcEvent},
     * and the next batch is still attempted.
     */
    private void sendUnsynchronized(List<DefaultDataClient> clients) throws InterruptedException {
        Iterator<DefaultDataClient> pending = clients.iterator();
        while (pending.hasNext()) {
            ProtocolPackage batch = new ProtocolPackage();
            // The counter tracks clients examined, including the ones skipped
            // because they are already synchronized.
            for (int examined = 0; pending.hasNext() && examined < SEND_BATCH; ++examined) {
                DefaultDataClient dataClient = pending.next();
                if (dataClient.isSynchronized()) continue;
                dataClient.addPackage(batch);
            }
            // NOTE(review): presumably a package with a single element carries
            // no payload worth sending — confirm against addPackage().
            if (batch.countElements() <= 1) continue;
            if (!this.ensureConnected(this.serverListManager)) {
                return;
            }
            try {
                this.handleServerMessage(this.serverListManager.connectionProxy.sendReceive(batch));
            }
            catch (InterruptedException i) {
                throw i;
            }
            catch (Throwable t) {
                log.error("%s", "[Network] Request failed due to " + t, t);
                ExcEventDispatch.fireEvent(new ExcEvent(ExcType.UKNOWN, "[Network] Request failed due to " + t));
            }
        }
    }

    /**
     * Returns whether the manager's connection proxy is connected, attempting
     * a connect first if it is not.
     *
     * @param serverMgr the manager whose connection proxy is checked
     * @return true if already connected, otherwise the result of the connect attempt
     */
    private boolean ensureConnected(ServerListManager serverMgr) throws InterruptedException {
        if (serverMgr.connectionProxy.isConnected()) {
            return true;
        }
        log.info("[Global] Connecting to servers... " + serverMgr);
        return serverMgr.connectionProxy.connect();
    }

    /**
     * Passes each element of the package to the processor hub. An exception
     * from one element is logged and does not stop the remaining elements.
     */
    private void handleServerMessage(ProtocolPackage message) {
        for (ProtocolElement element : message) {
            try {
                processorManager.processMessage(element, message);
            }
            catch (Exception e) {
                log.warn("Exception in processing " + element.getClass().getName() + ": ", e);
            }
        }
    }
}

