package com.zcloud.service.mq.impl; import com.alibaba.fastjson.JSONObject; import com.zcloud.dto.TenCorpDto; import com.zcloud.entity.PageData; import com.zcloud.mapper.datasource.mq.MqErrorMessageLogMapper; import com.zcloud.mapper.datasource.mq.MqMessageLogMapper; import com.zcloud.service.mq.LogService; import com.zcloud.service.mq.SendMessageService; import com.zcloud.util.DateUtil; import com.zcloud.util.Warden; import com.zcloud.util.mq.MqUtil; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.messaging.support.GenericMessage; import org.springframework.stereotype.Service; import javax.annotation.Resource; @Service public class SendMessageServiceImpl implements SendMessageService { @Value("${mq.topic.docking}") private String info; @Resource private RocketMQTemplate rocketMQTemplate; @Resource private MqErrorMessageLogMapper mqErrorMessageLogMapper; @Resource private LogService logService; @Resource private MqMessageLogMapper mqMessageLogMapper; public void sendMessage(TenCorpDto tenCorpDto) throws Exception { try { PageData log = tenCorpDto.getPd(); log.put("MESSAGE_LOG_ID", Warden.get32UUID()); log.put("CREATE_TIME", DateUtil.getTime()); mqMessageLogMapper.save(log); System.out.println("生产者:" + tenCorpDto.toString()); SendResult sendResult = rocketMQTemplate.syncSend(this.info, tenCorpDto.toString()); if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) { throw new RuntimeException("产品入栈失败"); } } catch (Exception e) { e.printStackTrace(); PageData log = tenCorpDto.getPd(); log.put("MESSAGE_ERROR_LOG_ID", Warden.get32UUID()); log.put("ERROR_MESSAGE", e.getMessage()); log.put("TYPE", "0"); log.put("TIME", DateUtil.getTime()); mqErrorMessageLogMapper.save(log); } } @Override public void SendDelayQueue(TenCorpDto tenCorpDto) { PageData log = tenCorpDto.getPd(); log.put("MESSAGE_LOG_ID", Warden.get32UUID()); try { log.put("CREATE_TIME", DateUtil.getTime()); log.put("PRODUCER_NAME",tenCorpDto.getProducer_name()); log.put("TYPE","0"); log.put("PLAN_TIME",tenCorpDto.getTime_stamp()); mqMessageLogMapper.save(log); System.out.println("生产者:" + tenCorpDto.toString()); // 推送消息 SendResult sendResult = rocketMQTemplate.syncSend( MqUtil.analysistopic(tenCorpDto.getTopic()), new GenericMessage<>(tenCorpDto.toString()), 3000, MqUtil.analysisTime(tenCorpDto.getTime_stamp())); if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) { throw new RuntimeException("产品入栈失败"); } } catch (Exception e) { e.printStackTrace(); log.put("ERROR_MESSAGE", e.getMessage()); log.put("MESSAGE_ERROR_LOG_ID", Warden.get32UUID()); log.put("TYPE", "0"); log.put("TIME", DateUtil.getTime()); mqErrorMessageLogMapper.save(log); } } @Override public void SendDelayQueue(String message) { TenCorpDto tenCorpDto = JSONObject.parseObject(message,TenCorpDto.class); try { PageData log = tenCorpDto.getPd(); log.put("CREATE_TIME", DateUtil.getTime()); log.put("PRODUCER_NAME",tenCorpDto.getProducer_name()); log.put("MESSAGE_LOG_ID", Warden.get32UUID()); mqMessageLogMapper.save(log); System.out.println("生产者:" + tenCorpDto.toString()); // 推送消息 SendResult sendResult = rocketMQTemplate.syncSend( MqUtil.analysistopic(tenCorpDto.getTopic()), new GenericMessage<>(message), 3000, MqUtil.analysisTime(tenCorpDto.getTime_stamp())); if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) { throw new RuntimeException("产品入栈失败"); } } catch (Exception e) { PageData log = tenCorpDto.getPd(); logService.saveErrorMessage(log,e.getMessage()); e.printStackTrace(); } } public static void main(String[] args) { try { System.out.println(MqUtil.analysisTime("2023-06-30 14:30:00:000")); }catch (Exception e){ e.printStackTrace(); } } }