八项作业mq功能

pull/3/head
liujun 2023-12-12 18:02:22 +08:00
parent f338b750da
commit 1daf485e92
14 changed files with 704 additions and 0 deletions

15
pom.xml
View File

@ -461,6 +461,21 @@
<artifactId>hutool-all</artifactId>
<version>5.0.5</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.5</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<build>
<!-- <finalName>qa-prevention-gwj</finalName>

View File

@ -0,0 +1,68 @@
package com.zcloud.dto.mq;
import com.alibaba.fastjson.JSON;
import lombok.Data;
@Data
public class Response {
private String result;
private String exception;
private String code;
private String message;
public Response Ok() {
this.result = "succeed";
this.code = "0";
return this;
}
public static Response OK() {
Response response = new Response();
response.result = "succeed";
response.code = "0";
return response;
}
public Response Error() {
this.result = "succeed";
this.code = "9999";
this.exception = "系统异常";
return this;
}
public Response Error(String errorMessage) {
this.result = "succeed";
this.code = "9999";
this.exception = errorMessage;
return this;
}
public Response Error(String code, String errorMessage) {
this.result = "succeed";
this.code = code;
this.exception = errorMessage;
return this;
}
public static Response ERROR() {
Response response = new Response();
response.result = "succeed";
response.code = "0";
return response;
}
public static Response ERROR(String errorMessage) {
Response response = new Response();
response.result = "succeed";
response.code = "9999";
response.exception = errorMessage;
return response;
}
public void setMessage(Object obj) {
this.message = JSON.toJSONString(obj);
}
}

View File

@ -0,0 +1,46 @@
package com.zcloud.dto.mq;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.zcloud.entity.PageData;
import com.zcloud.util.Warden;
import lombok.Data;
@Data
public class TenCorpDto {
// 请秋id
private String id;
// 消息来源名称
private String producer_name;
private String topic;
//印记(方便寻找该条请求)
private String mark;
// 标记名称
private String mark_name;
// 消息发送时间(yyyy-MM-dd HH:mm:ss:SSS)
private String time_stamp;
// 消息体
private String message;
private PageData data;
public TenCorpDto() {
this.id = Warden.get32UUID();
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
public PageData getPd() {
PageData info = new PageData();
info.put("ID", this.id);
info.put("PRODUCER_NAME", this.producer_name);
info.put("MARK", this.mark);
info.put("MARK_NAME", this.mark_name);
info.put("MESSAGE", JSONObject.toJSONString(this.data));
info.put("TOPIC",this.topic);
return info;
}
}

View File

@ -0,0 +1,87 @@
package com.zcloud.mq.controller;
import com.alibaba.fastjson.JSON;
import com.zcloud.controller.gf.GFEightWorkController;
import com.zcloud.entity.PageData;
import com.zcloud.mq.mapper.MqMessageLogMapper;
import com.zcloud.dto.mq.TenCorpDto;
import com.zcloud.mq.service.LogService;
import com.zcloud.mq.service.SendMessageService;
import com.zcloud.mq.util.MqUtil;
import com.zcloud.util.DateUtil;
import com.zcloud.util.Warden;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
/**
* topic tag DEMO
* wangxuan
* www.zcloudchina.com
*/
@Component
@RocketMQMessageListener(consumerGroup = "${mq.group.eightWork}",
topic = "${mq.topic.eightWork}",
selectorType = SelectorType.TAG)
public class EightWorkListener implements RocketMQListener<String> {
@Resource
private GFEightWorkController eightWorkClient;
@Resource
private SendMessageService sendMessageService;
@Resource
private LogService logService;
@Resource
private MqMessageLogMapper mqMessageLogMapper;
@Override
public void onMessage(String message) {
TenCorpDto data = JSON.parseObject(message, TenCorpDto.class);
PageData log = data.getPd();
log.put("MESSAGE_LOG_ID", Warden.get32UUID());
log.put("CREATE_TIME", DateUtil.getTime());
log.put("PRODUCER_NAME",data.getProducer_name());
log.put("TYPE","1");
try {
System.out.println("消费者:" + data.toString());
Date limit_date = MqUtil.dateFormat(data.getTime_stamp());
if (limit_date.getTime() - new Date().getTime() > 1000) {
sendMessageService.SendDelayQueue(data);
log.put("DIGESTION_FLAG","0");
} else {
switch (data.getMessage()){
case "confined-space":
System.out.println("受限空间模块消费");
// eightWorkClient.confinedSpaceCancel(data.getData());
break;
case "electricity":
System.out.println("临时用电模块消费");
// eightWorkClient.electricityCancel(data.getData());
break;
case "hot-work":
System.out.println("动火模块消费");
// eightWorkClient.hotWorkCancel(data.getData());
break;
default:
System.out.println("异常模块不消费");
throw new RuntimeException("未找到对应的消费者");
}
log.put("DIGESTION_FLAG","1");
}
mqMessageLogMapper.save(log);
} catch (Exception e) {
e.printStackTrace();
logService.saveErrorMessage(log,e.getMessage());
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,32 @@
package com.zcloud.mq.controller;
import com.alibaba.fastjson.JSON;
import com.zcloud.dto.mq.TenCorpDto;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* topic tag DEMO
* wangxuan
* www.zcloudchina.com
*/
@Component
@RocketMQMessageListener(consumerGroup = "${mq.group.info}",
topic = "${mq.topic.info}",
selectorType = SelectorType.TAG)
public class MqListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
TenCorpDto data = JSON.parseObject(message, TenCorpDto.class);
try {
System.out.println("2消费者:" + data.toString());
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,54 @@
package com.zcloud.mq.controller;
import com.zcloud.controller.base.BaseController;
import com.zcloud.dto.mq.Response;
import com.zcloud.dto.mq.TenCorpDto;
import com.zcloud.mq.service.SendMessageService;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
/**
* TODO
* wangxuan
* www.zcloudchina.com
*/
@RestController
@RequestMapping("/messageQueue")
public class SendController extends BaseController {
@Resource
private SendMessageService sendMessageService;
@RequestMapping("/sendMessage")
public Response sendMessage(@RequestParam("tenCorpDto") TenCorpDto tenCorpDto) throws Exception {
try {
sendMessageService.sendMessage(tenCorpDto);
} catch (Exception e) {
e.printStackTrace();
}
return Response.OK();
}
@PostMapping("SendTimelyQueue")
public Response timelyQueue(@RequestBody TenCorpDto tenCorpDto) throws Exception {
try {
sendMessageService.sendMessage(tenCorpDto);
} catch (Exception e) {
e.printStackTrace();
}
return Response.OK();
}
@PostMapping("/SendDelayQueue")
public Response SendDelayQueue(@RequestBody TenCorpDto tenCorpDto) throws Exception {
try {
sendMessageService.SendDelayQueue(tenCorpDto);
return Response.OK();
} catch (Exception e) {
e.printStackTrace();
return Response.ERROR(e.getMessage());
}
}
}

View File

@ -0,0 +1,59 @@
package com.zcloud.mq.mapper;
import com.zcloud.entity.Page;
import com.zcloud.entity.PageData;
import java.util.List;
/**
* mq
* luoxiaobao
* 2023-06-28
* www.zcloudchina.com
*/
public interface MqErrorMessageLogMapper{
/**
* @param pd
* @throws Exception
*/
void save(PageData pd);
/**
* @param pd
* @throws Exception
*/
void delete(PageData pd);
/**
* @param pd
* @throws Exception
*/
void edit(PageData pd);
/**
* @param page
* @throws Exception
*/
List<PageData> datalistPage(Page page);
/**()
* @param pd
* @throws Exception
*/
List<PageData> listAll(PageData pd);
/**id
* @param pd
* @throws Exception
*/
PageData findById(PageData pd);
/**
* @param ArrayDATA_IDS
* @throws Exception
*/
void deleteAll(String[] ArrayDATA_IDS);
}

View File

@ -0,0 +1,59 @@
package com.zcloud.mq.mapper;
import com.zcloud.entity.Page;
import com.zcloud.entity.PageData;
import java.util.List;
/**
* mq\
* luoxiaobao
* 2023-06-28
* www.zcloudchina.com
*/
public interface MqMessageLogMapper{
/**
* @param pd
* @throws Exception
*/
void save(PageData pd);
/**
* @param pd
* @throws Exception
*/
void delete(PageData pd);
/**
* @param pd
* @throws Exception
*/
void edit(PageData pd);
/**
* @param page
* @throws Exception
*/
List<PageData> datalistPage(Page page);
/**()
* @param pd
* @throws Exception
*/
List<PageData> listAll(PageData pd);
/**id
* @param pd
* @throws Exception
*/
PageData findById(PageData pd);
/**
* @param ArrayDATA_IDS
* @throws Exception
*/
void deleteAll(String[] ArrayDATA_IDS);
}

View File

@ -0,0 +1,11 @@
package com.zcloud.mq.service;
import com.zcloud.entity.PageData;
public interface LogService {
void saveLog(PageData pageData);
void saveLog(PageData pageData,String type);
void saveErrorMessage(PageData pageData,String errorMessage);
}

View File

@ -0,0 +1,11 @@
package com.zcloud.mq.service;
import com.zcloud.dto.mq.TenCorpDto;
public interface SendMessageService {
void sendMessage(TenCorpDto tenCorpDto) throws Exception;
void SendDelayQueue(TenCorpDto tenCorpDto);
void SendDelayQueue(String message);
}

View File

@ -0,0 +1,43 @@
package com.zcloud.mq.service.impl;
import com.zcloud.entity.PageData;
import com.zcloud.mq.mapper.MqErrorMessageLogMapper;
import com.zcloud.mq.mapper.MqMessageLogMapper;
import com.zcloud.mq.service.LogService;
import com.zcloud.util.DateUtil;
import com.zcloud.util.Warden;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class LogServiceImpl implements LogService {
@Resource
private MqErrorMessageLogMapper mqErrorMessageLogMapper;
@Resource
private MqMessageLogMapper mqMessageLogMapper;
@Override
@Async
public void saveLog(PageData pageData) {
this.saveLog(pageData,"0");
}
@Override
public void saveLog(PageData pageData, String type) {
}
@Override
@Async
public void saveErrorMessage(PageData log,String errorMessage) {
log.put("ERROR_MESSAGE", errorMessage);
log.put("MESSAGE_ERROR_LOG_ID", Warden.get32UUID());
log.put("TYPE", "0");
log.put("TIME", DateUtil.getTime());
mqErrorMessageLogMapper.save(log);
}
}

View File

@ -0,0 +1,129 @@
package com.zcloud.mq.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.zcloud.dto.mq.TenCorpDto;
import com.zcloud.entity.PageData;
import com.zcloud.mq.mapper.MqErrorMessageLogMapper;
import com.zcloud.mq.mapper.MqMessageLogMapper;
import com.zcloud.mq.service.LogService;
import com.zcloud.mq.service.SendMessageService;
import com.zcloud.mq.util.MqUtil;
import com.zcloud.util.DateUtil;
import com.zcloud.util.Warden;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class SendMessageServiceImpl implements SendMessageService {
@Value("${mq.topic.info}")
private String info;
@Resource
private RocketMQTemplate rocketMQTemplate;
@Resource
private MqErrorMessageLogMapper mqErrorMessageLogMapper;
@Resource
private LogService logService;
@Resource
private MqMessageLogMapper mqMessageLogMapper;
public void sendMessage(TenCorpDto tenCorpDto) throws Exception {
try {
PageData log = tenCorpDto.getPd();
log.put("MESSAGE_LOG_ID", Warden.get32UUID());
log.put("CREATE_TIME", DateUtil.getTime());
mqMessageLogMapper.save(log);
System.out.println("生产者:" + tenCorpDto.toString());
SendResult sendResult = rocketMQTemplate.syncSend(this.info, tenCorpDto.toString());
if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {
throw new RuntimeException("产品入栈失败");
}
} catch (Exception e) {
e.printStackTrace();
PageData log = tenCorpDto.getPd();
log.put("MESSAGE_ERROR_LOG_ID", Warden.get32UUID());
log.put("ERROR_MESSAGE", e.getMessage());
log.put("TYPE", "0");
log.put("TIME", DateUtil.getTime());
mqErrorMessageLogMapper.save(log);
}
}
@Override
public void SendDelayQueue(TenCorpDto tenCorpDto) {
PageData log = tenCorpDto.getPd();
log.put("MESSAGE_LOG_ID", Warden.get32UUID());
try {
log.put("CREATE_TIME", DateUtil.getTime());
log.put("PRODUCER_NAME",tenCorpDto.getProducer_name());
log.put("TYPE","0");
log.put("PLAN_TIME",tenCorpDto.getTime_stamp());
mqMessageLogMapper.save(log);
System.out.println("生产者:" + tenCorpDto.toString());
// 推送消息
SendResult sendResult = rocketMQTemplate.syncSend(
MqUtil.analysistopic(tenCorpDto.getTopic()),
new GenericMessage<>(tenCorpDto.toString()),
3000,
MqUtil.analysisTime(tenCorpDto.getTime_stamp()));
if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {
throw new RuntimeException("产品入栈失败");
}
} catch (Exception e) {
e.printStackTrace();
log.put("ERROR_MESSAGE", e.getMessage());
log.put("MESSAGE_ERROR_LOG_ID", Warden.get32UUID());
log.put("TYPE", "0");
log.put("TIME", DateUtil.getTime());
mqErrorMessageLogMapper.save(log);
}
}
@Override
public void SendDelayQueue(String message) {
TenCorpDto tenCorpDto = JSONObject.parseObject(message,TenCorpDto.class);
try {
PageData log = tenCorpDto.getPd();
log.put("CREATE_TIME", DateUtil.getTime());
log.put("PRODUCER_NAME",tenCorpDto.getProducer_name());
log.put("MESSAGE_LOG_ID", Warden.get32UUID());
mqMessageLogMapper.save(log);
System.out.println("生产者:" + tenCorpDto.toString());
// 推送消息
SendResult sendResult = rocketMQTemplate.syncSend(
MqUtil.analysistopic(tenCorpDto.getTopic()),
new GenericMessage<>(message),
3000,
MqUtil.analysisTime(tenCorpDto.getTime_stamp()));
if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {
throw new RuntimeException("产品入栈失败");
}
} catch (Exception e) {
PageData log = tenCorpDto.getPd();
logService.saveErrorMessage(log,e.getMessage());
e.printStackTrace();
}
}
public static void main(String[] args) {
try {
System.out.println(MqUtil.analysisTime("2023-06-30 14:30:00:000"));
}catch (Exception e){
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,6 @@
package com.zcloud.mq.util;
public class Const {
public static final String topic_eightWork="eightWork";
public static final String topic_info="info";
}

View File

@ -0,0 +1,84 @@
package com.zcloud.mq.util;
import java.text.SimpleDateFormat;
import java.util.Date;
public class MqUtil {
private final static SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
public static Integer analysisTime(String time) throws Exception{
Date agreed_date = timeFormat.parse(time);
long time_difference = agreed_date.getTime() - new Date().getTime();
if (time_difference < 0){
return 0;
}
if (time_difference > 2*60*60*1000){
return 18;
}
if (time_difference > 60 * 60 * 1000){
return 17;
}
if (time_difference > 30 * 60 * 1000){
return 16;
}
if (time_difference > 20 * 60 * 1000){
return 15;
}
if (time_difference > 10 * 60 * 1000){
return 14;
}
if (time_difference > 9 * 60 * 1000){
return 13;
}
if (time_difference > 8 * 60 * 1000){
return 12;
}
if (time_difference > 7 * 60 * 1000){
return 11;
}
if (time_difference > 6 * 60 * 1000){
return 10;
}
if (time_difference > 5 * 60 * 1000){
return 9;
}
if (time_difference > 4 * 60 * 1000){
return 8;
}
if (time_difference > 3 * 60 * 1000){
return 7;
}
if (time_difference > 2 * 60 * 1000){
return 6;
}
if (time_difference > 60 * 1000){
return 5;
}
if (time_difference > 30 * 1000){
return 4;
}
if (time_difference > 10 * 1000){
return 3;
}
if (time_difference > 5 * 1000){
return 2;
}
if (time_difference > 1000){
return 1;
}
return 0;
}
public static String analysistopic(String producerName) {
switch (producerName){
case "eightWork" : return Const.topic_eightWork;
case "info" : return Const.topic_info;
default: return "";
}
}
public static Date dateFormat(String date) throws Exception{
return timeFormat.parse(date);
}
}