/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.registry.server.session.acceptor;

import com.alipay.sofa.registry.common.model.DatumSnapshotRequest;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.session.acceptor.WriteDataRequest;
import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig;
import com.alipay.sofa.registry.server.session.renew.RenewService;
import com.alipay.sofa.registry.task.listener.TaskEvent;
import com.alipay.sofa.registry.task.listener.TaskListenerManager;
import com.google.common.collect.Lists;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class WriteDataProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(WriteDataProcessor.class);
    private static final Logger RENEW_LOGGER = LoggerFactory.getLogger((String)"RENEW-LOGGER", (String)"[WriteDataProcessor]");
    private static final Logger taskLogger = LoggerFactory.getLogger(WriteDataProcessor.class, (String)"[Task]");
    private final TaskListenerManager taskListenerManager;
    private final SessionServerConfig sessionServerConfig;
    private final RenewService renewService;
    private final String connectId;
    private Map<String, AtomicLong> lastUpdateTimestampMap = new ConcurrentHashMap<String, AtomicLong>();
    private AtomicBoolean writeDataLock = new AtomicBoolean(false);
    private ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue = new ConcurrentLinkedQueue();
    private AtomicInteger acceptorQueueSize = new AtomicInteger(0);

    public WriteDataProcessor(String connectId, TaskListenerManager taskListenerManager, SessionServerConfig sessionServerConfig, RenewService renewService) {
        this.connectId = connectId;
        this.taskListenerManager = taskListenerManager;
        this.sessionServerConfig = sessionServerConfig;
        this.renewService = renewService;
    }

    private boolean halt() {
        if (RENEW_LOGGER.isDebugEnabled()) {
            RENEW_LOGGER.debug("halt: connectId={}", (Object)this.connectId);
        }
        return this.writeDataLock.compareAndSet(false, true);
    }

    public void resume() {
        if (RENEW_LOGGER.isDebugEnabled()) {
            RENEW_LOGGER.debug("resume: connectId={}", (Object)this.connectId);
        }
        this.flushQueue();
        this.writeDataLock.compareAndSet(true, false);
        this.flushQueue();
    }

    public void process(WriteDataRequest request) {
        if (RENEW_LOGGER.isDebugEnabled()) {
            RENEW_LOGGER.debug("process: connectId={}, requestType={}, requestBody={}", new Object[]{this.connectId, request.getRequestType(), request.getRequestBody()});
        }
        if (this.isWriteRequest(request)) {
            this.refreshUpdateTime(request.getDataServerIP());
        }
        if (request.getRequestType() == WriteDataRequest.WriteDataRequestType.DATUM_SNAPSHOT) {
            this.doHandle(request);
        } else if (this.writeDataLock.get()) {
            this.addQueue(request);
        } else {
            this.flushQueue();
            this.doHandle(request);
        }
    }

    private void addQueue(WriteDataRequest request) {
        if (this.acceptorQueueSize.incrementAndGet() <= this.sessionServerConfig.getWriteDataAcceptorQueueSize()) {
            this.acceptorQueue.add(request);
        } else {
            RENEW_LOGGER.error("acceptorQueueSize({}) reached the limit : connectId={}, requestType={}, requestBody={}", new Object[]{this.acceptorQueue.size(), this.connectId, request.getRequestType(), request.getRequestBody()});
        }
    }

    private boolean isWriteRequest(WriteDataRequest request) {
        return request.getRequestType() == WriteDataRequest.WriteDataRequestType.PUBLISHER || request.getRequestType() == WriteDataRequest.WriteDataRequestType.UN_PUBLISHER;
    }

    private void flushQueue() {
        WriteDataRequest writeDataRequest;
        if (RENEW_LOGGER.isDebugEnabled()) {
            RENEW_LOGGER.debug("flushQueue: connectId={}", (Object)this.connectId);
        }
        while (!this.acceptorQueue.isEmpty() && (writeDataRequest = this.acceptorQueue.poll()) != null) {
            this.acceptorQueueSize.decrementAndGet();
            this.doHandle(writeDataRequest);
        }
    }

    private void doHandle(WriteDataRequest request) {
        if (RENEW_LOGGER.isDebugEnabled()) {
            RENEW_LOGGER.debug("doHandle: connectId={}, requestType={}, requestBody={}", new Object[]{this.connectId, request.getRequestType(), request.getRequestBody()});
        }
        switch (request.getRequestType()) {
            case PUBLISHER: {
                this.doPublishAsync(request);
                break;
            }
            case UN_PUBLISHER: {
                this.doUnPublishAsync(request);
                break;
            }
            case CLIENT_OFF: {
                this.doClientOffAsync(request);
                break;
            }
            case RENEW_DATUM: {
                if (this.renewAndSnapshotInSilence(request.getDataServerIP())) {
                    return;
                }
                this.doRenewAsync(request);
                break;
            }
            case DATUM_SNAPSHOT: {
                if (this.renewAndSnapshotInSilenceAndRefreshUpdateTime(request.getDataServerIP())) {
                    return;
                }
                this.halt();
                try {
                    this.doSnapshotAsync(request);
                    break;
                }
                finally {
                    this.resume();
                }
            }
            default: {
                LOGGER.warn("Unknown request type, requestType={}, requestBody={}", new Object[]{this.connectId, request.getRequestType(), request.getRequestBody()});
            }
        }
    }

    private void doRenewAsync(WriteDataRequest request) {
        if (RENEW_LOGGER.isDebugEnabled()) {
            RENEW_LOGGER.debug("doRenewAsync: connectId={}, requestType={}, requestBody={}", new Object[]{this.connectId, request.getRequestType(), request.getRequestBody()});
        }
        this.sendEvent(request.getRequestBody(), TaskEvent.TaskType.RENEW_DATUM_TASK);
    }

    private void doClientOffAsync(WriteDataRequest request) {
        if (RENEW_LOGGER.isDebugEnabled()) {
            RENEW_LOGGER.debug("doClientOffAsync: connectId={}, requestType={}, requestBody={}", new Object[]{this.connectId, request.getRequestType(), request.getRequestBody()});
        }
        String connectId = request.getConnectId();
        this.sendEvent(Lists.newArrayList((Object[])new String[]{connectId}), TaskEvent.TaskType.CANCEL_DATA_TASK);
    }

    private void doUnPublishAsync(WriteDataRequest request) {
        if (RENEW_LOGGER.isDebugEnabled()) {
            RENEW_LOGGER.debug("doUnPublishAsync: connectId={}, requestType={}, requestBody={}", new Object[]{this.connectId, request.getRequestType(), request.getRequestBody()});
        }
        this.sendEvent(request.getRequestBody(), TaskEvent.TaskType.UN_PUBLISH_DATA_TASK);
    }

    private void doPublishAsync(WriteDataRequest request) {
        if (RENEW_LOGGER.isDebugEnabled()) {
            RENEW_LOGGER.debug("doPublishAsync: connectId={}, requestType={}, requestBody={}", new Object[]{this.connectId, request.getRequestType(), request.getRequestBody()});
        }
        this.sendEvent(request.getRequestBody(), TaskEvent.TaskType.PUBLISH_DATA_TASK);
    }

    private void sendEvent(Object eventObj, TaskEvent.TaskType taskType) {
        TaskEvent taskEvent = new TaskEvent(eventObj, taskType);
        if (taskType != TaskEvent.TaskType.RENEW_DATUM_TASK) {
            taskLogger.info("send " + taskType + " taskEvent:{}", (Object)taskEvent);
        }
        this.taskListenerManager.sendTaskEvent(taskEvent);
    }

    private void doSnapshotAsync(WriteDataRequest request) {
        RENEW_LOGGER.info("doSnapshotAsync: connectId={}, dataServerIP={}, requestType={}, requestBody={}", new Object[]{this.connectId, request.getDataServerIP(), request.getRequestType(), request.getRequestBody()});
        String connectId = (String)request.getRequestBody();
        DatumSnapshotRequest datumSnapshotRequest = this.renewService.getDatumSnapshotRequest(connectId, request.getDataServerIP());
        if (datumSnapshotRequest != null) {
            TaskEvent taskEvent = new TaskEvent((Object)datumSnapshotRequest, TaskEvent.TaskType.DATUM_SNAPSHOT_TASK);
            this.taskListenerManager.sendTaskEvent(taskEvent);
        } else {
            RENEW_LOGGER.info("datumSnapshotRequest is null when doSnapshotAsync: connectId={}, dataServerIP={}, requestType={}", new Object[]{connectId, request.getDataServerIP(), request.getRequestType()});
        }
    }

    private boolean renewAndSnapshotInSilence(String dataServerIP) {
        boolean renewAndSnapshotInSilence;
        boolean bl = renewAndSnapshotInSilence = System.currentTimeMillis() - this.getLastUpdateTime(dataServerIP).get() < (long)this.sessionServerConfig.getRenewAndSnapshotSilentPeriodSec() * 1000L;
        if (RENEW_LOGGER.isDebugEnabled()) {
            RENEW_LOGGER.debug("renewAndSnapshotInSilence: connectId={}, renewAndSnapshotInSilence={}", (Object)this.connectId, (Object)renewAndSnapshotInSilence);
        }
        return renewAndSnapshotInSilence;
    }

    private boolean renewAndSnapshotInSilenceAndRefreshUpdateTime(String dataServerIP) {
        boolean renewAndSnapshotInSilence;
        boolean bl = renewAndSnapshotInSilence = System.currentTimeMillis() - this.refreshUpdateTime(dataServerIP) < (long)this.sessionServerConfig.getRenewAndSnapshotSilentPeriodSec() * 1000L;
        if (RENEW_LOGGER.isDebugEnabled()) {
            RENEW_LOGGER.debug("renewAndSnapshotInSilenceAndRefreshUpdateTime: connectId={}, renewAndSnapshotInSilence={}", (Object)this.connectId, (Object)renewAndSnapshotInSilence);
        }
        return renewAndSnapshotInSilence;
    }

    private long refreshUpdateTime(String dataServerIP) {
        AtomicLong lastUpdateTimestamp = this.getLastUpdateTime(dataServerIP);
        return lastUpdateTimestamp.getAndSet(System.currentTimeMillis());
    }

    private AtomicLong getLastUpdateTime(String dataServerIP) {
        AtomicLong _lastUpdateTimestamp;
        AtomicLong lastUpdateTimestamp = this.lastUpdateTimestampMap.get(dataServerIP);
        if (lastUpdateTimestamp == null && (_lastUpdateTimestamp = this.lastUpdateTimestampMap.putIfAbsent(dataServerIP, lastUpdateTimestamp = new AtomicLong(0L))) != null) {
            lastUpdateTimestamp = _lastUpdateTimestamp;
        }
        return lastUpdateTimestamp;
    }
}

