refactor(data-push): 优化省级平台数据推送逻辑

- 调整定时任务频率,分离实时数据与报警数据推送
- 重构 WMK 设备数据获取方式,支持批量加载设备信息
- 增强数据类型转换稳定性,添加浮点数解析异常处理
- 完善日志记录,提升数据异常时的可追踪性
- 优化代码结构,提高推送任务执行效率
dev
wangyan 2025-11-25 14:29:12 +08:00
parent 8e1548568d
commit 531e7e83f3
1 changed files with 42 additions and 16 deletions

View File

@ -37,14 +37,16 @@ public class ProvincialPlatformDataPushScheduled {
@Resource
private RestTemplate restTemplate; // 注入RestTemplate用于HTTP请求
@Scheduled(cron = "*/5 * * * * ?")
public void scheduled() throws Exception {
@Scheduled(cron = "*/10 * * * * ?")
public void scheduledAlarm() throws Exception {
// 推送报警数据
// pushAlarmData();
}
@Scheduled(cron = "0 */1 * * * ?")
public void scheduledRealTime() throws Exception {
// 推送实时数据
pushRealTimeData();
// 推送报警数据
pushAlarmData();
}
void pushRealTimeData() {
@ -80,8 +82,20 @@ public class ProvincialPlatformDataPushScheduled {
if (!Tools.isEmpty(redisUtil.get("MES_DATA_LIST"))) {
allDataList.addAll(JSONArray.parseArray(redisUtil.get("MES_DATA_LIST").toString(), PageData.class));
}
if (!Tools.isEmpty(redisUtil.get("WMK_DATA_LIST"))) {
allDataList.addAll(JSONArray.parseArray(redisUtil.get("WMK_DATA_LIST").toString(), PageData.class));
if (!Tools.isEmpty(redisUtil.get("WMK_ALL_DEVICES"))) {
// 获取所有设备ID
List<String> deviceIds = JSONArray.parseArray(
(String) redisUtil.get("WMK_ALL_DEVICES"),
String.class
);
// 批量获取所有设备数据
for (String deviceId : deviceIds) {
Object deviceData = redisUtil.get("WMK_DEVICE:" + deviceId);
if (deviceData != null) {
allDataList.add(JSON.parseObject(deviceData.toString(), PageData.class));
}
}
}
for (PageData dataPd : allDataList) {
if (dataPd.containsKey("REPORT_ID") && !Tools.isEmpty(dataPd.getString("REPORT_ID"))) {
@ -90,15 +104,21 @@ public class ProvincialPlatformDataPushScheduled {
// 确保 CURRENT_VALUE 转换为 Float 类型
Object currentValue = dataPd.get("CURRENT_VALUE");
if (currentValue != null) {
if (currentValue instanceof String) {
data.put("value", Float.parseFloat((String) currentValue));
} else if (currentValue instanceof Number) {
data.put("value", ((Number) currentValue).floatValue());
} else {
data.put("value", currentValue);
try {
if (currentValue instanceof String) {
data.put("value", Float.parseFloat((String) currentValue));
} else if (currentValue instanceof Number) {
data.put("value", ((Number) currentValue).floatValue());
} else {
data.put("value", currentValue);
}
} catch (NumberFormatException e) {
logger.warn("数据转换异常REPORT_ID: {}CURRENT_VALUE: {}", dataPd.getString("REPORT_ID"), currentValue, e);
continue; // 跳过当前循环
}
} else {
data.put("value", null);
logger.warn("数据异常REPORT_ID: {}CURRENT_VALUE: {}", dataPd.getString("REPORT_ID"), currentValue);
continue; // 跳过当前循环
}
data.put("isValid", true); // 质量戳
dataList.add(data);
@ -191,6 +211,7 @@ public class ProvincialPlatformDataPushScheduled {
// 设置报警值
Object currentValue = alarmData.get("CURRENT_VALUE");
if (currentValue != null) {
try {
if (currentValue instanceof String) {
alarm.put("value", Float.parseFloat((String) currentValue));
} else if (currentValue instanceof Number) {
@ -198,8 +219,13 @@ public class ProvincialPlatformDataPushScheduled {
} else {
alarm.put("value", currentValue);
}
} catch (NumberFormatException e) {
logger.warn("报警数据转换异常REPORT_ID: {}CURRENT_VALUE: {}", alarmData.getString("REPORT_ID"), currentValue, e);
continue; // 跳过当前循环
}
} else {
alarm.put("value", null);
logger.warn("报警数据异常REPORT_ID: {}CURRENT_VALUE: {}", alarmData.getString("REPORT_ID"), currentValue);
continue; // 跳过当前循环
}
// 确定报警类型