/*
 * Decompiled with CFR 0.152.
 */
package com.aizuda.snailjob.server.common.alarm;

import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.alarm.Alarm;
import com.aizuda.snailjob.common.core.alarm.AlarmContext;
import com.aizuda.snailjob.common.core.alarm.SnailJobAlarmFactory;
import com.aizuda.snailjob.common.core.enums.StatusEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.util.StreamUtils;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.AlarmInfoConverter;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.cache.CacheNotifyRateLimiter;
import com.aizuda.snailjob.server.common.dto.AlarmInfo;
import com.aizuda.snailjob.server.common.dto.NotifyConfigInfo;
import com.aizuda.snailjob.server.common.enums.SyetemTaskTypeEnum;
import com.aizuda.snailjob.template.datasource.access.AccessTemplate;
import com.aizuda.snailjob.template.datasource.persistence.mapper.NotifyRecipientMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyConfig;
import com.aizuda.snailjob.template.datasource.persistence.po.NotifyRecipient;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationEvent;
import org.springframework.scheduling.TaskScheduler;

public abstract class AbstractAlarm<E extends ApplicationEvent, A extends AlarmInfo>
implements Runnable,
Lifecycle {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractAlarm.class);
    @Autowired
    @Qualifier(value="alarmExecutorService")
    protected TaskScheduler taskScheduler;
    @Autowired
    protected AccessTemplate accessTemplate;
    @Autowired
    protected NotifyRecipientMapper recipientMapper;

    @Override
    public void run() {
        try {
            List<A> alarmInfos = this.poll();
            if (CollUtil.isEmpty(alarmInfos)) {
                return;
            }
            HashSet<Integer> notifyScene = new HashSet<Integer>();
            Map<Long, List<A>> waitSendAlarmInfos = this.convertAlarmDTO(alarmInfos, notifyScene);
            Set<Long> notifyIds = waitSendAlarmInfos.keySet();
            Map<Long, NotifyConfigInfo> notifyConfigMap = this.obtainNotifyConfig(notifyScene, notifyIds);
            waitSendAlarmInfos.forEach((key, list) -> Optional.ofNullable((NotifyConfigInfo)notifyConfigMap.get(key)).ifPresent(notifyConfig -> {
                for (AlarmInfo alarmDTO : list) {
                    this.sendAlarm((NotifyConfigInfo)notifyConfig, (A)alarmDTO);
                }
            }));
        }
        catch (InterruptedException e) {
            SnailJobLog.LOCAL.info("retry task fail dead letter alarm stop", new Object[0]);
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            SnailJobLog.LOCAL.error("RetryTaskFailDeadLetterAlarmListener queue poll Exception", new Object[]{e});
        }
    }

    protected Map<Long, NotifyConfigInfo> obtainNotifyConfig(Set<Integer> notifyScene, Set<Long> notifyIds) {
        if (CollUtil.isEmpty(notifyIds) || CollUtil.isEmpty(notifyScene)) {
            return Maps.newHashMap();
        }
        List notifyConfigs = this.accessTemplate.getNotifyConfigAccess().list((LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)((LambdaQueryWrapper)new LambdaQueryWrapper().eq(NotifyConfig::getNotifyStatus, (Object)StatusEnum.YES.getStatus())).in(NotifyConfig::getNotifyScene, notifyScene)).in(NotifyConfig::getSystemTaskType, (Collection)StreamUtils.toList(this.getSystemTaskType(), SyetemTaskTypeEnum::getType))).in(NotifyConfig::getId, notifyIds));
        if (CollUtil.isEmpty((Collection)notifyConfigs)) {
            return Maps.newHashMap();
        }
        Set recipientIds = notifyConfigs.stream().flatMap(config -> JsonUtil.parseList((String)config.getRecipientIds(), Long.class).stream()).collect(Collectors.toSet());
        List notifyRecipients = this.recipientMapper.selectByIds(recipientIds);
        Map recipientMap = StreamUtils.toIdentityMap((Collection)notifyRecipients, NotifyRecipient::getId);
        if (CollUtil.isEmpty(recipientIds)) {
            return Maps.newHashMap();
        }
        List<NotifyConfigInfo> notifyConfigInfos = AlarmInfoConverter.INSTANCE.retryToNotifyConfigInfos(notifyConfigs);
        for (NotifyConfigInfo notifyConfigInfo : notifyConfigInfos) {
            List recipients = StreamUtils.toList(notifyConfigInfo.getRecipientIds(), recipientId -> {
                NotifyRecipient notifyRecipient = (NotifyRecipient)recipientMap.get(recipientId);
                if (Objects.isNull(notifyRecipient)) {
                    return null;
                }
                NotifyConfigInfo.RecipientInfo recipientInfo = new NotifyConfigInfo.RecipientInfo();
                recipientInfo.setNotifyAttribute(notifyRecipient.getNotifyAttribute());
                recipientInfo.setNotifyType(notifyRecipient.getNotifyType());
                return recipientInfo;
            });
            notifyConfigInfo.setRecipientInfos(recipients);
        }
        return StreamUtils.toIdentityMap(notifyConfigInfos, NotifyConfigInfo::getId);
    }

    protected abstract List<SyetemTaskTypeEnum> getSystemTaskType();

    protected abstract Map<Long, List<A>> convertAlarmDTO(List<A> var1, Set<Integer> var2);

    protected abstract List<A> poll() throws InterruptedException;

    protected abstract AlarmContext buildAlarmContext(A var1, NotifyConfigInfo var2);

    @Override
    public void start() {
        this.startLog();
        this.taskScheduler.scheduleAtFixedRate((Runnable)this, Duration.parse("PT1S"));
    }

    protected abstract void startLog();

    @Override
    public void close() {
    }

    protected void sendAlarm(NotifyConfigInfo notifyConfig, A alarmDTO) {
        RateLimiter rateLimiter;
        if (Objects.equals(notifyConfig.getRateLimiterStatus(), StatusEnum.YES.getStatus()) && Objects.nonNull(rateLimiter = this.getRateLimiter(String.valueOf(notifyConfig.getId()), notifyConfig.getRateLimiterThreshold().intValue())) && !rateLimiter.tryAcquire(1L, TimeUnit.SECONDS)) {
            return;
        }
        if (Objects.nonNull(((AlarmInfo)alarmDTO).getCount()) && Objects.nonNull(notifyConfig.getNotifyThreshold()) && ((AlarmInfo)alarmDTO).getCount() < notifyConfig.getNotifyThreshold()) {
            return;
        }
        for (NotifyConfigInfo.RecipientInfo recipientInfo : notifyConfig.getRecipientInfos()) {
            if (Objects.isNull(recipientInfo)) continue;
            AlarmContext context = this.buildAlarmContext(alarmDTO, notifyConfig);
            context.setNotifyAttribute(recipientInfo.getNotifyAttribute());
            Alarm alarm = SnailJobAlarmFactory.getAlarmType((Integer)recipientInfo.getNotifyType());
            alarm.asyncSendMessage((Object)context);
        }
    }

    protected RateLimiter getRateLimiter(String key, double rateLimiterThreshold) {
        RateLimiter rateLimiter = CacheNotifyRateLimiter.getRateLimiterByKey(key);
        if (Objects.isNull(rateLimiter) || rateLimiter.getRate() != rateLimiterThreshold) {
            CacheNotifyRateLimiter.put(key, RateLimiter.create((double)rateLimiterThreshold));
        }
        return rateLimiter;
    }

    protected abstract int getNotifyScene();

    protected abstract void doOnApplicationEvent(E var1);
}

