diff --git a/src/main/java/com/zcloud/controller/AppPosiDeviceController.java b/src/main/java/com/zcloud/controller/AppPosiDeviceController.java index e2ecb15..9e2a79c 100644 --- a/src/main/java/com/zcloud/controller/AppPosiDeviceController.java +++ b/src/main/java/com/zcloud/controller/AppPosiDeviceController.java @@ -28,36 +28,36 @@ public class AppPosiDeviceController { @Resource private MesDeviceMonitoringMapper mesDeviceMonitoringMapper; - + @Resource private DeviceMonitoringAlarmSrevice deviceMonitoringAlarmSrevice; @Resource private RedisUtil redisUtil; - + @Resource private TbIronWarnInfoDao tbIronWarnInfoDao; // 用于跟踪设备节点的报警状态 private final Map alarmStatusMap = new ConcurrentHashMap<>(); - + // 用于报警延时确认机制的计数器 private final Map alarmConfirmCount = new ConcurrentHashMap<>(); - + // 用于跟踪报警开始时间的映射 private final Map alarmStartTimeMap = new ConcurrentHashMap<>(); - + // 用于跟踪报警恢复开始时间的映射 private final Map alarmRecoverStartTimeMap = new ConcurrentHashMap<>(); - + // 用于存储当前报警信息的映射(避免存入过期的报警信息) private final Map> currentAlarmInfo = new ConcurrentHashMap<>(); - + // 报警确认次数阈值 - private static final int ALARM_CONFIRM_THRESHOLD = 3; - + private static final int ALARM_CONFIRM_THRESHOLD = 15; + // 最小报警持续时间(毫秒),默认10秒 - private static final long MIN_ALARM_DURATION = 10 * 1000; + private static final long MIN_ALARM_DURATION = 60 * 1000; @PostMapping("/test1") public R test1(@RequestBody HashMap parma) throws Exception { @@ -228,10 +228,41 @@ public class AppPosiDeviceController { // 存储集合中处理好的数据 if (!dataList.isEmpty()) { mesDeviceMonitoringMapper.saveBatchFromMes(dataList); - // 将物联网数据存入Redis缓存 - redisUtil.set("WMK_DATA_LIST", JSONObject.toJSONString(dataList)); - XxlJobHelper.log("成功保存{}条物联网模块数据到本地数据库", dataList.size()); + // 将物联网数据存入Redis缓存,以设备ID为键存储,避免覆盖和重复 + for (Map data : dataList) { + String equipmentId = (String) data.get("EQUIPMENT_ID"); + if (equipmentId != null && !equipmentId.isEmpty()) { + // 使用设备ID作为键存储单个设备数据 + String key = "WMK_DEVICE:" + equipmentId; + redisUtil.set(key, JSONObject.toJSONString(data), 3600); // 1小时过期 + } + } + // 维护一个包含所有历史设备ID的集合(增量更新) + // 首先获取现有的设备ID列表 + Set allEquipmentIds = new HashSet<>(); + String existingEquipmentIdsJson = (String) redisUtil.get("WMK_ALL_DEVICES"); + if (existingEquipmentIdsJson != null && !existingEquipmentIdsJson.isEmpty()) { + try { + List existingEquipmentIds = JSONArray.parseArray(existingEquipmentIdsJson, String.class); + allEquipmentIds.addAll(existingEquipmentIds); + } catch (Exception e) { + // 解析失败则忽略现有数据 + XxlJobHelper.log("解析现有设备ID列表失败: {}", e.getMessage()); + } + } + + // 添加当前批次的新设备ID + List currentEquipmentIds = dataList.stream() + .map(data -> (String) data.get("EQUIPMENT_ID")) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + allEquipmentIds.addAll(currentEquipmentIds); + + // 存储更新后的设备ID列表 + redisUtil.set("WMK_ALL_DEVICES", JSONObject.toJSONString(new ArrayList<>(allEquipmentIds)), 3600); + XxlJobHelper.log("成功保存{}条物联网模块数据到本地数据库", dataList.size()); + // 处理报警数据 processAlarmData(dataList); } @@ -302,7 +333,7 @@ public class AppPosiDeviceController { } return data; } - + /** * 检查阈值是否超出范围(带延时确认机制) * @param data 数据项 @@ -323,7 +354,7 @@ public class AppPosiDeviceController { double currentValue = Double.parseDouble(currentValueStr); String equipmentKey = equipmentId + "_" + plcId; - + boolean isAlarm = false; String alarmType = ""; @@ -364,24 +395,24 @@ public class AppPosiDeviceController { } long currentTimestamp = System.currentTimeMillis(); - + // 处理报警确认逻辑 if (isAlarm) { // 如果超过阈值,增加计数 int currentCount = alarmConfirmCount.getOrDefault(equipmentKey, 0) + 1; alarmConfirmCount.put(equipmentKey, currentCount); - + // 记录首次报警时间 if (!alarmStartTimeMap.containsKey(equipmentKey)) { alarmStartTimeMap.put(equipmentKey, currentTimestamp); } - + // 清除恢复计时(如果有的话) alarmRecoverStartTimeMap.remove(equipmentKey); - + // 保存当前报警信息,确保后续处理的是最新的报警数据 currentAlarmInfo.put(equipmentKey, new HashMap<>(data)); - + // 检查是否满足最小持续时间要求 Long startTime = alarmStartTimeMap.get(equipmentKey); if (startTime != null) { @@ -391,7 +422,7 @@ public class AppPosiDeviceController { if (currentCount >= ALARM_CONFIRM_THRESHOLD) { data.put("WARNING", 1); data.put("OVERVIEW_OF_ALERTS", alarmType); - XxlJobHelper.log("设备 {} 触发报警: {} (值: {},持续时间: {}ms)", + XxlJobHelper.log("设备 {} 触发报警: {} (值: {},持续时间: {}ms)", equipmentKey, alarmType, currentValue, duration); } } @@ -409,10 +440,10 @@ public class AppPosiDeviceController { alarmConfirmCount.remove(equipmentKey); alarmStartTimeMap.remove(equipmentKey); } - + // 清除当前报警信息 currentAlarmInfo.remove(equipmentKey); - + // 确保WARNING字段为0,表示未报警 data.put("WARNING", 0); data.put("OVERVIEW_OF_ALERTS", ""); @@ -430,11 +461,11 @@ public class AppPosiDeviceController { private void processAlarmData(List> mesSaveList) { ArrayList alarmLogArrayList = new ArrayList<>(); ArrayList activeAlarms = new ArrayList<>(); - + for (Map item : mesSaveList) { // 检查是否有报警 Object warningObj = item.get("WARNING"); - + if (warningObj != null && "1".equals(String.valueOf(warningObj))) { // 只有当WARNING为1时才创建报警记录,这表示已经通过了延时确认机制 PageData itemPageData = new PageData(); @@ -457,28 +488,28 @@ public class AppPosiDeviceController { itemPageData.put("CREATTIME", DateUtil.date2Str(new Date())); itemPageData.put("REPORT_ID", item.get("REPORT_ID")); alarmLogArrayList.add(itemPageData); - + // 添加到活跃报警列表,用于处理warninfo表 activeAlarms.add(itemPageData); } } - + // 把报警的数据转存到数据库里 if (!alarmLogArrayList.isEmpty()) { deviceMonitoringAlarmSrevice.saveBatch(alarmLogArrayList); XxlJobHelper.log("成功保存{}条报警数据到报警表", alarmLogArrayList.size()); } - + // 将报警数据存入Redis if (!activeAlarms.isEmpty()) { redisUtil.set("WMK_ALARM_DATA_LIST", JSONObject.toJSONString(activeAlarms)); XxlJobHelper.log("成功将{}条报警数据存入Redis", activeAlarms.size()); } - + // 处理告警信息表(tb_iron_warninfo) processWarnInfo(activeAlarms); } - + /** * 处理告警信息表(tb_iron_warninfo) * @param activeAlarms 当前活跃的报警 @@ -487,14 +518,14 @@ public class AppPosiDeviceController { // 获取当前时间 String currentTime = DateUtil.date2Str(new Date()); long currentTimestamp = System.currentTimeMillis(); - + // 用于存储消警数据的列表 ArrayList recoverAlarms = new ArrayList<>(); - + // 处理新产生的报警 for (PageData alarm : activeAlarms) { String key = alarm.getString("EQUIPMENT_ID") + "_" + alarm.getString("PLC_ID") + "_" + alarm.getString("PLC_NAME"); - + // 检查该监测点是否已有报警记录 if (!alarmStatusMap.containsKey(key)) { // 新报警,创建告警信息 @@ -524,14 +555,14 @@ public class AppPosiDeviceController { // 保存到数据库 tbIronWarnInfoDao.saveWarnInfo(warnInfo); - + // 保存到alarmStatusMap中,用于跟踪状态 alarmStatusMap.put(key, warnInfo); - + // 清除恢复计时(如果有的话) alarmRecoverStartTimeMap.remove(key); - - XxlJobHelper.log("创建新的告警信息: 设备ID={}, PLC_ID={}, 报警内容={}", + + XxlJobHelper.log("创建新的告警信息: 设备ID={}, PLC_ID={}, 报警内容={}", alarm.get("EQUIPMENT_ID"), alarm.get("PLC_ID"), alarm.get("OVERVIEW_OF_ALERTS")); } else { // 已存在报警,更新当前值和时间 @@ -539,26 +570,26 @@ public class AppPosiDeviceController { existingWarn.put("MESSAGE", "设备名称:"+ alarm.get("DEVICE_ID")+",监测节点:"+ alarm.get("PLC_NAME")+",报警内容:"+ alarm.get("OVERVIEW_OF_ALERTS")); existingWarn.put("OPERATTIME", currentTime); alarmStatusMap.put(key, existingWarn); - + // 清除恢复计时(如果有的话) alarmRecoverStartTimeMap.remove(key); } } - + // 检查是否有报警恢复的节点 Set activeAlarmKeys = new HashSet<>(); for (PageData alarm : activeAlarms) { String key = alarm.getString("EQUIPMENT_ID") + "_" + alarm.getString("PLC_ID") + "_" + alarm.getString("PLC_NAME"); activeAlarmKeys.add(key); } - + // 检查之前报警但现在不报警的节点 Iterator> iterator = alarmStatusMap.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); String key = entry.getKey(); PageData warnInfo = entry.getValue(); - + if (!activeAlarmKeys.contains(key)) { // 检查是否满足最小恢复持续时间要求 Long recoverStartTime = alarmRecoverStartTimeMap.get(key); @@ -566,7 +597,7 @@ public class AppPosiDeviceController { long recoverDuration = currentTimestamp - recoverStartTime; if (recoverDuration < MIN_ALARM_DURATION) { // 未达到最小恢复持续时间,暂不处理消警 - XxlJobHelper.log("设备 {} 恢复未达到最小持续时间 {}ms,当前持续时间 {}ms,暂不消警", + XxlJobHelper.log("设备 {} 恢复未达到最小持续时间 {}ms,当前持续时间 {}ms,暂不消警", key, MIN_ALARM_DURATION, recoverDuration); continue; } @@ -576,24 +607,24 @@ public class AppPosiDeviceController { XxlJobHelper.log("设备 {} 开始恢复计时", key); continue; } - + // 该节点之前报警,现在已经满足恢复条件,需要更新状态为已消警 warnInfo.put("WARN_STATUS", "1"); // 1表示已消警 warnInfo.put("END_TIME", currentTime); // 预警结束时间 warnInfo.put("OPERATTIME", currentTime); - + // 更新数据库中的记录 tbIronWarnInfoDao.updateWarnInfo(warnInfo); - + // 添加到消警列表中,用于存储到Redis recoverAlarms.add(warnInfo); - - XxlJobHelper.log("更新告警信息为已消警: 设备ID={}, PLC_ID={}, 报警内容={}", + + XxlJobHelper.log("更新告警信息为已消警: 设备ID={}, PLC_ID={}, 报警内容={}", warnInfo.get("EQUIPMENT_ID"), warnInfo.get("PLC_ID"), warnInfo.get("OVERVIEW_OF_ALERTS")); - + // 从跟踪map中移除 iterator.remove(); - + // 同时清除报警确认计数器、开始时间记录和恢复开始时间记录 alarmConfirmCount.remove(key); alarmStartTimeMap.remove(key); @@ -601,7 +632,7 @@ public class AppPosiDeviceController { currentAlarmInfo.remove(key); } } - + // 将消警数据存入Redis if (!recoverAlarms.isEmpty()) { // 获取现有的消警数据 @@ -640,4 +671,4 @@ public class AppPosiDeviceController { // "display area": "1#高炉中控室", -》安装地点 // "host id": "T001-T002" -》 // } -} \ No newline at end of file +}