/*
 * Decompiled with CFR 0.152.
 */
package com.aizuda.snailjob.server.job.task.support.dispatch;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.core.context.SnailSpringContext;
import com.aizuda.snailjob.common.core.enums.FailStrategyEnum;
import com.aizuda.snailjob.common.core.enums.JobNotifySceneEnum;
import com.aizuda.snailjob.common.core.enums.JobOperationReasonEnum;
import com.aizuda.snailjob.common.core.enums.JobTaskBatchStatusEnum;
import com.aizuda.snailjob.common.core.enums.WorkflowNodeTypeEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.common.util.DateUtils;
import com.aizuda.snailjob.server.job.task.dto.WorkflowNodeTaskExecuteDTO;
import com.aizuda.snailjob.server.job.task.dto.WorkflowTaskFailAlarmEventDTO;
import com.aizuda.snailjob.server.job.task.support.WorkflowExecutor;
import com.aizuda.snailjob.server.job.task.support.WorkflowTaskConverter;
import com.aizuda.snailjob.server.job.task.support.alarm.event.WorkflowTaskFailAlarmEvent;
import com.aizuda.snailjob.server.job.task.support.cache.MutableGraphCache;
import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorContext;
import com.aizuda.snailjob.server.job.task.support.executor.workflow.WorkflowExecutorFactory;
import com.aizuda.snailjob.server.job.task.support.handler.WorkflowBatchHandler;
import com.aizuda.snailjob.server.job.task.support.timer.JobTimerWheel;
import com.aizuda.snailjob.server.job.task.support.timer.WorkflowTimeoutCheckTask;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowNodeMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTaskBatch;
import com.aizuda.snailjob.template.datasource.persistence.po.Workflow;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowNode;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.google.common.collect.Sets;
import com.google.common.graph.MutableGraph;
import java.io.Serializable;
import java.text.MessageFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.pekko.actor.AbstractActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Component(value="WorkflowExecutorActor")
@Scope(value="prototype")
public class WorkflowExecutorActor
extends AbstractActor {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(WorkflowExecutorActor.class);
    private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
    private final WorkflowNodeMapper workflowNodeMapper;
    private final WorkflowMapper workflowMapper;
    private final JobMapper jobMapper;
    private final JobTaskBatchMapper jobTaskBatchMapper;
    private final WorkflowBatchHandler workflowBatchHandler;

    public AbstractActor.Receive createReceive() {
        return this.receiveBuilder().match(WorkflowNodeTaskExecuteDTO.class, taskExecute -> {
            log.info("\u5de5\u4f5c\u6d41\u5f00\u59cb\u6267\u884c. [{}]", (Object)JsonUtil.toJsonString((Object)taskExecute));
            try {
                this.doExecutor((WorkflowNodeTaskExecuteDTO)taskExecute);
            }
            catch (Exception e) {
                SnailJobLog.LOCAL.error("workflow executor exception. [{}]", new Object[]{taskExecute, e});
                this.handlerTaskBatch((WorkflowNodeTaskExecuteDTO)taskExecute, JobTaskBatchStatusEnum.FAIL.getStatus(), JobOperationReasonEnum.TASK_EXECUTION_ERROR.getReason());
                SnailSpringContext.getContext().publishEvent((ApplicationEvent)new WorkflowTaskFailAlarmEvent(WorkflowTaskFailAlarmEventDTO.builder().workflowTaskBatchId(taskExecute.getWorkflowTaskBatchId()).notifyScene(JobNotifySceneEnum.WORKFLOW_TASK_ERROR.getNotifyScene()).reason(e.getMessage()).build()));
            }
            finally {
                this.getContext().stop(this.getSelf());
            }
        }).build();
    }

    private void doExecutor(WorkflowNodeTaskExecuteDTO taskExecute) {
        WorkflowTaskBatch workflowTaskBatch = (WorkflowTaskBatch)this.workflowTaskBatchMapper.selectById((Serializable)taskExecute.getWorkflowTaskBatchId());
        Assert.notNull((Object)workflowTaskBatch, () -> new SnailJobServerException("\u4efb\u52a1\u4e0d\u5b58\u5728"));
        if (SystemConstants.ROOT.equals(taskExecute.getParentId()) && JobTaskBatchStatusEnum.WAITING.getStatus() == workflowTaskBatch.getTaskBatchStatus().intValue()) {
            this.handlerTaskBatch(taskExecute, JobTaskBatchStatusEnum.RUNNING.getStatus(), JobOperationReasonEnum.NONE.getReason());
            Workflow workflow = (Workflow)this.workflowMapper.selectById((Serializable)workflowTaskBatch.getWorkflowId());
            JobTimerWheel.clearCache(MessageFormat.format("workflow_{0}", taskExecute.getWorkflowTaskBatchId()));
            JobTimerWheel.registerWithWorkflow(() -> new WorkflowTimeoutCheckTask(taskExecute.getWorkflowTaskBatchId()), Duration.ofSeconds(workflow.getExecutorTimeout().intValue()));
        }
        String flowInfo = workflowTaskBatch.getFlowInfo();
        MutableGraph<Long> graph = MutableGraphCache.getOrDefault(workflowTaskBatch.getId(), flowInfo);
        Set<Long> brotherNode = MutableGraphCache.getBrotherNode(graph, taskExecute.getParentId());
        Sets.SetView setView = Sets.union(brotherNode, (Set)Sets.newHashSet((Object[])new Long[]{taskExecute.getParentId()}));
        HashSet allSuccessors = Sets.newHashSet();
        for (Long nodeId : setView.immutableCopy()) {
            Set successors = graph.successors((Object)nodeId);
            if (!CollUtil.isNotEmpty((Collection)successors)) continue;
            for (Long successor : successors) {
                allSuccessors.addAll(graph.predecessors((Object)successor));
            }
            allSuccessors.addAll(successors);
        }
        log.debug("\u7236\u8282\u70b9:[{}] \u6240\u6709\u7684\u8282\u70b9:[{}]", (Object)taskExecute.getParentId(), (Object)allSuccessors);
        if (CollUtil.isEmpty((Collection)allSuccessors)) {
            this.workflowBatchHandler.complete(taskExecute.getWorkflowTaskBatchId(), workflowTaskBatch);
            return;
        }
        List allJobTaskBatchList = this.jobTaskBatchMapper.selectList((Wrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().select(new SFunction[]{JobTaskBatch::getWorkflowTaskBatchId, JobTaskBatch::getWorkflowNodeId, JobTaskBatch::getTaskBatchStatus, JobTaskBatch::getOperationReason, JobTaskBatch::getId}).eq(JobTaskBatch::getWorkflowTaskBatchId, (Object)workflowTaskBatch.getId())).in(JobTaskBatch::getWorkflowNodeId, (Collection)Sets.union((Set)allSuccessors, (Set)Sets.newHashSet((Object[])new Long[]{taskExecute.getParentId()}))));
        List workflowNodes = this.workflowNodeMapper.selectList((Wrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().in(WorkflowNode::getId, (Collection)Sets.union((Set)allSuccessors, (Set)Sets.newHashSet((Object[])new Long[]{taskExecute.getParentId()})))).orderByAsc(WorkflowNode::getPriorityLevel));
        Map jobTaskBatchMap = StreamUtils.groupByKey((Collection)allJobTaskBatchList, JobTaskBatch::getWorkflowNodeId);
        Map workflowNodeMap = StreamUtils.toIdentityMap((Collection)workflowNodes, WorkflowNode::getId);
        List parentJobTaskBatchList = (List)jobTaskBatchMap.get(taskExecute.getParentId());
        WorkflowNode parentWorkflowNode = (WorkflowNode)workflowNodeMap.get(taskExecute.getParentId());
        if (Objects.nonNull(parentWorkflowNode) && WorkflowNodeTypeEnum.DECISION.getType() == parentWorkflowNode.getNodeType().intValue()) {
            Set successors = graph.successors((Object)parentWorkflowNode.getId());
            workflowNodes = workflowNodes.stream().filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId()) && successors.contains(workflowNode.getId())).collect(Collectors.toList());
        } else {
            workflowNodes = workflowNodes.stream().filter(workflowNode -> !workflowNode.getId().equals(taskExecute.getParentId())).collect(Collectors.toList());
            this.workflowBatchHandler.mergeWorkflowContextAndRetry(workflowTaskBatch, StreamUtils.toSet((Collection)allJobTaskBatchList, JobTaskBatch::getId));
        }
        List jobs = this.jobMapper.selectBatchIds((Collection)StreamUtils.toSet(workflowNodes, WorkflowNode::getJobId));
        Map jobMap = StreamUtils.toIdentityMap((Collection)jobs, Job::getId);
        Object evaluationResult = null;
        log.debug("\u5f85\u6267\u884c\u7684\u8282\u70b9\u4e3a. workflowNodes:[{}]", (Object)StreamUtils.toList(workflowNodes, WorkflowNode::getId));
        for (WorkflowNode workflowNode2 : workflowNodes) {
            List jobTaskBatchList = (List)jobTaskBatchMap.get(workflowNode2.getId());
            if (CollUtil.isNotEmpty((Collection)jobTaskBatchList)) continue;
            Set predecessors = graph.predecessors((Object)workflowNode2.getId());
            boolean predecessorsComplete = this.arePredecessorsComplete(taskExecute, predecessors, jobTaskBatchMap, workflowNode2, workflowNodeMap);
            if (!SystemConstants.ROOT.equals(taskExecute.getParentId()) && !predecessorsComplete) continue;
            WorkflowExecutor workflowExecutor = WorkflowExecutorFactory.getWorkflowExecutor(workflowNode2.getNodeType());
            WorkflowExecutorContext context = WorkflowTaskConverter.INSTANCE.toWorkflowExecutorContext(workflowNode2);
            context.setJob((Job)jobMap.get(workflowNode2.getJobId()));
            context.setWorkflowTaskBatchId(taskExecute.getWorkflowTaskBatchId());
            context.setParentWorkflowNodeId(taskExecute.getParentId());
            context.setEvaluationResult(evaluationResult);
            context.setTaskBatchId(taskExecute.getTaskBatchId());
            context.setTaskExecutorScene(taskExecute.getTaskExecutorScene());
            context.setWfContext(workflowTaskBatch.getWfContext());
            if (CollUtil.isNotEmpty((Collection)parentJobTaskBatchList)) {
                WorkflowExecutorActor.fillParentOperationReason(allJobTaskBatchList, parentJobTaskBatchList, parentWorkflowNode, context);
            }
            workflowExecutor.execute(context);
            evaluationResult = context.getEvaluationResult();
        }
    }

    private static void fillParentOperationReason(List<JobTaskBatch> allJobTaskBatchList, List<JobTaskBatch> parentJobTaskBatchList, WorkflowNode parentWorkflowNode, WorkflowExecutorContext context) {
        JobTaskBatch jobTaskBatch = allJobTaskBatchList.stream().filter(batch -> !JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(batch.getOperationReason())).findFirst().orElse(null);
        if (parentJobTaskBatchList.stream().map(JobTaskBatch::getOperationReason).filter(Objects::nonNull).anyMatch(JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION::contains) && Objects.nonNull(jobTaskBatch) && parentWorkflowNode.getNodeType().intValue() != WorkflowNodeTypeEnum.DECISION.getType()) {
            context.setParentOperationReason(JobOperationReasonEnum.NONE.getReason());
        } else {
            context.setParentOperationReason(parentJobTaskBatchList.get(0).getOperationReason());
        }
    }

    private boolean arePredecessorsComplete(WorkflowNodeTaskExecuteDTO taskExecute, Set<Long> predecessors, Map<Long, List<JobTaskBatch>> jobTaskBatchMap, WorkflowNode waitExecWorkflowNode, Map<Long, WorkflowNode> workflowNodeMap) {
        for (Long nodeId : predecessors) {
            WorkflowNode preWorkflowNode;
            if (SystemConstants.ROOT.equals(nodeId)) continue;
            List<JobTaskBatch> jobTaskBatches = jobTaskBatchMap.get(nodeId);
            if (CollUtil.isEmpty(jobTaskBatches)) {
                SnailJobLog.LOCAL.info("\u6279\u6b21\u4e3a\u7a7a\u5b58\u5728\u672a\u5b8c\u6210\u7684\u5144\u5f1f\u8282\u70b9. [{}] \u5f85\u6267\u884c\u8282\u70b9:[{}]", new Object[]{nodeId, waitExecWorkflowNode.getId()});
                return Boolean.FALSE;
            }
            boolean isCompleted = jobTaskBatches.stream().anyMatch(jobTaskBatch -> JobTaskBatchStatusEnum.NOT_COMPLETE.contains(jobTaskBatch.getTaskBatchStatus()));
            if (isCompleted) {
                SnailJobLog.LOCAL.info("\u5b58\u5728\u672a\u5b8c\u6210\u7684\u5144\u5f1f\u8282\u70b9. [{}] \u5f85\u6267\u884c\u8282\u70b9:[{}] parentId:[{}]", new Object[]{nodeId, taskExecute.getParentId(), waitExecWorkflowNode.getId()});
                return Boolean.FALSE;
            }
            if (!jobTaskBatches.stream().anyMatch(jobTaskBatch -> jobTaskBatch.getTaskBatchStatus().intValue() != JobTaskBatchStatusEnum.SUCCESS.getStatus() && !JobOperationReasonEnum.WORKFLOW_SUCCESSOR_SKIP_EXECUTION.contains(jobTaskBatch.getOperationReason())) || !Objects.equals((preWorkflowNode = workflowNodeMap.get(nodeId)).getFailStrategy(), FailStrategyEnum.BLOCK.getCode())) continue;
            SnailJobLog.LOCAL.info("\u6b64\u8282\u70b9\u6267\u884c\u5931\u8d25\u4e14\u5931\u8d25\u7b56\u7565\u914d\u7f6e\u4e86\u3010\u963b\u585e\u3011\u4e2d\u65ad\u6267\u884c [{}] \u5f85\u6267\u884c\u8282\u70b9:[{}] parentId:[{}]", new Object[]{nodeId, taskExecute.getParentId(), waitExecWorkflowNode.getId()});
            return Boolean.FALSE;
        }
        return Boolean.TRUE;
    }

    private void handlerTaskBatch(WorkflowNodeTaskExecuteDTO taskExecute, int taskStatus, int operationReason) {
        WorkflowTaskBatch jobTaskBatch = new WorkflowTaskBatch();
        jobTaskBatch.setId(taskExecute.getWorkflowTaskBatchId());
        jobTaskBatch.setExecutionAt(Long.valueOf(DateUtils.toNowMilli()));
        jobTaskBatch.setTaskBatchStatus(Integer.valueOf(taskStatus));
        jobTaskBatch.setOperationReason(Integer.valueOf(operationReason));
        jobTaskBatch.setUpdateDt(LocalDateTime.now());
        Assert.isTrue((1 == this.workflowTaskBatchMapper.updateById((Object)jobTaskBatch) ? 1 : 0) != 0, () -> new SnailJobServerException("\u66f4\u65b0\u4efb\u52a1\u5931\u8d25"));
    }

    @Generated
    public WorkflowExecutorActor(WorkflowTaskBatchMapper workflowTaskBatchMapper, WorkflowNodeMapper workflowNodeMapper, WorkflowMapper workflowMapper, JobMapper jobMapper, JobTaskBatchMapper jobTaskBatchMapper, WorkflowBatchHandler workflowBatchHandler) {
        this.workflowTaskBatchMapper = workflowTaskBatchMapper;
        this.workflowNodeMapper = workflowNodeMapper;
        this.workflowMapper = workflowMapper;
        this.jobMapper = jobMapper;
        this.jobTaskBatchMapper = jobTaskBatchMapper;
        this.workflowBatchHandler = workflowBatchHandler;
    }
}

