/*
 * Decompiled with CFR 0.152.
 */
package com.aizuda.snailjob.server.retry.task.support.dispatch.actor.scan;

import akka.actor.AbstractActor;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.enums.RetryStatusEnum;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.TimerTask;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.PartitionTask;
import com.aizuda.snailjob.server.common.dto.ScanTask;
import com.aizuda.snailjob.server.common.handler.ClientNodeAllocateHandler;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
import com.aizuda.snailjob.server.retry.task.dto.RetryPartitionTask;
import com.aizuda.snailjob.server.retry.task.support.RetryTaskConverter;
import com.aizuda.snailjob.server.retry.task.support.dispatch.task.TaskExecutor;
import com.aizuda.snailjob.server.retry.task.support.dispatch.task.TaskExecutorSceneEnum;
import com.aizuda.snailjob.server.retry.task.support.timer.RetryTimerWheel;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.mapper.RetryTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.RetrySceneConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.RetryTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

public abstract class AbstractScanGroup
extends AbstractActor {
    private static final Logger log = LoggerFactory.getLogger(AbstractScanGroup.class);
    @Autowired
    protected SystemProperties systemProperties;
    @Autowired
    protected AccessTemplate accessTemplate;
    @Autowired
    protected ClientNodeAllocateHandler clientNodeAllocateHandler;
    @Autowired
    protected List<TaskExecutor> taskExecutors;
    @Autowired
    protected RetryTaskMapper retryTaskMapper;

    public AbstractActor.Receive createReceive() {
        return this.receiveBuilder().match(ScanTask.class, config -> {
            long startTime = System.nanoTime();
            try {
                this.doScan((ScanTask)config);
            }
            catch (Exception e) {
                SnailJobLog.LOCAL.error("Data scanner processing exception. [{}]", new Object[]{config, e});
            }
            long endTime = System.nanoTime();
            this.preCostTime().set((endTime - startTime) / 1000000L);
        }).build();
    }

    protected void doScan(ScanTask scanTask) {
        if (this.preCostTime().get() > 0L) {
            long loopCount = Math.max(SystemConstants.SCHEDULE_PERIOD * 1000L / this.preCostTime().get(), 1L);
            loopCount = Math.min(loopCount, (long)this.systemProperties.getRetryMaxPullCount());
            this.prePullCount().set(loopCount);
        }
        String groupName = scanTask.getGroupName();
        String namespaceId = scanTask.getNamespaceId();
        Long lastId = Optional.ofNullable(this.getLastId(groupName)).orElse(0L);
        AtomicInteger count = new AtomicInteger(0);
        long total = PartitionTaskUtils.process(startId -> this.listAvailableTasks(groupName, namespaceId, startId, this.taskActuatorScene().getTaskType().getType()), partitionTasks1 -> this.processRetryPartitionTasks((List<? extends PartitionTask>)partitionTasks1, scanTask), partitionTasks -> {
            if (CollUtil.isEmpty((Collection)partitionTasks)) {
                this.putLastId(scanTask.getGroupName(), 0L);
                return Boolean.TRUE;
            }
            if ((long)count.incrementAndGet() >= this.prePullCount().get()) {
                this.putLastId(scanTask.getGroupName(), ((PartitionTask)partitionTasks.get(partitionTasks.size() - 1)).getId());
                return Boolean.TRUE;
            }
            return false;
        }, (long)lastId);
    }

    private void processRetryPartitionTasks(List<? extends PartitionTask> partitionTasks, ScanTask scanTask) {
        if (CollUtil.isEmpty(partitionTasks)) {
            return;
        }
        Map<String, RetrySceneConfig> sceneConfigMap = this.getSceneConfigMap(partitionTasks, scanTask);
        ArrayList<RetryTask> waitUpdateRetryTasks = new ArrayList<RetryTask>();
        for (PartitionTask partitionTask : partitionTasks) {
            RetryPartitionTask retryPartitionTask = (RetryPartitionTask)partitionTask;
            RetrySceneConfig retrySceneConfig = sceneConfigMap.get(retryPartitionTask.getSceneName());
            if (Objects.isNull(retrySceneConfig)) continue;
            RetryTask retryTask = this.processRetryTask(retryPartitionTask, retrySceneConfig);
            waitUpdateRetryTasks.add(retryTask);
        }
        this.retryTaskMapper.updateBatchNextTriggerAtById(scanTask.getGroupPartition(), waitUpdateRetryTasks);
        long nowMilli = DateUtils.toNowMilli();
        for (PartitionTask partitionTask : partitionTasks) {
            RetryPartitionTask retryPartitionTask = (RetryPartitionTask)partitionTask;
            long delay = DateUtils.toEpochMilli((LocalDateTime)retryPartitionTask.getNextTriggerAt()) - nowMilli - nowMilli % 100L;
            TimerTask<String> timerTask = this.timerTask(retryPartitionTask);
            RetryTimerWheel.register((String)timerTask.idempotentKey(), timerTask, Duration.ofMillis(delay));
        }
    }

    private Map<String, RetrySceneConfig> getSceneConfigMap(List<? extends PartitionTask> partitionTasks, ScanTask scanTask) {
        Set sceneNameSet = StreamUtils.toSet(partitionTasks, partitionTask -> ((RetryPartitionTask)((Object)partitionTask)).getSceneName());
        List retrySceneConfigs = this.accessTemplate.getSceneConfigAccess().list((LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().select(new SFunction[]{RetrySceneConfig::getBackOff, RetrySceneConfig::getTriggerInterval, RetrySceneConfig::getSceneName}).eq(RetrySceneConfig::getNamespaceId, (Object)scanTask.getNamespaceId())).eq(RetrySceneConfig::getGroupName, (Object)scanTask.getGroupName())).in(RetrySceneConfig::getSceneName, (Collection)sceneNameSet));
        return StreamUtils.toIdentityMap((Collection)retrySceneConfigs, RetrySceneConfig::getSceneName);
    }

    private RetryTask processRetryTask(RetryPartitionTask partitionTask, RetrySceneConfig retrySceneConfig) {
        RetryTask retryTask = new RetryTask();
        retryTask.setNextTriggerAt(this.calculateNextTriggerTime(partitionTask, retrySceneConfig));
        retryTask.setId(partitionTask.getId());
        return retryTask;
    }

    protected abstract TaskExecutorSceneEnum taskActuatorScene();

    protected abstract Long getLastId(String var1);

    protected abstract void putLastId(String var1, Long var2);

    protected abstract LocalDateTime calculateNextTriggerTime(RetryPartitionTask var1, RetrySceneConfig var2);

    protected abstract TimerTask<String> timerTask(RetryPartitionTask var1);

    protected abstract AtomicLong preCostTime();

    protected abstract AtomicLong prePullCount();

    public List<RetryPartitionTask> listAvailableTasks(String groupName, String namespaceId, Long lastId, Integer taskType) {
        List retryTasks = this.accessTemplate.getRetryTaskAccess().listPage(groupName, namespaceId, new PageDTO(0L, (long)this.systemProperties.getRetryPullPageSize()), (LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().select(new SFunction[]{RetryTask::getId, RetryTask::getNextTriggerAt, RetryTask::getUniqueId, RetryTask::getGroupName, RetryTask::getRetryCount, RetryTask::getSceneName, RetryTask::getNamespaceId}).eq(RetryTask::getRetryStatus, (Object)RetryStatusEnum.RUNNING.getStatus())).eq(RetryTask::getGroupName, (Object)groupName)).eq(RetryTask::getNamespaceId, (Object)namespaceId)).eq(RetryTask::getTaskType, (Object)taskType)).le(RetryTask::getNextTriggerAt, (Object)LocalDateTime.now().plusSeconds(SystemConstants.SCHEDULE_PERIOD))).gt(RetryTask::getId, (Object)lastId)).orderByAsc(RetryTask::getId)).getRecords();
        return RetryTaskConverter.INSTANCE.toRetryPartitionTasks(retryTasks);
    }
}

