/*
 * Decompiled with CFR 0.152.
 */
package com.aizuda.snailjob.server.job.task.support.handler;

import akka.actor.ActorRef;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskStatusEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.enums.MapReduceStageEnum;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.WaitStrategy;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.dto.DistributeInstance;
import com.aizuda.snailjob.server.common.enums.JobTaskExecutorSceneEnum;
import com.aizuda.snailjob.server.common.strategy.WaitStrategies;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.CompleteJobBatchDTO;
import com.aizuda.snailjob.server.job.task.dto.JobTimerTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.ReduceTaskDTO;
import com.aizuda.snailjob.server.job.task.dto.TaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.JobTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.cache.ResidentTaskCache;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.snailjob.server.job.task.support.timer.ResidentJobTimerTask;
import com.aizuda.snailjob.template.datasource.persistence.mapper.GroupConfigMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.GroupConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEvent;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
public class JobTaskBatchHandler {
    private static final Logger log = LoggerFactory.getLogger(JobTaskBatchHandler.class);
    private final JobTaskMapper jobTaskMapper;
    private final JobTaskBatchMapper jobTaskBatchMapper;
    private final WorkflowBatchHandler workflowBatchHandler;
    private final GroupConfigMapper groupConfigMapper;

    @Transactional
    public boolean complete(CompleteJobBatchDTO completeJobBatchDTO) {
        Long countJobTaskBatch = this.jobTaskBatchMapper.selectCount((Wrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().eq(JobTaskBatch::getId, (Object)completeJobBatchDTO.getTaskBatchId())).in(JobTaskBatch::getTaskBatchStatus, (Collection)JobTaskBatchStatusEnum.COMPLETED));
        if (countJobTaskBatch > 0L) {
            return true;
        }
        List jobTasks = this.jobTaskMapper.selectList((Wrapper)new LambdaQueryWrapper().select(new SFunction[]{JobTask::getTaskStatus, JobTask::getMrStage}).eq(JobTask::getTaskBatchId, (Object)completeJobBatchDTO.getTaskBatchId()));
        if (CollUtil.isEmpty((Collection)jobTasks) || jobTasks.stream().anyMatch(jobTask -> JobTaskStatusEnum.NOT_COMPLETE.contains(jobTask.getTaskStatus()))) {
            return false;
        }
        JobTaskBatch jobTaskBatch = new JobTaskBatch();
        jobTaskBatch.setId(completeJobBatchDTO.getTaskBatchId());
        Map<Integer, Long> statusCountMap = jobTasks.stream().collect(Collectors.groupingBy(JobTask::getTaskStatus, Collectors.counting()));
        long failCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.FAIL.getStatus(), 0L);
        long stopCount = statusCountMap.getOrDefault(JobTaskBatchStatusEnum.STOP.getStatus(), 0L);
        if (failCount > 0L) {
            jobTaskBatch.setTaskBatchStatus(Integer.valueOf(JobTaskBatchStatusEnum.FAIL.getStatus()));
            SnailSpringContext.getContext().publishEvent((ApplicationEvent)new JobTaskFailAlarmEvent(completeJobBatchDTO.getTaskBatchId()));
        } else if (stopCount > 0L) {
            jobTaskBatch.setTaskBatchStatus(Integer.valueOf(JobTaskBatchStatusEnum.STOP.getStatus()));
        } else {
            jobTaskBatch.setTaskBatchStatus(Integer.valueOf(JobTaskBatchStatusEnum.SUCCESS.getStatus()));
            if (this.needReduceTask(completeJobBatchDTO, jobTasks) && JobTaskTypeEnum.MAP_REDUCE.getType() == completeJobBatchDTO.getTaskType().intValue()) {
                return false;
            }
        }
        if (Objects.nonNull(completeJobBatchDTO.getJobOperationReason())) {
            jobTaskBatch.setOperationReason(completeJobBatchDTO.getJobOperationReason());
        }
        WorkflowNodeTaskExecuteDTO taskExecuteDTO = new WorkflowNodeTaskExecuteDTO();
        taskExecuteDTO.setWorkflowTaskBatchId(completeJobBatchDTO.getWorkflowTaskBatchId());
        taskExecuteDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType());
        taskExecuteDTO.setParentId(completeJobBatchDTO.getWorkflowNodeId());
        taskExecuteDTO.setTaskBatchId(completeJobBatchDTO.getTaskBatchId());
        this.workflowBatchHandler.openNextNode(taskExecuteDTO);
        jobTaskBatch.setUpdateDt(LocalDateTime.now());
        return 1 == this.jobTaskBatchMapper.update((Object)jobTaskBatch, (Wrapper)((LambdaUpdateWrapper)new LambdaUpdateWrapper().eq(JobTaskBatch::getId, (Object)completeJobBatchDTO.getTaskBatchId())).in(JobTaskBatch::getTaskBatchStatus, (Collection)JobTaskBatchStatusEnum.NOT_COMPLETE));
    }

    private boolean needReduceTask(CompleteJobBatchDTO completeJobBatchDTO, List<JobTask> jobTasks) {
        Integer mrStage = null;
        int reduceCount = 0;
        int mapCount = 0;
        for (JobTask jobTask : jobTasks) {
            if (Objects.isNull(jobTask.getMrStage())) continue;
            if (MapReduceStageEnum.MERGE_REDUCE.getStage() == jobTask.getMrStage().intValue()) {
                return false;
            }
            if (MapReduceStageEnum.REDUCE.getStage() == jobTask.getMrStage().intValue()) {
                ++reduceCount;
                continue;
            }
            if (MapReduceStageEnum.MAP.getStage() != jobTask.getMrStage().intValue()) continue;
            ++mapCount;
        }
        if (reduceCount > 1) {
            mrStage = MapReduceStageEnum.MERGE_REDUCE.getStage();
        } else if (mapCount == jobTasks.size()) {
            mrStage = MapReduceStageEnum.REDUCE.getStage();
        } else {
            return false;
        }
        try {
            ReduceTaskDTO reduceTaskDTO = JobTaskConverter.INSTANCE.toReduceTaskDTO(completeJobBatchDTO);
            reduceTaskDTO.setMrStage(mrStage);
            ActorRef actorRef = ActorGenerator.jobReduceActor();
            actorRef.tell((Object)reduceTaskDTO, actorRef);
            return true;
        }
        catch (Exception e) {
            SnailJobLog.LOCAL.error("tell reduce actor error", new Object[]{e});
            return false;
        }
    }

    private static boolean isAllMapTask(List<JobTask> jobTasks) {
        return (long)jobTasks.size() == jobTasks.stream().filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && MapReduceStageEnum.MAP.getStage() == jobTask.getMrStage().intValue()).count();
    }

    private static boolean isALeastOneReduceTask(List<JobTask> jobTasks) {
        return jobTasks.stream().filter(jobTask -> Objects.nonNull(jobTask.getMrStage()) && MapReduceStageEnum.REDUCE.getStage() == jobTask.getMrStage().intValue()).count() > 1L;
    }

    public void openResidentTask(Job job, TaskExecuteDTO taskExecuteDTO) {
        if (Objects.isNull(job) || JobTaskExecutorSceneEnum.MANUAL_JOB.getType().equals(taskExecuteDTO.getTaskExecutorScene()) || JobTaskExecutorSceneEnum.AUTO_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) || JobTaskExecutorSceneEnum.MANUAL_WORKFLOW.getType().equals(taskExecuteDTO.getTaskExecutorScene()) || Objects.equals(StatusEnum.NO.getStatus(), job.getResident()) || !DistributeInstance.INSTANCE.getConsumerBucket().contains(job.getBucketIndex())) {
            return;
        }
        long count = this.groupConfigMapper.selectCount((Wrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().eq(GroupConfig::getNamespaceId, (Object)job.getNamespaceId())).eq(GroupConfig::getGroupName, (Object)job.getGroupName())).eq(GroupConfig::getGroupStatus, (Object)StatusEnum.YES.getStatus()));
        if (count == 0L) {
            return;
        }
        JobTimerTaskDTO jobTimerTaskDTO = new JobTimerTaskDTO();
        jobTimerTaskDTO.setJobId(taskExecuteDTO.getJobId());
        jobTimerTaskDTO.setTaskBatchId(taskExecuteDTO.getTaskBatchId());
        jobTimerTaskDTO.setTaskExecutorScene(JobTaskExecutorSceneEnum.AUTO_JOB.getType());
        WaitStrategy waitStrategy = WaitStrategies.WaitStrategyEnum.getWaitStrategy((int)job.getTriggerType());
        Long preTriggerAt = ResidentTaskCache.get(job.getId());
        if (Objects.isNull(preTriggerAt) || preTriggerAt < job.getNextTriggerAt()) {
            preTriggerAt = job.getNextTriggerAt();
        }
        WaitStrategies.WaitStrategyContext waitStrategyContext = new WaitStrategies.WaitStrategyContext();
        waitStrategyContext.setTriggerInterval(job.getTriggerInterval());
        waitStrategyContext.setNextTriggerAt(preTriggerAt.longValue());
        Long nextTriggerAt = waitStrategy.computeTriggerTime(waitStrategyContext);
        long milliseconds = nextTriggerAt - preTriggerAt;
        Duration duration = Duration.ofMillis(milliseconds - DateUtils.toNowMilli() % 1000L);
        log.debug("\u5e38\u9a7b\u4efb\u52a1\u76d1\u63a7. [{}] \u4efb\u52a1\u65f6\u95f4\u5dee:[{}] \u53d6\u4f59:[{}]", new Object[]{duration, milliseconds, DateUtils.toNowMilli() % 1000L});
        job.setNextTriggerAt(nextTriggerAt);
        JobTimerWheel.registerWithJob(() -> new ResidentJobTimerTask(jobTimerTaskDTO, job), duration);
        ResidentTaskCache.refresh(job.getId(), nextTriggerAt);
    }

    public JobTaskBatchHandler(JobTaskMapper jobTaskMapper, JobTaskBatchMapper jobTaskBatchMapper, WorkflowBatchHandler workflowBatchHandler, GroupConfigMapper groupConfigMapper) {
        this.jobTaskMapper = jobTaskMapper;
        this.jobTaskBatchMapper = jobTaskBatchMapper;
        this.workflowBatchHandler = workflowBatchHandler;
        this.groupConfigMapper = groupConfigMapper;
    }
}

