/*
 * Decompiled with CFR 0.152.
 */
package com.aizuda.snailjob.server.job.task.support.dispatch;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.cache.CacheConsumerGroup;
import com.aizuda.snailjob.server.common.config.SystemProperties;
import com.aizuda.snailjob.server.common.dto.PartitionTask;
import com.aizuda.snailjob.server.common.dto.ScanTask;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.common.util.PartitionTaskUtils;
import com.aizuda.snailjob.server.job.task.dto.JobPartitionTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.JobTaskPrepareDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

@Component(value="ScanJobActor")
@Scope(value="prototype")
public class ScanJobTaskActor
extends AbstractActor {
    private static final Logger log = LoggerFactory.getLogger(ScanJobTaskActor.class);
    private final JobMapper jobMapper;
    private final SystemProperties systemProperties;
    private final GroupConfigMapper groupConfigMapper;

    public AbstractActor.Receive createReceive() {
        return this.receiveBuilder().match(ScanTask.class, config -> {
            try {
                this.doScan((ScanTask)config);
            }
            catch (Exception e) {
                SnailJobLog.LOCAL.error("Data scanner processing exception. [{}]", new Object[]{config, e});
            }
        }).build();
    }

    private void doScan(ScanTask scanTask) {
        if (CollectionUtils.isEmpty((Collection)scanTask.getBuckets())) {
            return;
        }
        long total = PartitionTaskUtils.process(startId -> this.listAvailableJobs(startId, scanTask), this::processJobPartitionTasks, (long)0L);
        log.debug("job scan end. total:[{}]", (Object)total);
    }

    private void processJobPartitionTasks(List<? extends PartitionTask> partitionTasks) {
        ArrayList<Job> waitUpdateJobs = new ArrayList<Job>();
        ArrayList<JobTaskPrepareDTO> waitExecJobs = new ArrayList<JobTaskPrepareDTO>();
        long now = DateUtils.toNowMilli();
        for (PartitionTask partitionTask : partitionTasks) {
            this.processJob((JobPartitionTaskDTO)partitionTask, waitUpdateJobs, waitExecJobs, now);
        }
        this.jobMapper.updateBatchNextTriggerAtById(waitUpdateJobs);
        for (JobTaskPrepareDTO jobTaskPrepareDTO : waitExecJobs) {
            ActorRef actorRef = ActorGenerator.jobTaskPrepareActor();
            jobTaskPrepareDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_JOB.getType());
            actorRef.tell((Object)jobTaskPrepareDTO, actorRef);
        }
    }

    private void processJob(JobPartitionTaskDTO partitionTask, List<Job> waitUpdateJobs, List<JobTaskPrepareDTO> waitExecJobs, long now) {
        CacheConsumerGroup.addOrUpdate((String)partitionTask.getGroupName(), (String)partitionTask.getNamespaceId());
        Job job = new Job();
        job.setId(partitionTask.getId());
        boolean triggerTask = true;
        Long nextTriggerAt = ResidentTaskCache.get(partitionTask.getId());
        if (ScanJobTaskActor.needCalculateNextTriggerTime(partitionTask)) {
            nextTriggerAt = this.calculateNextTriggerTime(partitionTask, now);
        } else {
            triggerTask = Objects.isNull(nextTriggerAt);
            if (Objects.isNull(nextTriggerAt) || nextTriggerAt + DateUtils.toEpochMilli((long)SystemConstants.SCHEDULE_PERIOD) < now) {
                nextTriggerAt = now;
            }
        }
        job.setNextTriggerAt(nextTriggerAt);
        waitUpdateJobs.add(job);
        if (triggerTask) {
            waitExecJobs.add(JobTaskConverter.INSTANCE.toJobTaskPrepare(partitionTask));
        }
    }

    private static boolean needCalculateNextTriggerTime(JobPartitionTaskDTO partitionTask) {
        return !Objects.equals(StatusEnum.YES.getStatus(), partitionTask.getResident());
    }

    private Long calculateNextTriggerTime(JobPartitionTaskDTO partitionTask, long now) {
        long nextTriggerAt = partitionTask.getNextTriggerAt();
        if (nextTriggerAt + DateUtils.toEpochMilli((long)SystemConstants.SCHEDULE_PERIOD) < now) {
            nextTriggerAt = now;
            partitionTask.setNextTriggerAt(nextTriggerAt);
        }
        WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy((int)partitionTask.getTriggerType());
        WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
        waitStrategyContext.setTriggerInterval(partitionTask.getTriggerInterval());
        waitStrategyContext.setNextTriggerAt(nextTriggerAt);
        return waitStrategy.computeTriggerTime(waitStrategyContext);
    }

    private List<JobPartitionTaskDTO> listAvailableJobs(Long startId, ScanTask scanTask) {
        if (CollectionUtils.isEmpty((Collection)scanTask.getBuckets())) {
            return Collections.emptyList();
        }
        List jobs = ((PageDTO)this.jobMapper.selectPage((IPage)new PageDTO(0L, (long)this.systemProperties.getJobPullPageSize()), (Wrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().select(new SFunction[]{Job::getGroupName, Job::getNextTriggerAt, Job::getBlockStrategy, Job::getTriggerType, Job::getTriggerInterval, Job::getExecutorTimeout, Job::getTaskType, Job::getResident, Job::getId, Job::getNamespaceId}).eq(Job::getJobStatus, (Object)StatusEnum.YES.getStatus())).eq(Job::getDeleted, (Object)StatusEnum.NO.getStatus())).ne(Job::getTriggerType, (Object)SystemConstants.WORKFLOW_TRIGGER_TYPE)).in(Job::getBucketIndex, (Collection)scanTask.getBuckets())).le(Job::getNextTriggerAt, (Object)(DateUtils.toNowMilli() + DateUtils.toEpochMilli((long)SystemConstants.SCHEDULE_PERIOD)))).ge(Job::getId, (Object)startId)).orderByAsc(Job::getId))).getRecords();
        if (!CollectionUtils.isEmpty((Collection)jobs)) {
            List groupConfigs = StreamUtils.toList((Collection)this.groupConfigMapper.selectList((Wrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().select(new SFunction[]{GroupConfig::getGroupName}).eq(GroupConfig::getGroupStatus, (Object)StatusEnum.YES.getStatus())).in(GroupConfig::getGroupName, (Collection)StreamUtils.toSet((Collection)jobs, Job::getGroupName))), GroupConfig::getGroupName);
            jobs = jobs.stream().filter(job -> groupConfigs.contains(job.getGroupName())).collect(Collectors.toList());
        }
        return JobTaskConverter.INSTANCE.toJobPartitionTasks(jobs);
    }

    public ScanJobTaskActor(JobMapper jobMapper, SystemProperties systemProperties, GroupConfigMapper groupConfigMapper) {
        this.jobMapper = jobMapper;
        this.systemProperties = systemProperties;
        this.groupConfigMapper = groupConfigMapper;
    }
}

