|
|
@@ -1,167 +1,167 @@
|
|
|
-//package cn.start.tz.module.pressure2.mq.consumer;
|
|
|
-//
|
|
|
-//import cn.start.tz.module.pressure2.api.taskorder.BoilerTaskOrderApi;
|
|
|
-//import cn.start.tz.module.pressure2.api.taskorder.dto.UpdatePayStatusDTO;
|
|
|
-//import cn.start.tz.module.pressure2.mq.message.PayChargeEventMessage;
|
|
|
-//import com.fasterxml.jackson.databind.JsonNode;
|
|
|
-//import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
-//import com.fasterxml.jackson.databind.node.ArrayNode;
|
|
|
-//import jakarta.annotation.Resource;
|
|
|
-//import lombok.extern.slf4j.Slf4j;
|
|
|
-//import org.apache.commons.lang3.StringUtils;
|
|
|
-//import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
|
-//import org.apache.rocketmq.spring.core.RocketMQListener;
|
|
|
-//import org.springframework.context.event.EventListener;
|
|
|
-//import org.springframework.stereotype.Component;
|
|
|
-//
|
|
|
-//@Component
|
|
|
-//@RocketMQMessageListener( // 重点:添加 @RocketMQMessageListener 注解,声明消费的 topic
|
|
|
-// topic = "js_pay_service_charge_status",
|
|
|
-// consumerGroup = "js_pay_service_charge_status" + "_" + "${spring.profiles.active}"
|
|
|
-//)
|
|
|
-//
|
|
|
-//@Slf4j
|
|
|
-//public class PayChargeStatusSendConsumer implements RocketMQListener<PayChargeEventMessage> {
|
|
|
-//
|
|
|
-//
|
|
|
-// @Resource
|
|
|
-// private BoilerTaskOrderApi taskOrderApi;
|
|
|
-//
|
|
|
-// @Resource
|
|
|
-// private ObjectMapper objectMapper;
|
|
|
-//
|
|
|
-// /**
|
|
|
-// * 健壮的JSON解析方法,处理格式问题
|
|
|
-// */
|
|
|
-// private PayChargeEventMessage.TaskInfo[] parseTaskList(String taskListJson) {
|
|
|
-// if (StringUtils.isBlank(taskListJson)) {
|
|
|
-// return new PayChargeEventMessage.TaskInfo[0];
|
|
|
-// }
|
|
|
-//
|
|
|
-// try {
|
|
|
-// // 首先尝试直接解析
|
|
|
-// return objectMapper.readValue(taskListJson, PayChargeEventMessage.TaskInfo[].class);
|
|
|
-// } catch (Exception e) {
|
|
|
-// log.warn("直接JSON解析失败,尝试修复格式,错误: {}", e.getMessage());
|
|
|
-//
|
|
|
-// try {
|
|
|
-// // 尝试解析为JsonNode以便修复
|
|
|
-// JsonNode rootNode = objectMapper.readTree(taskListJson);
|
|
|
-//
|
|
|
-// if (rootNode.isArray()) {
|
|
|
-// // 如果是数组,直接转换
|
|
|
-// return objectMapper.convertValue(rootNode, PayChargeEventMessage.TaskInfo[].class);
|
|
|
-// } else if (rootNode.isObject()) {
|
|
|
-// // 如果是单个对象,包装成数组
|
|
|
-// ArrayNode arrayNode = objectMapper.createArrayNode();
|
|
|
-// arrayNode.add(rootNode);
|
|
|
-// return objectMapper.convertValue(arrayNode, PayChargeEventMessage.TaskInfo[].class);
|
|
|
-// } else {
|
|
|
-// log.error("无法解析的JSON格式,既不是数组也不是对象: {}", taskListJson);
|
|
|
-// return new PayChargeEventMessage.TaskInfo[0];
|
|
|
-// }
|
|
|
-// } catch (Exception e2) {
|
|
|
-// log.error("JSON修复失败,原始数据: {}", taskListJson, e2);
|
|
|
-// return new PayChargeEventMessage.TaskInfo[0];
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
-//
|
|
|
-// @EventListener
|
|
|
-// // @Async // Spring Event 默认在 Producer 发送的线程,通过 @Async 实现异步
|
|
|
-// @Override
|
|
|
-// public void onMessage(PayChargeEventMessage message) {
|
|
|
-// log.info("PayChargeStatusSendConsumer [onMessage][消息内容({})]", message);
|
|
|
-//
|
|
|
-// //费用事件:1.缴费 2.解除认领
|
|
|
-// if("1".equals(message.getEventType()+"")){
|
|
|
-// if(StringUtils.isNotBlank(message.getTaskList())){
|
|
|
-// // 使用健壮的解析方法
|
|
|
-// PayChargeEventMessage.TaskInfo[] taskInfos = parseTaskList(message.getTaskList());
|
|
|
-//
|
|
|
-// if (taskInfos.length == 0) {
|
|
|
-// log.warn("PayChargeStatusSendConsumer 解析到空的taskList数据");
|
|
|
-// return;
|
|
|
-// }
|
|
|
-//
|
|
|
-// log.info("PayChargeStatusSendConsumer 成功解析到{}条任务记录", taskInfos.length);
|
|
|
-//
|
|
|
-// for (PayChargeEventMessage.TaskInfo taskInfo : taskInfos) {
|
|
|
-// try {
|
|
|
-// if("4".equals(taskInfo.getInData())){
|
|
|
-// //承压
|
|
|
-// log.info("PayChargeStatusSendConsumer 承压支付回调 {} - 金额: {}",
|
|
|
-// taskInfo.getTaskNo(), taskInfo.getChargeAmount());
|
|
|
-// UpdatePayStatusDTO payInfo = new UpdatePayStatusDTO();
|
|
|
-// payInfo.setOrderNo(taskInfo.getTaskNo());
|
|
|
-// payInfo.setChargeTime(message.getChargeTime());
|
|
|
-// payInfo.setChargeAmount(taskInfo.getChargeAmount());
|
|
|
-//
|
|
|
-// taskOrderApi.updatePayStatus(payInfo);
|
|
|
-// } else if("5".equals(taskInfo.getInData())){
|
|
|
-// // 实验室
|
|
|
-// log.info("PayChargeStatusSendConsumer 实验室支付回调 {} - 金额: {}",
|
|
|
-// taskInfo.getTaskNo(), taskInfo.getChargeAmount());
|
|
|
-// } else {
|
|
|
-// log.warn("PayChargeStatusSendConsumer 未知业务类型 inData: {}, taskNo: {}",
|
|
|
-// taskInfo.getInData(), taskInfo.getTaskNo());
|
|
|
-// }
|
|
|
-// } catch (Exception e) {
|
|
|
-// log.error("PayChargeStatusSendConsumer 处理单条记录失败, taskNo: {}, 错误: {}",
|
|
|
-// taskInfo.getTaskNo(), e.getMessage(), e);
|
|
|
-// // 继续处理其他记录,不因为单条记录失败而中断
|
|
|
-// }
|
|
|
-// }
|
|
|
-// } else {
|
|
|
-// log.warn("PayChargeStatusSendConsumer taskList为空");
|
|
|
-// }
|
|
|
-// }else if("2".equals(message.getEventType()+"")){
|
|
|
-// if(StringUtils.isNotBlank(message.getTaskList())){
|
|
|
-// // 使用健壮的解析方法
|
|
|
-// PayChargeEventMessage.TaskInfo[] taskInfos = parseTaskList(message.getTaskList());
|
|
|
-//
|
|
|
-// if (taskInfos.length == 0) {
|
|
|
-// log.warn("PayChargeStatusSendConsumer 解析到空的taskList数据");
|
|
|
-// return;
|
|
|
-// }
|
|
|
-//
|
|
|
-// log.info("PayChargeStatusSendConsumer 成功解析到{}条任务记录", taskInfos.length);
|
|
|
-//
|
|
|
-// for (PayChargeEventMessage.TaskInfo taskInfo : taskInfos) {
|
|
|
-// try {
|
|
|
-// if("4".equals(taskInfo.getInData())){
|
|
|
-// //承压
|
|
|
-// log.info("PayChargeStatusSendConsumer 承压取消回调 {} - 金额: {}",
|
|
|
-// taskInfo.getTaskNo(), taskInfo.getChargeAmount());
|
|
|
-// UpdatePayStatusDTO payInfo = new UpdatePayStatusDTO();
|
|
|
-// payInfo.setOrderNo(taskInfo.getTaskNo());
|
|
|
-// payInfo.setChargeTime(message.getChargeTime());
|
|
|
-// payInfo.setChargeAmount(taskInfo.getChargeAmount());
|
|
|
-// payInfo.setIsCancel(true);
|
|
|
-//
|
|
|
-// taskOrderApi.updatePayStatus(payInfo);
|
|
|
-// } else if("5".equals(taskInfo.getInData())){
|
|
|
-// // 实验室
|
|
|
-// log.info("PayChargeStatusSendConsumer 实验室支付取消回调 {} - 金额: {}",
|
|
|
-// taskInfo.getTaskNo(), taskInfo.getChargeAmount());
|
|
|
-// } else {
|
|
|
-// log.warn("PayChargeStatusSendConsumer 未知业务类型 inData: {}, taskNo: {}",
|
|
|
-// taskInfo.getInData(), taskInfo.getTaskNo());
|
|
|
-// }
|
|
|
-// } catch (Exception e) {
|
|
|
-// log.error("PayChargeStatusSendConsumer 处理单条记录失败, taskNo: {}, 错误: {}",
|
|
|
-// taskInfo.getTaskNo(), e.getMessage(), e);
|
|
|
-// // 继续处理其他记录,不因为单条记录失败而中断
|
|
|
-// }
|
|
|
-// }
|
|
|
-// } else {
|
|
|
-// log.warn("PayChargeStatusSendConsumer taskList为空");
|
|
|
-// }
|
|
|
-// } else {
|
|
|
-// log.info("PayChargeStatusSendConsumer 忽略事件类型: {}", message.getEventType());
|
|
|
-// }
|
|
|
-//
|
|
|
-// }
|
|
|
-//
|
|
|
-//
|
|
|
-//}
|
|
|
+package cn.start.tz.module.pressure2.mq.consumer;
|
|
|
+
|
|
|
+import cn.start.tz.module.pressure2.api.taskorder.BoilerTaskOrderApi;
|
|
|
+import cn.start.tz.module.pressure2.api.taskorder.dto.UpdatePayStatusDTO;
|
|
|
+import cn.start.tz.module.pressure2.mq.message.PayChargeEventMessage;
|
|
|
+import com.fasterxml.jackson.databind.JsonNode;
|
|
|
+import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
+import com.fasterxml.jackson.databind.node.ArrayNode;
|
|
|
+import jakarta.annotation.Resource;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
|
+import org.apache.rocketmq.spring.core.RocketMQListener;
|
|
|
+import org.springframework.context.event.EventListener;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+@Component
|
|
|
+@RocketMQMessageListener( // 重点:添加 @RocketMQMessageListener 注解,声明消费的 topic
|
|
|
+ topic = "js_pay_service_charge_status",
|
|
|
+ consumerGroup = "js_pay_service_charge_status" + "_pressure2_" + "${spring.profiles.active}"
|
|
|
+)
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+public class PayChargeStatusSendConsumer implements RocketMQListener<PayChargeEventMessage> {
|
|
|
+
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private BoilerTaskOrderApi taskOrderApi;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private ObjectMapper objectMapper;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 健壮的JSON解析方法,处理格式问题
|
|
|
+ */
|
|
|
+ private PayChargeEventMessage.TaskInfo[] parseTaskList(String taskListJson) {
|
|
|
+ if (StringUtils.isBlank(taskListJson)) {
|
|
|
+ return new PayChargeEventMessage.TaskInfo[0];
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ // 首先尝试直接解析
|
|
|
+ return objectMapper.readValue(taskListJson, PayChargeEventMessage.TaskInfo[].class);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.warn("直接JSON解析失败,尝试修复格式,错误: {}", e.getMessage());
|
|
|
+
|
|
|
+ try {
|
|
|
+ // 尝试解析为JsonNode以便修复
|
|
|
+ JsonNode rootNode = objectMapper.readTree(taskListJson);
|
|
|
+
|
|
|
+ if (rootNode.isArray()) {
|
|
|
+ // 如果是数组,直接转换
|
|
|
+ return objectMapper.convertValue(rootNode, PayChargeEventMessage.TaskInfo[].class);
|
|
|
+ } else if (rootNode.isObject()) {
|
|
|
+ // 如果是单个对象,包装成数组
|
|
|
+ ArrayNode arrayNode = objectMapper.createArrayNode();
|
|
|
+ arrayNode.add(rootNode);
|
|
|
+ return objectMapper.convertValue(arrayNode, PayChargeEventMessage.TaskInfo[].class);
|
|
|
+ } else {
|
|
|
+ log.error("无法解析的JSON格式,既不是数组也不是对象: {}", taskListJson);
|
|
|
+ return new PayChargeEventMessage.TaskInfo[0];
|
|
|
+ }
|
|
|
+ } catch (Exception e2) {
|
|
|
+ log.error("JSON修复失败,原始数据: {}", taskListJson, e2);
|
|
|
+ return new PayChargeEventMessage.TaskInfo[0];
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @EventListener
|
|
|
+ // @Async // Spring Event 默认在 Producer 发送的线程,通过 @Async 实现异步
|
|
|
+ @Override
|
|
|
+ public void onMessage(PayChargeEventMessage message) {
|
|
|
+ log.info("PayChargeStatusSendConsumer [onMessage][消息内容({})]", message);
|
|
|
+
|
|
|
+ //费用事件:1.缴费 2.解除认领
|
|
|
+ if("1".equals(message.getEventType()+"")){
|
|
|
+ if(StringUtils.isNotBlank(message.getTaskList())){
|
|
|
+ // 使用健壮的解析方法
|
|
|
+ PayChargeEventMessage.TaskInfo[] taskInfos = parseTaskList(message.getTaskList());
|
|
|
+
|
|
|
+ if (taskInfos.length == 0) {
|
|
|
+ log.warn("PayChargeStatusSendConsumer 解析到空的taskList数据");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("PayChargeStatusSendConsumer 成功解析到{}条任务记录", taskInfos.length);
|
|
|
+
|
|
|
+ for (PayChargeEventMessage.TaskInfo taskInfo : taskInfos) {
|
|
|
+ try {
|
|
|
+ if("4".equals(taskInfo.getInData())){
|
|
|
+ //承压
|
|
|
+ log.info("PayChargeStatusSendConsumer 承压支付回调 {} - 金额: {}",
|
|
|
+ taskInfo.getTaskNo(), taskInfo.getChargeAmount());
|
|
|
+ UpdatePayStatusDTO payInfo = new UpdatePayStatusDTO();
|
|
|
+ payInfo.setOrderNo(taskInfo.getTaskNo());
|
|
|
+ payInfo.setChargeTime(message.getChargeTime());
|
|
|
+ payInfo.setChargeAmount(taskInfo.getChargeAmount());
|
|
|
+
|
|
|
+ taskOrderApi.updatePayStatus(payInfo);
|
|
|
+ } else if("5".equals(taskInfo.getInData())){
|
|
|
+ // 实验室
|
|
|
+ log.info("PayChargeStatusSendConsumer 实验室支付回调 {} - 金额: {}",
|
|
|
+ taskInfo.getTaskNo(), taskInfo.getChargeAmount());
|
|
|
+ } else {
|
|
|
+ log.warn("PayChargeStatusSendConsumer 未知业务类型 inData: {}, taskNo: {}",
|
|
|
+ taskInfo.getInData(), taskInfo.getTaskNo());
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("PayChargeStatusSendConsumer 处理单条记录失败, taskNo: {}, 错误: {}",
|
|
|
+ taskInfo.getTaskNo(), e.getMessage(), e);
|
|
|
+ // 继续处理其他记录,不因为单条记录失败而中断
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.warn("PayChargeStatusSendConsumer taskList为空");
|
|
|
+ }
|
|
|
+ }else if("2".equals(message.getEventType()+"")){
|
|
|
+ if(StringUtils.isNotBlank(message.getTaskList())){
|
|
|
+ // 使用健壮的解析方法
|
|
|
+ PayChargeEventMessage.TaskInfo[] taskInfos = parseTaskList(message.getTaskList());
|
|
|
+
|
|
|
+ if (taskInfos.length == 0) {
|
|
|
+ log.warn("PayChargeStatusSendConsumer 解析到空的taskList数据");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("PayChargeStatusSendConsumer 成功解析到{}条任务记录", taskInfos.length);
|
|
|
+
|
|
|
+ for (PayChargeEventMessage.TaskInfo taskInfo : taskInfos) {
|
|
|
+ try {
|
|
|
+ if("4".equals(taskInfo.getInData())){
|
|
|
+ //承压
|
|
|
+ log.info("PayChargeStatusSendConsumer 承压取消回调 {} - 金额: {}",
|
|
|
+ taskInfo.getTaskNo(), taskInfo.getChargeAmount());
|
|
|
+ UpdatePayStatusDTO payInfo = new UpdatePayStatusDTO();
|
|
|
+ payInfo.setOrderNo(taskInfo.getTaskNo());
|
|
|
+ payInfo.setChargeTime(message.getChargeTime());
|
|
|
+ payInfo.setChargeAmount(taskInfo.getChargeAmount());
|
|
|
+ payInfo.setIsCancel(true);
|
|
|
+
|
|
|
+ taskOrderApi.updatePayStatus(payInfo);
|
|
|
+ } else if("5".equals(taskInfo.getInData())){
|
|
|
+ // 实验室
|
|
|
+ log.info("PayChargeStatusSendConsumer 实验室支付取消回调 {} - 金额: {}",
|
|
|
+ taskInfo.getTaskNo(), taskInfo.getChargeAmount());
|
|
|
+ } else {
|
|
|
+ log.warn("PayChargeStatusSendConsumer 未知业务类型 inData: {}, taskNo: {}",
|
|
|
+ taskInfo.getInData(), taskInfo.getTaskNo());
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("PayChargeStatusSendConsumer 处理单条记录失败, taskNo: {}, 错误: {}",
|
|
|
+ taskInfo.getTaskNo(), e.getMessage(), e);
|
|
|
+ // 继续处理其他记录,不因为单条记录失败而中断
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.warn("PayChargeStatusSendConsumer taskList为空");
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.info("PayChargeStatusSendConsumer 忽略事件类型: {}", message.getEventType());
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+}
|