RocketMq-半消息(十)
概念:
半消息: 在原有队列消息执行后的逻辑,如果后面的本地逻辑出错,则不发送该消息,如果通过则告知rocketmq发送
操作步骤 :
1.(生产者)发送-【半消息】
2.(生产者)本地监听-【半消息】处理结果
3.(消费者)处理-【半消息】
1.(生产者)发送-【半消息】
10年的宣化网站建设经验,针对设计、前端、开发、售后、文案、推广等六对一服务,响应快,48小时及时工作处理。成都全网营销推广的优势是能够根据用户设备显示端的尺寸不同,自动调整宣化建站的显示方式,使网站能够适用不同显示终端,在浏览器中调整网站的宽度,无论在任何一种浏览器上浏览网站,都能展现优雅布局与设计,从而大程度地提升浏览体验。成都创新互联公司从事“宣化网站设计”,“宣化网站推广”以来,每个客户项目都认真落实执行。
// 消息体
@Data
@Builder
@ToString
public class UserMoneyParams {
int userId;
String act;
double money;
String info;
String infoParams;
}
// 发送消息
// 发送-队列半消息: rocketMQ
@RequestMapping("rocketMQHalf")
public ApiResult rocketMQHalf() {
int orderId = 2;
double money = 10;
// 用户余额变更-参数体
UserMoneyParams userMoneyParams = UserMoneyParams.builder()
.act("pay-order")
.userId(orderId)
.money(money)
.build();
// 用户数据变更-参数
UserOrder userOrder = this.userOrderMapper.selectByPrimaryKey(1);
log.info("发送前参数: "+userMoneyParams.toString());
rocketMQTemplate.sendMessageInTransaction(
// 半消息-分组
"tsca-group-half",
// 半消息-topic
"member-change-money-half-topic",
// 半消息-数据体
MessageBuilder
.withPayload(userMoneyParams)
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID())
.build(),
userOrder
);
return ApiResult.success("发送队列-半消息");
}
2.(生产者)本地监听-【半消息】处理结果
@RocketMQTransactionListener(txProducerGroup = "tsca-group-half")
@RequiredArgsConstructor
@Slf4j
public class UserMoneyHalfListener implements RocketMQLocalTransactionListener {
@Autowired
redisUtil redisUtil;
@Autowired
UserOrderService userOrderService;
// 生产者-消息处理完毕,继续执行本地方法(含事务)
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
try {
Object userMoneyParams=message.getPayload();
log.info("消息-args:"+arg);
// 消息主体加密无法获取
log.info("消息-主体:"+ JSON.toJSONString(userMoneyParams));
log.info("消息-主体-头部:"+message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID));
log.info("半消息-本地-处理完成");
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.warn("半消息-本地-发生异常,回滚: "+e.getMessage());
return RocketMQLocalTransactionState.ROLLBACK;
}
}
// 生产者-消息处理超时
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
// 查询消息是否已经处理
String messageID = String.valueOf(message.getHeaders().get("tsca-half-message-id"));
Object messageData = this.redisUtil.getValue(messageID, String.class);
if (messageData != null && messageData.equals("ok")) {
// 超时且消息已经处理完毕
log.info("半消息-本地消息超时-且已经处理完毕");
return RocketMQLocalTransactionState.COMMIT;
} else {
log.info("半消息-本地消息超时-且未处理完毕");
// 超时且消息未处理完毕
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
3.(消费者)处理-【半消息】
@Service
@RocketMQMessageListener(consumerGroup = "tsca-group-half", topic = "member-change-money-half-topic")
@Slf4j
public class UserMoneyHalfListener implements RocketMQListener {
// @Autowired
// UserMoneyService memberOrderService;
@Override
public void onMessage(UserMoneyParams memberMoneyMessage) {
log.info("收到-用户余额变动-半消息");
try {
} catch (Exception e) {
log.info("更改余额错误: "+e.getMessage());
e.printStackTrace();
}
log.info(JSON.toJSONString(memberMoneyMessage));
}
}
名称栏目:RocketMq-半消息(十)
文章网址:http://scyanting.com/article/jpjoep.html