feat(device): 优化设备报警处理逻辑与数据存储

- 调整报警确认阈值从3次到15次
- 增加最小报警持续时间至60秒
- 改进Redis数据存储结构,按设备ID单独存储避免覆盖
- 实现设备ID历史列表的增量更新维护
- 完善报警确认与恢复的时间判断机制
- 优化报警日志与告警信息表的处理流程
dev
wangyan 2025-11-25 14:29:39 +08:00
parent 9658b6dc50
commit 391164a205
1 changed files with 82 additions and 51 deletions

View File

@ -28,36 +28,36 @@ public class AppPosiDeviceController {
@Resource
private MesDeviceMonitoringMapper mesDeviceMonitoringMapper;
@Resource
private DeviceMonitoringAlarmSrevice deviceMonitoringAlarmSrevice;
@Resource
private RedisUtil redisUtil;
@Resource
private TbIronWarnInfoDao tbIronWarnInfoDao;
// 用于跟踪设备节点的报警状态
private final Map<String, PageData> alarmStatusMap = new ConcurrentHashMap<>();
// 用于报警延时确认机制的计数器
private final Map<String, Integer> alarmConfirmCount = new ConcurrentHashMap<>();
// 用于跟踪报警开始时间的映射
private final Map<String, Long> alarmStartTimeMap = new ConcurrentHashMap<>();
// 用于跟踪报警恢复开始时间的映射
private final Map<String, Long> alarmRecoverStartTimeMap = new ConcurrentHashMap<>();
// 用于存储当前报警信息的映射(避免存入过期的报警信息)
private final Map<String, Map<String, Object>> currentAlarmInfo = new ConcurrentHashMap<>();
// 报警确认次数阈值
private static final int ALARM_CONFIRM_THRESHOLD = 3;
private static final int ALARM_CONFIRM_THRESHOLD = 15;
// 最小报警持续时间毫秒默认10秒
private static final long MIN_ALARM_DURATION = 10 * 1000;
private static final long MIN_ALARM_DURATION = 60 * 1000;
@PostMapping("/test1")
public R test1(@RequestBody HashMap<String, String> parma) throws Exception {
@ -228,10 +228,41 @@ public class AppPosiDeviceController {
// 存储集合中处理好的数据
if (!dataList.isEmpty()) {
mesDeviceMonitoringMapper.saveBatchFromMes(dataList);
// 将物联网数据存入Redis缓存
redisUtil.set("WMK_DATA_LIST", JSONObject.toJSONString(dataList));
XxlJobHelper.log("成功保存{}条物联网模块数据到本地数据库", dataList.size());
// 将物联网数据存入Redis缓存以设备ID为键存储避免覆盖和重复
for (Map<String, Object> data : dataList) {
String equipmentId = (String) data.get("EQUIPMENT_ID");
if (equipmentId != null && !equipmentId.isEmpty()) {
// 使用设备ID作为键存储单个设备数据
String key = "WMK_DEVICE:" + equipmentId;
redisUtil.set(key, JSONObject.toJSONString(data), 3600); // 1小时过期
}
}
// 维护一个包含所有历史设备ID的集合增量更新
// 首先获取现有的设备ID列表
Set<String> allEquipmentIds = new HashSet<>();
String existingEquipmentIdsJson = (String) redisUtil.get("WMK_ALL_DEVICES");
if (existingEquipmentIdsJson != null && !existingEquipmentIdsJson.isEmpty()) {
try {
List<String> existingEquipmentIds = JSONArray.parseArray(existingEquipmentIdsJson, String.class);
allEquipmentIds.addAll(existingEquipmentIds);
} catch (Exception e) {
// 解析失败则忽略现有数据
XxlJobHelper.log("解析现有设备ID列表失败: {}", e.getMessage());
}
}
// 添加当前批次的新设备ID
List<String> currentEquipmentIds = dataList.stream()
.map(data -> (String) data.get("EQUIPMENT_ID"))
.filter(Objects::nonNull)
.collect(Collectors.toList());
allEquipmentIds.addAll(currentEquipmentIds);
// 存储更新后的设备ID列表
redisUtil.set("WMK_ALL_DEVICES", JSONObject.toJSONString(new ArrayList<>(allEquipmentIds)), 3600);
XxlJobHelper.log("成功保存{}条物联网模块数据到本地数据库", dataList.size());
// 处理报警数据
processAlarmData(dataList);
}
@ -302,7 +333,7 @@ public class AppPosiDeviceController {
}
return data;
}
/**
*
* @param data
@ -323,7 +354,7 @@ public class AppPosiDeviceController {
double currentValue = Double.parseDouble(currentValueStr);
String equipmentKey = equipmentId + "_" + plcId;
boolean isAlarm = false;
String alarmType = "";
@ -364,24 +395,24 @@ public class AppPosiDeviceController {
}
long currentTimestamp = System.currentTimeMillis();
// 处理报警确认逻辑
if (isAlarm) {
// 如果超过阈值,增加计数
int currentCount = alarmConfirmCount.getOrDefault(equipmentKey, 0) + 1;
alarmConfirmCount.put(equipmentKey, currentCount);
// 记录首次报警时间
if (!alarmStartTimeMap.containsKey(equipmentKey)) {
alarmStartTimeMap.put(equipmentKey, currentTimestamp);
}
// 清除恢复计时(如果有的话)
alarmRecoverStartTimeMap.remove(equipmentKey);
// 保存当前报警信息,确保后续处理的是最新的报警数据
currentAlarmInfo.put(equipmentKey, new HashMap<>(data));
// 检查是否满足最小持续时间要求
Long startTime = alarmStartTimeMap.get(equipmentKey);
if (startTime != null) {
@ -391,7 +422,7 @@ public class AppPosiDeviceController {
if (currentCount >= ALARM_CONFIRM_THRESHOLD) {
data.put("WARNING", 1);
data.put("OVERVIEW_OF_ALERTS", alarmType);
XxlJobHelper.log("设备 {} 触发报警: {} (值: {},持续时间: {}ms)",
XxlJobHelper.log("设备 {} 触发报警: {} (值: {},持续时间: {}ms)",
equipmentKey, alarmType, currentValue, duration);
}
}
@ -409,10 +440,10 @@ public class AppPosiDeviceController {
alarmConfirmCount.remove(equipmentKey);
alarmStartTimeMap.remove(equipmentKey);
}
// 清除当前报警信息
currentAlarmInfo.remove(equipmentKey);
// 确保WARNING字段为0表示未报警
data.put("WARNING", 0);
data.put("OVERVIEW_OF_ALERTS", "");
@ -430,11 +461,11 @@ public class AppPosiDeviceController {
private void processAlarmData(List<Map<String, Object>> mesSaveList) {
ArrayList<PageData> alarmLogArrayList = new ArrayList<>();
ArrayList<PageData> activeAlarms = new ArrayList<>();
for (Map<String, Object> item : mesSaveList) {
// 检查是否有报警
Object warningObj = item.get("WARNING");
if (warningObj != null && "1".equals(String.valueOf(warningObj))) {
// 只有当WARNING为1时才创建报警记录这表示已经通过了延时确认机制
PageData itemPageData = new PageData();
@ -457,28 +488,28 @@ public class AppPosiDeviceController {
itemPageData.put("CREATTIME", DateUtil.date2Str(new Date()));
itemPageData.put("REPORT_ID", item.get("REPORT_ID"));
alarmLogArrayList.add(itemPageData);
// 添加到活跃报警列表用于处理warninfo表
activeAlarms.add(itemPageData);
}
}
// 把报警的数据转存到数据库里
if (!alarmLogArrayList.isEmpty()) {
deviceMonitoringAlarmSrevice.saveBatch(alarmLogArrayList);
XxlJobHelper.log("成功保存{}条报警数据到报警表", alarmLogArrayList.size());
}
// 将报警数据存入Redis
if (!activeAlarms.isEmpty()) {
redisUtil.set("WMK_ALARM_DATA_LIST", JSONObject.toJSONString(activeAlarms));
XxlJobHelper.log("成功将{}条报警数据存入Redis", activeAlarms.size());
}
// 处理告警信息表(tb_iron_warninfo)
processWarnInfo(activeAlarms);
}
/**
* (tb_iron_warninfo)
* @param activeAlarms
@ -487,14 +518,14 @@ public class AppPosiDeviceController {
// 获取当前时间
String currentTime = DateUtil.date2Str(new Date());
long currentTimestamp = System.currentTimeMillis();
// 用于存储消警数据的列表
ArrayList<PageData> recoverAlarms = new ArrayList<>();
// 处理新产生的报警
for (PageData alarm : activeAlarms) {
String key = alarm.getString("EQUIPMENT_ID") + "_" + alarm.getString("PLC_ID") + "_" + alarm.getString("PLC_NAME");
// 检查该监测点是否已有报警记录
if (!alarmStatusMap.containsKey(key)) {
// 新报警,创建告警信息
@ -524,14 +555,14 @@ public class AppPosiDeviceController {
// 保存到数据库
tbIronWarnInfoDao.saveWarnInfo(warnInfo);
// 保存到alarmStatusMap中用于跟踪状态
alarmStatusMap.put(key, warnInfo);
// 清除恢复计时(如果有的话)
alarmRecoverStartTimeMap.remove(key);
XxlJobHelper.log("创建新的告警信息: 设备ID={}, PLC_ID={}, 报警内容={}",
XxlJobHelper.log("创建新的告警信息: 设备ID={}, PLC_ID={}, 报警内容={}",
alarm.get("EQUIPMENT_ID"), alarm.get("PLC_ID"), alarm.get("OVERVIEW_OF_ALERTS"));
} else {
// 已存在报警,更新当前值和时间
@ -539,26 +570,26 @@ public class AppPosiDeviceController {
existingWarn.put("MESSAGE", "设备名称:"+ alarm.get("DEVICE_ID")+",监测节点:"+ alarm.get("PLC_NAME")+",报警内容:"+ alarm.get("OVERVIEW_OF_ALERTS"));
existingWarn.put("OPERATTIME", currentTime);
alarmStatusMap.put(key, existingWarn);
// 清除恢复计时(如果有的话)
alarmRecoverStartTimeMap.remove(key);
}
}
// 检查是否有报警恢复的节点
Set<String> activeAlarmKeys = new HashSet<>();
for (PageData alarm : activeAlarms) {
String key = alarm.getString("EQUIPMENT_ID") + "_" + alarm.getString("PLC_ID") + "_" + alarm.getString("PLC_NAME");
activeAlarmKeys.add(key);
}
// 检查之前报警但现在不报警的节点
Iterator<Map.Entry<String, PageData>> iterator = alarmStatusMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, PageData> entry = iterator.next();
String key = entry.getKey();
PageData warnInfo = entry.getValue();
if (!activeAlarmKeys.contains(key)) {
// 检查是否满足最小恢复持续时间要求
Long recoverStartTime = alarmRecoverStartTimeMap.get(key);
@ -566,7 +597,7 @@ public class AppPosiDeviceController {
long recoverDuration = currentTimestamp - recoverStartTime;
if (recoverDuration < MIN_ALARM_DURATION) {
// 未达到最小恢复持续时间,暂不处理消警
XxlJobHelper.log("设备 {} 恢复未达到最小持续时间 {}ms当前持续时间 {}ms暂不消警",
XxlJobHelper.log("设备 {} 恢复未达到最小持续时间 {}ms当前持续时间 {}ms暂不消警",
key, MIN_ALARM_DURATION, recoverDuration);
continue;
}
@ -576,24 +607,24 @@ public class AppPosiDeviceController {
XxlJobHelper.log("设备 {} 开始恢复计时", key);
continue;
}
// 该节点之前报警,现在已经满足恢复条件,需要更新状态为已消警
warnInfo.put("WARN_STATUS", "1"); // 1表示已消警
warnInfo.put("END_TIME", currentTime); // 预警结束时间
warnInfo.put("OPERATTIME", currentTime);
// 更新数据库中的记录
tbIronWarnInfoDao.updateWarnInfo(warnInfo);
// 添加到消警列表中用于存储到Redis
recoverAlarms.add(warnInfo);
XxlJobHelper.log("更新告警信息为已消警: 设备ID={}, PLC_ID={}, 报警内容={}",
XxlJobHelper.log("更新告警信息为已消警: 设备ID={}, PLC_ID={}, 报警内容={}",
warnInfo.get("EQUIPMENT_ID"), warnInfo.get("PLC_ID"), warnInfo.get("OVERVIEW_OF_ALERTS"));
// 从跟踪map中移除
iterator.remove();
// 同时清除报警确认计数器、开始时间记录和恢复开始时间记录
alarmConfirmCount.remove(key);
alarmStartTimeMap.remove(key);
@ -601,7 +632,7 @@ public class AppPosiDeviceController {
currentAlarmInfo.remove(key);
}
}
// 将消警数据存入Redis
if (!recoverAlarms.isEmpty()) {
// 获取现有的消警数据
@ -640,4 +671,4 @@ public class AppPosiDeviceController {
// "display area": "1#高炉中控室", -》安装地点
// "host id": "T001-T002" -》
// }
}
}