qa-prevention-gwj/src/main/java/com/zcloud/service/mq/impl/DockSendMessageServiceImpl....

137 lines
5.7 KiB
Java

package com.zcloud.service.mq.impl;
import com.alibaba.fastjson.JSONObject;
import com.zcloud.dto.TenCorpDto;
import com.zcloud.entity.PageData;
import com.zcloud.mapper.datasource.mq.MqErrorMessageLogMapper;
import com.zcloud.mapper.datasource.mq.MqMessageLogMapper;
import com.zcloud.service.mq.DockSendMessageService;
import com.zcloud.service.mq.LogService;
import com.zcloud.util.DateUtil;
import com.zcloud.util.Warden;
import com.zcloud.util.mq.MqUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
 * RocketMQ producer for docking messages.
 *
 * <p>Every public method performs the same sequence: stamp the DTO's {@code PageData} with a fresh
 * message-log id and creation time, persist it through {@link MqMessageLogMapper}, send the DTO's
 * {@code toString()} form synchronously, and, if anything in that sequence throws, write a row
 * through {@link MqErrorMessageLogMapper} instead of propagating the failure. Exceptions raised
 * while writing that error row are not caught and do reach the caller.
 */
@Service
public class DockSendMessageServiceImpl implements DockSendMessageService {

    // Not referenced by any method of this class; kept so the set of required
    // configuration properties stays unchanged.
    @Value("${mq.gwj.data.topic}")
    private String gwjDataTopic;

    /** Destination used by both picture methods. */
    @Value("${mq.gwj.file.topic}")
    private String gwjFileTopic;

    /** Data topics a caller may select through the DTO's topic in {@link #sendMessage}. */
    @Value("${mq.csy.data.topic}")
    private String csyDataDocking;

    @Value("${mq.cmt.data.topic}")
    private String cmtDataDocking;

    @Value("${mq.czks.data.topic}")
    private String czksDataDocking;

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Resource
    private MqErrorMessageLogMapper mqErrorMessageLogMapper;

    // Not referenced by any method of this class; kept so injection stays unchanged.
    @Resource
    private LogService logService;

    @Resource
    private MqMessageLogMapper mqMessageLogMapper;

    /**
     * Logs the message and sends it to the data topic named by the DTO.
     *
     * <p>The DTO's topic must equal one of the three configured data topics (czks, cmt, csy,
     * checked in that order). Exactly one send is performed. A topic that matches none of them,
     * including {@code null}, is recorded in the error log rather than silently dropped.
     *
     * @param tenCorpDto message to send; its {@code PageData} is mutated with log columns
     * @throws Exception only if recording a failure itself fails (send failures are recorded in
     *     the error log, not rethrown)
     */
    @Override
    public void sendMessage(TenCorpDto tenCorpDto) throws Exception {
        try {
            String payload = logAndSerialize(tenCorpDto);
            // Original note (translated): started with the coal company; three companies later.
            Object requestedTopic = tenCorpDto.getTopic();
            String destination = resolveDataTopic(requestedTopic);
            if (destination == null) {
                throw new IllegalArgumentException("Unsupported topic: " + requestedTopic);
            }
            requireSendOk(rocketMQTemplate.syncSend(destination, payload));
        } catch (Exception e) {
            recordSendFailure(tenCorpDto, e);
        }
    }

    /**
     * Logs the message and sends it to the file topic.
     *
     * @param tenCorpDto message to send; its {@code PageData} is mutated with log columns
     * @throws Exception only if recording a failure itself fails
     */
    @Override
    public void sendMessagePicture(TenCorpDto tenCorpDto) throws Exception {
        sendToFileTopic(tenCorpDto);
    }

    /**
     * Logs the message and sends it to the file topic. The send path is the same as
     * {@link #sendMessagePicture(TenCorpDto)}; nothing in this class distinguishes the two.
     *
     * @param tenCorpDto message to send; its {@code PageData} is mutated with log columns
     * @throws Exception only if recording a failure itself fails
     */
    @Override
    public void sendMessagePictureDelete(TenCorpDto tenCorpDto) throws Exception {
        sendToFileTopic(tenCorpDto);
    }

    /** Shared body of the two picture methods: log, send to the file topic, record any failure. */
    private void sendToFileTopic(TenCorpDto tenCorpDto) {
        try {
            String payload = logAndSerialize(tenCorpDto);
            requireSendOk(rocketMQTemplate.syncSend(this.gwjFileTopic, payload));
        } catch (Exception e) {
            recordSendFailure(tenCorpDto, e);
        }
    }

    /**
     * Persists the message-log row and returns the text that will be sent.
     *
     * <p>The payload is taken after the log columns are written into the DTO's
     * {@code PageData}, matching the order in which the DTO was previously serialized.
     */
    private String logAndSerialize(TenCorpDto tenCorpDto) {
        PageData messageLog = tenCorpDto.getPd();
        messageLog.put("MESSAGE_LOG_ID", Warden.get32UUID());
        messageLog.put("CREATE_TIME", DateUtil.getTime());
        mqMessageLogMapper.save(messageLog);

        String payload = tenCorpDto.toString();
        System.out.println("生产者:" + payload);
        return payload;
    }

    /** Returns the configured data topic equal to {@code requestedTopic}, or null if none is. */
    private String resolveDataTopic(Object requestedTopic) {
        if (requestedTopic == null) {
            return null;
        }
        if (requestedTopic.equals(czksDataDocking)) {
            return czksDataDocking;
        }
        if (requestedTopic.equals(cmtDataDocking)) {
            return cmtDataDocking;
        }
        if (requestedTopic.equals(csyDataDocking)) {
            return csyDataDocking;
        }
        return null;
    }

    /** Throws unless the broker reported {@code SEND_OK} for the send. */
    private void requireSendOk(SendResult sendResult) {
        if (sendResult == null || sendResult.getSendStatus() != SendStatus.SEND_OK) {
            throw new RuntimeException("产品入栈失败");
        }
    }

    /**
     * Writes an error-log row describing a failed send.
     *
     * <p>The row reuses the DTO's {@code PageData} so the original message columns are kept.
     */
    private void recordSendFailure(TenCorpDto tenCorpDto, Exception e) {
        // No logger is available in this class; keep the existing console trace.
        e.printStackTrace();

        PageData errorLog = tenCorpDto.getPd();
        if (errorLog == null) {
            // A missing PageData is itself a likely cause of the failure; still record it
            // instead of letting the handler die with a second NullPointerException.
            errorLog = new PageData();
        }
        // Many exceptions carry no message; fall back to the exception's string form so
        // the stored row is never blank.
        String reason = e.getMessage() == null ? e.toString() : e.getMessage();

        errorLog.put("MESSAGE_ERROR_LOG_ID", Warden.get32UUID());
        errorLog.put("ERROR_MESSAGE", reason);
        errorLog.put("TYPE", "0");
        errorLog.put("TIME", DateUtil.getTime());
        mqErrorMessageLogMapper.save(errorLog);
    }
}