springboot(40):数据断流告警
- 人工智能
- 2025-08-19 10:39:01

检测逻辑 package com.alibaba.gts.flm.push.data.client.service; import com.alibaba.fastjson.JSONObject; import com.alibaba.gts.flm.push.data.client mon.util.DateUtil; import com.alibaba.gts.flm.push.data.client.service.model.FcoWarningKeyDTO; import com.google mon.util.concurrent.RateLimiter; import lombok.extern.slf4j.Slf4j; import org.apache mons.lang3.exception.ExceptionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @Service @Slf4j public class FlowCutOffWarningService { @Autowired private RestTemplateService restTemplateService; @Value("${sendDingDingUrl:-}") private String sendDingDingApiUrl; @Value("${fcowTaskLimit:5}") private Integer fcowTaskLimit; private ScheduledExecutorService executorService; private Map<String, Long> keyTime = new ConcurrentHashMap<>(); private Map<String, JSONObject> taskStatus = new ConcurrentHashMap<>(); // uuid // 每秒许可数 = 12次 / 60秒 private Map<String, RateLimiter> taskRateLimiters = new ConcurrentHashMap<>(); private AtomicInteger threadCount; // 启动定时任务 // limitVal : 每秒许可数 = 12次 / 60秒 public void startTask(List<FcoWarningKeyDTO> fcoWarningKeyDTOS, Long second, Double limitVal) { if (fcoWarningKeyDTOS == null || fcoWarningKeyDTOS.size() == 0 || second == null || limitVal == 0) { throw new RuntimeException("参数为空"); } if (executorService == null) { executorService = new ScheduledThreadPoolExecutor(fcowTaskLimit); threadCount = new AtomicInteger(0); } if (threadCount.get() >= fcowTaskLimit) { throw new RuntimeException("断流检测任务上限5个,需要启动更多任务请修改配置[fcowTaskLimit]的值"); } String uuid = UUID.randomUUID().toString(); taskRateLimiters.put(uuid, RateLimiter.create(limitVal)); log.info("启动[{}秒]断流监测任务...uuid:{}", second, uuid); executorService.scheduleWithFixedDelay(() -> { try { //log.info("执行[{}秒]断流监测任务...", second); JSONObject status = new JSONObject(); status.put("fcoWarningKeyDTOS", fcoWarningKeyDTOS); status.put("second", second); status.put("lastRunTime", DateUtil.now()); taskStatus.put(uuid, status); StringBuilder sb = compare(fcoWarningKeyDTOS, second, keyTime); if (sb == null || sb.length() == 0) { return; } if (!taskRateLimiters.get(uuid).tryAcquire()) { log.warn("发送钉钉过于频繁,本次忽略,uuid:{},limitVal:{}", uuid, limitVal); return; } sendDingDing(sb); } catch (Exception e) { log.error("断流监测任务执行出错,fcoWarningKeyDTOS:{},second:{}秒,error:{}", JSONObject.toJSONString(fcoWarningKeyDTOS), second, ExceptionUtils.getStackTrace(e)); } }, second, second, TimeUnit.SECONDS); log.info("[{}秒]断流监测任务启动成功,uuid:{}", second, uuid); threadCount.set(threadCount.get() + 1); } public Map<String, JSONObject> getTaskStatus() { return taskStatus; } public Map<String, Long> getKeyTime() { return keyTime; } public Map<String, String> getKeyTimeFormat() { Map<String, String> map = new HashMap<>(); keyTime.forEach((k, v) -> { map.put(k, DateUtil.format(v)); }); return map; } public void flow(String k) { keyTime.put(k, System.currentTimeMillis()); } public StringBuilder compare(List<FcoWarningKeyDTO> fcoWarningKeyDTOS, Long second, Map<String, Long> keyTime) { StringBuilder sb = null; for (FcoWarningKeyDTO fcoWarningKeyDTO : fcoWarningKeyDTOS) { if (keyTime.containsKey(fcoWarningKeyDTO.getCode()) && (System.currentTimeMillis() - keyTime.get(fcoWarningKeyDTO.getCode())) / 1000 > second) { String lastTime = keyTime.containsKey(fcoWarningKeyDTO.getCode()) ? DateUtil.format(keyTime.get(fcoWarningKeyDTO.getCode())) : "-"; sb = append(sb, fcoWarningKeyDTO, lastTime, second); } } return sb; } private StringBuilder append(StringBuilder sb, FcoWarningKeyDTO fcoWarningKeyDTO, String lastTime, Long second) { if (sb == null) { sb = new StringBuilder(); } sb.append("#### " + "[" + fcoWarningKeyDTO.getName() + "]数据断流").append("\n\n"); sb.append(" > 时间: ").append(DateUtil.now()).append("\n\n"); sb.append(" > 描述: ").append(second).append("秒内无数据").append("\n\n"); sb.append(" > 上次数据时间: " + lastTime).append("\n\n"); return sb; } private void sendDingDing(StringBuilder sb) { JSONObject req = new JSONObject(); JSONObject at = new JSONObject(); at.put("isAtAll", "false"); req.put("title", "数据断流警告"); req.put("text", sb); req.put("at", at); restTemplateService.post(sendDingDingApiUrl, req); } } 使用 @Autowired private FlowCutOffWarningService flowCutOffWarningService; List<FcoWarningKeyDTO> fcoWarningKeyDTOS3Second = new LinkedList<FcoWarningKeyDTO>() {{ add(new FcoWarningKeyDTO("test", "测试")); }}; flowCutOffWarningService.startTask(fcoWarningKeyDTOS3Second, 3L, 0.003); RestTemplateService
springboot(39) : RestTemplate完全体_Lxinccode的博客-CSDN博客
springboot(40):数据断流告警由讯客互联人工智能栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“springboot(40):数据断流告警”