feat(data-docking): 实现报警与消警数据处理及Redis存储机制

- 移除测试报警数据代码,改为从Redis读取真实报警数据
- 新增处理活跃报警和消警数据的逻辑
- 实现报警类型识别与阈值提取功能
- 完善报警时间格式化处理- 增加消警数据收集并存储至Redis的功能
- 优化Redis数据存储结构,限制数据条目数量防止内存溢出
- 更新日志记录内容,明确区分报警与实时数据处理流程
- 提取获取报警阈值方法,增强代码复用性
dev
wangyan 2025-11-14 08:46:31 +08:00
parent 04fad5cb9e
commit c605470078
1 changed files with 161 additions and 14 deletions

View File

@ -149,22 +149,144 @@ public class ProvincialPlatformDataPushScheduled {
//定义datas指标数据集合 //定义datas指标数据集合
List<Map<String, Object>> alarmList = new ArrayList<>(); List<Map<String, Object>> alarmList = new ArrayList<>();
//--------------------------定义测试数据---------------------------------- //--------------------------定义测试数据----------------------------------
HashMap<String, Object> alarm = new HashMap<>(); // HashMap<String, Object> alarm = new HashMap<>();
alarm.put("quotaId", "GT1407810001010000001"); // 传感器传感编码IOT平台取 // alarm.put("quotaId", "GT1407810001010000001"); // 传感器传感编码IOT平台取
// alarm.put("value", value); // 指标当前采集值 //// alarm.put("value", value); // 指标当前采集值
alarm.put("alarmType", "alarmhi:alarm"); // 报警类型alarm hi:alarm 表示超上限报警alarm hh:alarm 表示超上上限报警alarm lo:alarm 表示超下限报警;alarm ll:alarm 表示超下下限报警;normal:alarm 表示消警;alarmsignal 表示变化报警,即开关量报警 // alarm.put("alarmType", "alarmhi:alarm"); // 报警类型alarm hi:alarm 表示超上限报警alarm hh:alarm 表示超上上限报警alarm lo:alarm 表示超下限报警;alarm ll:alarm 表示超下下限报警;normal:alarm 表示消警;alarmsignal 表示变化报警,即开关量报警
alarm.put("threshold", 99); // 当前报警阈值/消警阈值 // alarm.put("threshold", 99); // 当前报警阈值/消警阈值
// 获取当前时间 // // 获取当前时间
LocalDateTime alarmTime = LocalDateTime.now(); // LocalDateTime alarmTime = LocalDateTime.now();
// 格式化时间 // // 格式化时间
String alarmFormatTime = alarmTime.format(formatter); // String alarmFormatTime = alarmTime.format(formatter);
alarm.put("alarmTime", alarmFormatTime); // 报警时间 // alarm.put("alarmTime", alarmFormatTime); // 报警时间
alarmList.add(alarm); // alarmList.add(alarm);
//--------------------------测试数据结束---------------------------------- //--------------------------测试数据结束----------------------------------
//真实数据逻辑(注释部分)
//取出存储在Redis中的数据 //取出存储在Redis中的数据
//List<PageData> mesDataList = JSONArray.parseArray(redisUtil.get("MES_DATA_LIST").toString(), PageData.class); //取出存储在Redis中的报警数据
List<PageData> allAlarmList = new ArrayList<>();
if (!Tools.isEmpty(redisUtil.get("MES_ALARM_DATA_LIST"))) {
allAlarmList.addAll(JSONArray.parseArray(redisUtil.get("MES_ALARM_DATA_LIST").toString(), PageData.class));
}
if (!Tools.isEmpty(redisUtil.get("WMK_ALARM_DATA_LIST"))) {
allAlarmList.addAll(JSONArray.parseArray(redisUtil.get("WMK_ALARM_DATA_LIST").toString(), PageData.class));
}
//处理消警数据
List<PageData> recoverAlarmList = new ArrayList<>();
if (!Tools.isEmpty(redisUtil.get("MES_RECOVER_ALARM_DATA_LIST"))) {
recoverAlarmList.addAll(JSONArray.parseArray(redisUtil.get("MES_RECOVER_ALARM_DATA_LIST").toString(), PageData.class));
}
if (!Tools.isEmpty(redisUtil.get("WMK_RECOVER_ALARM_DATA_LIST"))) {
recoverAlarmList.addAll(JSONArray.parseArray(redisUtil.get("WMK_RECOVER_ALARM_DATA_LIST").toString(), PageData.class));
}
//处理活跃报警数据
for (PageData alarmData : allAlarmList) {
if (alarmData.containsKey("REPORT_ID") && !Tools.isEmpty(alarmData.getString("REPORT_ID"))) {
HashMap<String, Object> alarm = new HashMap<>();
alarm.put("quotaId", alarmData.get("REPORT_ID")); // 传感器传感编码
// 设置报警值
Object currentValue = alarmData.get("CURRENT_VALUE");
if (currentValue != null) {
if (currentValue instanceof String) {
alarm.put("value", Float.parseFloat((String) currentValue));
} else if (currentValue instanceof Number) {
alarm.put("value", ((Number) currentValue).floatValue());
} else {
alarm.put("value", currentValue);
}
} else {
alarm.put("value", null);
}
// 确定报警类型
String overviewOfAlerts = alarmData.getString("OVERVIEW_OF_ALERTS");
if ("高高报警".equals(overviewOfAlerts)) {
alarm.put("alarmType", "alarmhh:alarm"); // 超上上限报警
} else if ("高报警".equals(overviewOfAlerts)) {
alarm.put("alarmType", "alarmhi:alarm"); // 超上限报警
} else if ("低报警".equals(overviewOfAlerts)) {
alarm.put("alarmType", "alarmlo:alarm"); // 超下限报警
} else if ("低低报警".equals(overviewOfAlerts)) {
alarm.put("alarmType", "alarmll:alarm:alarm"); // 超下下限报警
} else if ("联锁投入报警".equals(overviewOfAlerts)) {
alarm.put("alarmType", "alarmsignal"); // 开关量报警
}
// 设置报警阈值
Object threshold = getAlarmThreshold(alarmData, overviewOfAlerts);
if (threshold != null) {
alarm.put("threshold", threshold);
}
// 设置报警时间
String alarmTime = alarmData.getString("GATHER_TIME");
if (Tools.isEmpty(alarmTime)) {
alarmTime = collectTime;
} else {
// 转换时间格式
try {
alarmTime = alarmTime.replaceAll("[-:\\s]", "").substring(0, 14);
} catch (Exception e) {
alarmTime = collectTime;
}
}
alarm.put("alarmTime", alarmTime);
alarmList.add(alarm);
}
}
//处理消警数据
for (PageData recoverAlarm : recoverAlarmList) {
if (recoverAlarm.containsKey("REPORT_ID") && !Tools.isEmpty(recoverAlarm.getString("REPORT_ID"))) {
HashMap<String, Object> alarm = new HashMap<>();
alarm.put("quotaId", recoverAlarm.get("REPORT_ID")); // 传感器传感编码
// 设置消警值
Object currentValue = recoverAlarm.get("CURRENT_VALUE");
if (currentValue != null) {
if (currentValue instanceof String) {
alarm.put("value", Float.parseFloat((String) currentValue));
} else if (currentValue instanceof Number) {
alarm.put("value", ((Number) currentValue).floatValue());
} else {
alarm.put("value", currentValue);
}
} else {
alarm.put("value", null);
}
// 消警类型
alarm.put("alarmType", "normal:alarm"); // 消警
// 设置消警阈值
String overviewOfAlerts = recoverAlarm.getString("OVERVIEW_OF_ALERTS");
Object threshold = getAlarmThreshold(recoverAlarm, overviewOfAlerts);
if (threshold != null) {
alarm.put("threshold", threshold);
}
// 设置消警时间
String recoverTime = recoverAlarm.getString("OPERATTIME");
if (Tools.isEmpty(recoverTime)) {
recoverTime = collectTime;
} else {
// 转换时间格式
try {
recoverTime = recoverTime.replaceAll("[-:\\s]", "").substring(0, 14);
} catch (Exception e) {
recoverTime = collectTime;
}
}
alarm.put("alarmTime", recoverTime);
alarmList.add(alarm);
}
}
alarmMap.put("alarms", alarmList); alarmMap.put("alarms", alarmList);
@ -173,7 +295,7 @@ public class ProvincialPlatformDataPushScheduled {
try { try {
encrypt = AESUtil.encrypt(JSON.toJSONString(alarmMap)); encrypt = AESUtil.encrypt(JSON.toJSONString(alarmMap));
} catch (Exception e) { } catch (Exception e) {
logger.error("实时数据AES加密出现错误", e); logger.error("报警数据AES加密出现错误", e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
baseMap.put("data", encrypt); baseMap.put("data", encrypt);
@ -182,6 +304,31 @@ public class ProvincialPlatformDataPushScheduled {
sendDataToPlatform(baseMap); sendDataToPlatform(baseMap);
} }
/**
*
* @param alarmData
* @param alarmType
* @return
*/
private Object getAlarmThreshold(PageData alarmData, String alarmType) {
try {
if ("高高报警".equals(alarmType)) {
return Float.parseFloat(alarmData.getString("THRESHOLD_UP_UP_LIMIT"));
} else if ("高报警".equals(alarmType)) {
return Float.parseFloat(alarmData.getString("THRESHOLD_UP_LIMIT"));
} else if ("低报警".equals(alarmType)) {
return Float.parseFloat(alarmData.getString("THRESHOLD_DOWN_LIMIT"));
} else if ("低低报警".equals(alarmType)) {
return Float.parseFloat(alarmData.getString("THRESHOLD_DOWN_DOWN_LIMIT"));
} else if ("联锁投入报警".equals(alarmType)) {
return Float.parseFloat(alarmData.getString("ALARM_VALUE"));
}
} catch (Exception e) {
logger.warn("获取报警阈值失败: {}", e.getMessage());
}
return null;
}
/** /**
* *
* @param data * @param data