人员中台 - 切面、MQ通知、MQ监听
parent
a4174fafaa
commit
3fc60743f2
|
@ -6,12 +6,16 @@ import java.util.*;
|
|||
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import com.alibaba.druid.support.json.JSONUtils;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.zcloud.dto.UpdateEnum;
|
||||
import com.zcloud.entity.system.Dictionaries;
|
||||
import com.zcloud.entity.system.User;
|
||||
import com.zcloud.service.bus.*;
|
||||
import com.zcloud.service.system.*;
|
||||
import com.zcloud.syncData.SyncPlatformAdvice;
|
||||
import com.zcloud.syncData.SyncTypeEnum;
|
||||
import com.zcloud.util.*;
|
||||
import org.apache.commons.collections.map.ListOrderedMap;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
@ -532,6 +536,7 @@ public class UsersController extends BaseController {
|
|||
@RequestMapping(value = "/editUser")
|
||||
@RequiresPermissions("user:edit")
|
||||
@ResponseBody
|
||||
@SyncPlatformAdvice(type = SyncTypeEnum.PERSONNEL, isUpdate = true)
|
||||
public Object editUser() throws Exception {
|
||||
Map<String, Object> map = new HashMap<String, Object>();
|
||||
String errInfo = "success";
|
||||
|
@ -777,6 +782,8 @@ public class UsersController extends BaseController {
|
|||
imgfilesService.delete(pd); //删除旧人脸图片
|
||||
}
|
||||
map.put("result", errInfo);
|
||||
map.put("code","0");
|
||||
map.put("syncInfo", JSONUtil.toJsonStr(usersService.findById(pd)));
|
||||
return map;
|
||||
}
|
||||
|
||||
|
@ -786,6 +793,7 @@ public class UsersController extends BaseController {
|
|||
@RequestMapping(value = "/editUserFuns")
|
||||
@RequiresPermissions("user:edit")
|
||||
@ResponseBody
|
||||
@SyncPlatformAdvice(type = SyncTypeEnum.PERSONNEL, isUpdate = true)
|
||||
public Object editUserFuns() throws Exception {
|
||||
Map<String, Object> map = new HashMap<String, Object>();
|
||||
String errInfo = "success";
|
||||
|
@ -794,6 +802,8 @@ public class UsersController extends BaseController {
|
|||
FHLOG.save(Jurisdiction.getUsername(), "从系统用户中修改" + pd.getString("USERNAME") + "的小程序菜单权限"); //记录日志
|
||||
usersService.editUserFuns(pd); //执行修改
|
||||
map.put("result", errInfo);
|
||||
map.put("code", "0");
|
||||
map.put("syncInfo", JSONUtils.toJSONString(usersService.findById(pd)));
|
||||
return map;
|
||||
}
|
||||
|
||||
|
@ -802,6 +812,7 @@ public class UsersController extends BaseController {
|
|||
*/
|
||||
@RequestMapping(value = "/editUserOwn")
|
||||
@ResponseBody
|
||||
@SyncPlatformAdvice(type = SyncTypeEnum.PERSONNEL, isUpdate = true)
|
||||
public Object editUserOwn() throws Exception {
|
||||
Map<String, Object> map = new HashMap<String, Object>();
|
||||
String errInfo = "success";
|
||||
|
@ -823,6 +834,8 @@ public class UsersController extends BaseController {
|
|||
usersService.editUser(pd); //执行修改
|
||||
FHLOG.save(Jurisdiction.getUsername(), "从个人资料中修改" + pd.getString("USERNAME") + "的资料"); //记录日志
|
||||
map.put("result", errInfo);
|
||||
map.put("code", "0");
|
||||
map.put("syncInfo", JSONUtils.toJSONString(usersService.findById(pd)));
|
||||
return map;
|
||||
}
|
||||
|
||||
|
@ -895,6 +908,7 @@ public class UsersController extends BaseController {
|
|||
@RequestMapping(value = "/saveUser")
|
||||
@RequiresPermissions("user:add")
|
||||
@ResponseBody
|
||||
@SyncPlatformAdvice(type = SyncTypeEnum.PERSONNEL, isInsert = true)
|
||||
public Object saveUser() throws Exception {
|
||||
Map<String, Object> map = new HashMap<>();
|
||||
String errInfo = "success";
|
||||
|
@ -1019,6 +1033,8 @@ public class UsersController extends BaseController {
|
|||
map.put("USERINFO", userinfo);
|
||||
map.put("USER_ID",ID);
|
||||
map.put("result", errInfo); //返回结果
|
||||
map.put("code", "0");
|
||||
map.put("syncInfo", JSONUtils.toJSONString(userinfo));
|
||||
} else {
|
||||
map.put("result", "您输入的身份证号和用户名二次校验失败,请确认后重新申请"); //返回结果
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ public class TenCorpDto {
|
|||
private String producer_name = "qa-prevention-cfd";
|
||||
private String topic;
|
||||
//印记(方便寻找该条请求)
|
||||
// 中台根据这个区分同步的消息类型(人员同步/隐患同步/安检同步等)
|
||||
private String mark;
|
||||
// 标记名称
|
||||
private String mark_name;
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.zcloud.service.mq.impl;
|
||||
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.zcloud.dto.TenCorpDto;
|
||||
import com.zcloud.entity.PageData;
|
||||
|
@ -33,6 +34,10 @@ public class DockSendMessageServiceImpl implements DockSendMessageService {
|
|||
private String cmtDataDocking;
|
||||
@Value("${mq.czks.data.topic}")
|
||||
private String czksDataDocking;
|
||||
|
||||
@Value("${mq.producer.dataChange.slice-data-change.topic}")
|
||||
private String sliceDataChangeTopic;
|
||||
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
|
@ -72,6 +77,12 @@ public class DockSendMessageServiceImpl implements DockSendMessageService {
|
|||
if (tenCorpDto.getTopic().equals(csyDataDocking)) {
|
||||
sendResult = rocketMQTemplate.syncSend(this.csyDataDocking, tenCorpDto.toString());
|
||||
}
|
||||
|
||||
// 中台同步
|
||||
if (tenCorpDto.getTopic().equals(sliceDataChangeTopic)){
|
||||
sendResult = rocketMQTemplate.syncSend(this.sliceDataChangeTopic, JSONUtil.toJsonStr(tenCorpDto));
|
||||
}
|
||||
|
||||
if (sendResult != null && !sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {
|
||||
throw new RuntimeException("产品入栈失败");
|
||||
}
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
package com.zcloud.syncData;
|
||||
|
||||
import com.zcloud.dto.TenCorpDto;
|
||||
import com.zcloud.entity.PageData;
|
||||
import com.zcloud.service.mq.DockSendMessageService;
|
||||
import org.aspectj.lang.ProceedingJoinPoint;
|
||||
import org.aspectj.lang.annotation.Around;
|
||||
import org.aspectj.lang.annotation.Aspect;
|
||||
import org.aspectj.lang.annotation.Pointcut;
|
||||
import org.aspectj.lang.reflect.MethodSignature;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 说明:同步切面
|
||||
* 使用次切面的方法需使用@SyncPlatformAdvice注解,返回类型为Map或其子类
|
||||
* map中需要包含code字段,用于标识该方法的执行结果:"0"成功,其他失败
|
||||
* map中需要包含syncInfo字段,用于传递该方法的需要同步的参数,以JSON格式传递
|
||||
* :@SyncPlatformAdvice 注解中的 type 参数为必填项,表示同步的数据类型
|
||||
* 类型可参考SyncTypeEnum类
|
||||
* 作者:water_xu
|
||||
* 官网:www.zcloudchina.com
|
||||
*/
|
||||
@Aspect
|
||||
@Component
|
||||
public class SyncDataAdviceAspect {
|
||||
|
||||
@Pointcut(value = "@annotation(com.zcloud.syncData.SyncPlatformAdvice)")
|
||||
public void syncPointCut() {}
|
||||
|
||||
@Autowired
|
||||
private DockSendMessageService dockSendMessageService;
|
||||
|
||||
@Value("${mq.producer.dataChange.slice-data-change.topic}")
|
||||
private String sliceDataChangeTopic;
|
||||
|
||||
@Around("syncPointCut()")
|
||||
public Object around(ProceedingJoinPoint point) throws Throwable {
|
||||
|
||||
// 执行目标方法
|
||||
Object result = null;
|
||||
try {
|
||||
result = point.proceed();
|
||||
}catch (Exception e){
|
||||
System.out.println("==========中台同步切面执行目标方法异常==========");
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
if (result instanceof Map) {
|
||||
Map resultMap = (Map) result;
|
||||
if ("0".equals(resultMap.get("code")) || "0".equals(resultMap.get("CODE"))) {
|
||||
MethodSignature signature = (MethodSignature) point.getSignature();
|
||||
Method method = signature.getMethod();
|
||||
if (method.isAnnotationPresent(SyncPlatformAdvice.class)) {
|
||||
SyncPlatformAdvice syncPlatformAdvice = method.getAnnotation(SyncPlatformAdvice.class);
|
||||
|
||||
// 处理数据
|
||||
TenCorpDto message = new TenCorpDto();
|
||||
PageData pd = new PageData();
|
||||
resultMap.put("EXCEPT_SOURCE","qy");
|
||||
if (resultMap.get("syncInfo") == null || "".equals(resultMap.get("syncInfo"))){
|
||||
System.out.println("!!!----------使用同步切面需要返回的map中包含syncInfo字段-----------!!!");
|
||||
System.out.println("!!!-------------syncInfo为需要同步的数据 以JSON传递--------------!!!");
|
||||
System.out.println("!!!----------使用同步切面需要返回的map中包含syncInfo字段-----------!!!");
|
||||
return null;
|
||||
}
|
||||
pd.putAll(resultMap);
|
||||
message.setData(pd);
|
||||
message.setMessage("qy");
|
||||
message.setMark(syncPlatformAdvice.type().getDescription());
|
||||
message.setProducer_name("qy");
|
||||
message.setTopic(sliceDataChangeTopic);
|
||||
|
||||
// 判断同步类型
|
||||
if (syncPlatformAdvice.type().equals(SyncTypeEnum.HIDDEN)){
|
||||
// 同步隐患
|
||||
}
|
||||
if (syncPlatformAdvice.type().equals(SyncTypeEnum.PERSONNEL)){
|
||||
// 同步人员
|
||||
dockSendMessageService.sendMessage(message);
|
||||
}
|
||||
if (syncPlatformAdvice.type().equals(SyncTypeEnum.SAFETY_ENVIRONMENTAL_CHECK)){
|
||||
// 同步安全环保检查
|
||||
}
|
||||
if (syncPlatformAdvice.type().equals(SyncTypeEnum.KEY_PROJECT)){
|
||||
// 同步重点工程
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
} else {
|
||||
if (resultMap.get("code") == null || "".equals(resultMap.get("code"))){
|
||||
System.out.println("!!!----------使用同步切面需要返回的map中包含code字段-----------!!!");
|
||||
System.out.println("!!!------------------code为\"0\"成功 其他失败------------------!!!");
|
||||
System.out.println("!!!----------使用同步切面需要返回的map中包含code字段-----------!!!");
|
||||
}else{
|
||||
// 方法内执行失败,返回给调用端失败信息
|
||||
throw new RuntimeException(resultMap.get("code") + " ==> " + resultMap.get("msg").toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
package com.zcloud.syncData;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
/**
|
||||
* 用户同步通知注解,在方法体上加入此注解会进行AOP解析,并以MQ通知给消费者
|
||||
* @author :water_xu
|
||||
* @date :2024.8.6
|
||||
*/
|
||||
@Target(ElementType.METHOD)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
public @interface SyncPlatformAdvice {
|
||||
// 没啥用
|
||||
String value() default "";
|
||||
// 数据类型标记 (必须)
|
||||
SyncTypeEnum type();
|
||||
// 是否更新
|
||||
boolean isUpdate() default false;
|
||||
// 是否插入
|
||||
boolean isInsert() default false;
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
package com.zcloud.syncData;
|
||||
|
||||
/**
|
||||
* 用于向中台同步的切面枚举,区分同步的类型
|
||||
* @autor water_xu
|
||||
* @date 2024.8.8
|
||||
*/
|
||||
public enum SyncTypeEnum {
|
||||
PERSONNEL("人员"),
|
||||
HIDDEN("隐患"),
|
||||
SAFETY_ENVIRONMENTAL_CHECK("安全环保检查"),
|
||||
KEY_PROJECT("重点工程");
|
||||
|
||||
private final String description;
|
||||
|
||||
SyncTypeEnum(String description) {
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据给定的字符串描述判断是否为枚举中的一个类型。
|
||||
*
|
||||
* @param description 描述字符串
|
||||
* @return 如果匹配则返回 true,否则返回 false
|
||||
*/
|
||||
public static boolean matches(String description) {
|
||||
for (SyncTypeEnum type : values()) {
|
||||
if (type.getDescription().equals(description)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,140 @@
|
|||
package com.zcloud.syncData.listener;
|
||||
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.zcloud.dto.TenCorpDto;
|
||||
import com.zcloud.entity.PageData;
|
||||
import com.zcloud.mapper.datasource.xgf.XgfUserDetailsMapper;
|
||||
import com.zcloud.mapper.datasource.xgf.XgfUserMapper;
|
||||
import com.zcloud.service.mq.DockSendMessageService;
|
||||
import com.zcloud.service.system.UsersService;
|
||||
import com.zcloud.service.xgf.XgfUserService;
|
||||
import com.zcloud.syncData.SyncTypeEnum;
|
||||
import com.zcloud.util.DateUtil;
|
||||
import com.zcloud.util.StackTraceUtils;
|
||||
import com.zcloud.util.UuidUtil;
|
||||
import com.zcloud.util.Warden;
|
||||
import org.apache.rocketmq.spring.annotation.MessageModel;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.annotation.SelectorType;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
consumerGroup = "${mq.consumer.dataChange.tongbu-data-change.group}",
|
||||
topic = "${mq.consumer.dataChange.tongbu-data-change.topic}",
|
||||
selectorType = SelectorType.TAG,
|
||||
messageModel = MessageModel.BROADCASTING) // 添加广播模式
|
||||
public class SyncDataListener implements RocketMQListener<String> {
|
||||
|
||||
// @Resource
|
||||
// private MqMessageLogMapper mqMessageLogMapper;
|
||||
|
||||
public static Map<String, String> dockingRelationMap = new HashMap();
|
||||
|
||||
// @Autowired
|
||||
// private MqConsumptionLogMapper mqConsumptionLogMapper;
|
||||
|
||||
@Autowired
|
||||
private UsersService usersService;
|
||||
@Autowired
|
||||
private XgfUserMapper xgfUserMapper;
|
||||
@Autowired
|
||||
private XgfUserDetailsMapper xgfUserDetailsMapper;
|
||||
@Autowired
|
||||
private DockSendMessageService dockSendMessageService;
|
||||
|
||||
// @Autowired
|
||||
// private static MqConsumptionErrorLogMapper mqConsumptionErrorLogMapper;
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
//获取消息
|
||||
TenCorpDto tenCorpDto = JSON.parseObject(message, TenCorpDto.class);
|
||||
|
||||
PageData productionPD = new PageData();
|
||||
productionPD.put("CONSUMPTION_ID", Warden.get32UUID());
|
||||
productionPD.put("MESSAGE_LOG_ID",tenCorpDto.getMessageLogId());
|
||||
productionPD.put("CONSUMPTION_TYPE","gwj-MqQyDataListener.onMessage(String message)");
|
||||
productionPD.put("CONSUMPTION_TIME", DateUtil.getTime());
|
||||
productionPD.put("CREATE_TIME", tenCorpDto.getCREATE_TIME() == null ? "" : tenCorpDto.getCREATE_TIME());
|
||||
productionPD.put("MARK", tenCorpDto.getMark() == null ? "" : tenCorpDto.getMark());
|
||||
productionPD.put("DATA", tenCorpDto.getData()== null ? "" : tenCorpDto.getData().toString());
|
||||
//从消息中获取参数数据
|
||||
PageData pd = tenCorpDto.getData();
|
||||
// save:
|
||||
pd.put("SUCCESS",0);
|
||||
pd.put("CONSUMPTION_ID", UuidUtil.get32UUID());
|
||||
pd.putAll(tenCorpDto.getPd());
|
||||
pd.put("CONSUMPTION_TIME", DateUtil.date2Str(new Date()));
|
||||
pd.put("CONSUMPTION_TYPE","MqQyDataListener");
|
||||
if ("qy".equals(tenCorpDto.getMessage())){
|
||||
// 在本端发的同步消息,不需要再次同步该数据
|
||||
return;
|
||||
}
|
||||
try {
|
||||
//拿到原路径
|
||||
String url = pd.getString("url");
|
||||
System.out.println(url + pd);
|
||||
|
||||
// 人员
|
||||
if (SyncTypeEnum.PERSONNEL.getDescription().equals(tenCorpDto.getMark())){
|
||||
System.out.println("people");
|
||||
String str = tenCorpDto.getData().getString("syncInfo");
|
||||
PageData syncInfo = JSONUtil.toBean(str, PageData.class);
|
||||
if ("xgf".equals(tenCorpDto.getMessage())){
|
||||
// 相关方推送更新
|
||||
syncInfo.put("XGF_USER_ID",syncInfo.getString("USER_ID"));
|
||||
PageData localData = xgfUserMapper.findById(syncInfo);
|
||||
if (localData != null && !localData.isEmpty()){
|
||||
xgfUserMapper.edit(syncInfo);
|
||||
syncInfo.put("XGF_USER_DETAILS_ID",localData.getString("XGF_USER_ID"));
|
||||
PageData localDetailsData = xgfUserDetailsMapper.findById(syncInfo);
|
||||
if (localDetailsData != null && !localDetailsData.isEmpty()){
|
||||
xgfUserDetailsMapper.edit(syncInfo);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
PageData localData = usersService.findById(syncInfo);
|
||||
if (localData != null && !localData.isEmpty()){
|
||||
usersService.editUser(syncInfo);
|
||||
} else {
|
||||
// 2024.8.9 按郭跃鹏提出:后期各公司从企业端分离后,需要其他公司的用户进行SQL联查,要求新增一张core_user的同步表,用于记录中台的所有用户数据
|
||||
}
|
||||
|
||||
}
|
||||
// 隐患
|
||||
if (SyncTypeEnum.HIDDEN.getDescription().equals(tenCorpDto.getMark())){
|
||||
|
||||
}
|
||||
// 安全环保检查
|
||||
if (SyncTypeEnum.SAFETY_ENVIRONMENTAL_CHECK.getDescription().equals(tenCorpDto.getMark())){
|
||||
|
||||
}
|
||||
// 重点工程
|
||||
if (SyncTypeEnum.KEY_PROJECT.getDescription().equals(tenCorpDto.getMark())){
|
||||
|
||||
}
|
||||
|
||||
// mqConsumptionLogMapper.save(productionPD);
|
||||
// mqConsumptionLogMapper.save(pd);
|
||||
} catch (Exception e) {
|
||||
productionPD.put("SUCCESS",'0');
|
||||
productionPD.put("ERROR_MESSAGE", StackTraceUtils.printStackTraceToString(e));
|
||||
// mqConsumptionErrorLogMapper.save(productionPD);
|
||||
pd.put("ERROR_MESSAGE",e.getMessage());
|
||||
// mqConsumptionLogMapper.save(pd);
|
||||
e.printStackTrace();
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
package com.zcloud.util;
|
||||
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
|
||||
public class StackTraceUtils {
|
||||
public static String printStackTraceToString(Throwable t) {
|
||||
StringWriter sw = new StringWriter();
|
||||
t.printStackTrace(new PrintWriter(sw, true));
|
||||
return sw.getBuffer().toString();
|
||||
}
|
||||
}
|
|
@ -143,3 +143,11 @@ gongJiangXueYuanProdUrl=https://gjxy.bjttsx.com
|
|||
#gongJiangXueYuanGetImgUrl=https://skqhdg.porthebei.com:9004/file/
|
||||
# \u7ED9\u5DE5\u5320\u5B66\u9662\u63A8\u9001\u4EBA\u5458\u6570\u636E\u65F6\u7684\u56FE\u7247\u9644\u4EF6\u524D\u7F00 \uFF08\u6D4B\u8BD5\u516C\u7F51\uFF09
|
||||
gongJiangXueYuanGetImgUrl=https://wwag.qhdsafety.com/file/
|
||||
|
||||
# \u4E2D\u53F0
|
||||
# \u6D88\u8D39\u4E2D\u53F0\u901A\u77E5
|
||||
mq.consumer.dataChange.tongbu-data-change.topic=tongbu_dataChange_docking
|
||||
mq.consumer.dataChange.tongbu-data-change.group=tongbu_dataChange_group
|
||||
|
||||
# \u672C\u5730\u6570\u636E\u4FEE\u6539\u540E\u5411\u8FD9\u4E2Atopic\u63A8\u9001\uFF08\u6240\u6709\u7C7B\u578B\u540C\u6B65\u6570\u636E\u516C\u7528\uFF09
|
||||
mq.producer.dataChange.slice-data-change.topic=slice_dataChange_docking
|
Loading…
Reference in New Issue