消息队列(RocketMQ):事务消息实现以及最终一致性
最近项目中有一个高并发的更新数据库单表单记录的功能,为了避免数据库压力,采取了更新该记录时候发送mq,进行流量消峰。但是该记录的更新要和其他表更新一起,就会有分布式事务的问题。所以就采用了RocketMQ的事务消息来实现分布式事务的一致性。
RocketMQ
前文中的阿里的rocketMq集成的ons框架配置以及普通、延时、定时消息实现的文章
一、事务消息类型
RockectMQ事务消息提供了X/Open XA的分布式事务的功能,能实现分布式事务的最终一致性。
二、 X/Open XA规范是什么?
X/OPEN
是一个组织,即现在的THE OPEN GROUP,它定义了分布式事务处理模型,其中X/OPEN DTP是该组织定义的一种分布式模型,其包含3中角色:
- AP(应用程序)
- RM(资源管理库)
- TM(事务协调者)
XA规范流程:
XA一种分布式事务的协议,核心思想是2段式提交。
成功流程:
- RM发送任务给各个TM,但是各个TM没有提交(预提交)
- 各个TM都可以处理,则RM通知所有TM执行事务提交
- 各TM都成功后, 则RM可以删除该次分布式事务的所有数据
失败流程:
- RM发送任务给各个TM,但是各个TM没有提交
- 一个TM不能处理,其他TM都可以处理。(预处理阶段)
- RM通知所有TM执行事务回滚
事务补偿机制(RocketMQ则采取这种形式):
- RM发送任务给各个TM,但是各个TM没有提交(预提交)
- 各个TM都可以处理,则RM通知所有TM执行事务提交
- 其中有个TM执行失败,因为RM记录了所有TM执行的数据,则只需要补偿失败的数据。
三、事务消息如何实现最终一致性方案?
RocketMQ是一种最终一致性的分布式事务,就是说它保证的是消息最终一致性,而不是像2PC、3PC、TCC那样强一致分布式事务。
RocketMQ交互流程
举例如下:
用户A在商户B支付了一笔金额C,则用户A账户余额应该减少金额c,而商户B应该增加金额C。由于用户服务和商户服务拆分了,所以这一次支付应该保证用户、商户数据要么全部修改,要不然全部不修改
名词解释:
- 半事务消息:是指暂不能被Consumer消费的消息。Producer 已经把消息成功发送到了 Broker 端,但此消息被标记为暂不能投递状态,处于该种状态下的消息称为半消息。需要 Producer对消息的二次确认后,Consumer才能去消费它。
- 消息回查
由于网络闪段,生产者应用重启等原因。导致 Producer 端一直没有对 Half Message(半消息) 进行 二次确认。这是Brock服务器会定时扫描长期处于半消息的消息,会主动询问 Producer端 该消息的最终状态(Commit或者Rollback),该消息即为 消息回查。
RocketMQ分布式事务流程:
- 用户端后台发送一条更新商户B余额的半事务消息至MQ服务端
- MQ服务端收到则会返回Success至用户端
- 用户端收到Success,则会去执行更新用户端余额的事务
- 执行结束后会根据本地事务执行结果返回状态Commint或rollback给MQ服务器端(如果MQ端长时间没有接收到用户端事务状态,则会去调用用户端检查服务,判断当前用户端事务是否成功)
- MQ端接受Commit则将该消息修改成可投递状态,商户端会去消费,并且去执行对应的修改余额的事务。如果是RollBack则不投递消息,存储三天后删除
为什么是最终一致性?
从流程中可以看到,RocketMQ只关注发送端事务成功发送消息,不关注消息消费方事务状态,因为他采取了补偿机制的XA规范,在消费端有多次尝试消费,如果还未成功则记录异常日志由人工处理。
发送事务消息为什么必须要实现回查 Check 机制?
当发送端发送半事务消息发送完成,但本地事务返回状态为 TransactionStatus.Unknow,或者应用退出导致本地事务未提交任何状态时,从 Broker 的角度看,这条 Half 状态的消息的状态是未知的。因此 Broker 会定期要求发送方 Check 该 Half 状态消息,并上报其最终状态。
Check 被回调时,业务逻辑都需要做些什么?
事务消息的 Check 方法里面,应该写一些检查事务一致性的逻辑。消息队列 RocketMQ 版发送事务消息时需要实现 LocalTransactionChecker 接口,用来处理 Broker 主动发起的本地事务状态回查请求,消息 队列 RocketMQ 版发送事务消息时需要实现 LocalTransactionChecker 接口,用来处理 Broker 主动发起的本地事务状态回查请求。因此在事务消息的 Check 方法中,需要完成两件事情:
- 检查该半事务消息对应的本地事务的状态(committed or rollback)。
- 向 Broker 提交该半事务消息本地事务的状态。
四、代码实现
阿里MQ官方文档
发送端
controller
@Autowired
private AlMqComponent alMqComponent;
@GetMapping("testRocketMq")
public String testRocketMq() throws Exception{
alMqComponent.pushTransactionByTcp(UUID.randomUUID().toString(), JSON.toJSONBytes(pointGovInfo));
}
AlMqComponent
package com.cx.point.component;
import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import com.cx.point.config.AliMqProduceConfig;
import com.cx.point.config.HashUtil;
import com.cx.point.service.TestService;
import com.xz.log.utils.LogTrace;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Properties;
@Component
public class AlMqComponent {
@Autowired
private AliMqProduceConfig aliMqConfig;
@Autowired
private TestService testService;
@Value("${aliyun.mq.tcp.transaction.cx_point.topic}")
private String transactionTopic;
@Value("${aliyun.mq.tcp.transaction.cx_point.tag}")
private String transactionTag;
/**
* 发送mq消息
*/
public String pushTransactionByTcp(String key, byte[] body) throws Exception {
try {
TransactionProducer producer = aliMqConfig.getTransactionProducer();
LogTrace.info("事务消息","事务发送体", JSON.toJSONString(producer));
Message msg = new Message(transactionTopic, transactionTag, body);
msg.setKey(key);
SendResult sendResult = producer.send(msg, (msg1, arg) -> {
// 消息 ID(有可能消息体一样,但消息 ID 不一样,当前消息属于半事务消息,所以消息 ID 在消息队列 RocketMQ 版控制台无法查询)。
String msgId = msg1.getMsgID();
LogTrace.info("事务消息","事务消息执行id",msgId);
// 消息体内容进行 crc32,也可以使用其它的如 MD5。
long crc32Id = HashUtil.crc32Code(msg1.getBody());
// 消息 ID 和 crc32id 主要是用来防止消息重复。
// 如果业务本身是幂等的,可以忽略,否则需要利用 msgId 或 crc32Id 来做幂等。
// 如果要求消息绝对不重复,推荐做法是对消息体使用 crc32 或 MD5 来防止重复消息。
TransactionStatus transactionStatus = TransactionStatus.Unknow;
try {
//上文步骤3,发送端执行发送端事务
boolean isCommit =
testService.execbusinessService();
LogTrace.info("事务消息","事务消息执行结果",String.valueOf(isCommit));
if (isCommit) {
// 本地事务已成功则提交消息。
transactionStatus = TransactionStatus.CommitTransaction;
} else {
// 本地事务已失败则回滚消息。
transactionStatus = TransactionStatus.RollbackTransaction;
}
} catch (Exception e) {
LogTrace.error("事务mq", "Message Id:{}", msgId, e);
}
System.out.println(msg1.getMsgID());
LogTrace.info("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
return transactionStatus;
}, null);
return sendResult.getMessageId();
} catch (Exception e) {
LogTrace.error("pushPlatformCxPointByTcp:" + key, e);
throw e;
}
}
}
aliMqConfig
@Configuration
public class AliMqProduceConfig {
@Value("${rocketmq.nameSrvAddr}")
private String tcpNameSrvAddr;
@Value("${rocketmq.secretKey}")
private String tcpMqSecretKey;
@Value("${rocketmq.accessKey}")
private String tcpMqAccessKey;
@Value("${aliyun.mq.tcp.transaction.group_id}")
private String transGroupId;
@Autowired
TestService checkbusinessService;
/**
* 事务MQ Bean
*
* @return
*/
@Bean(initMethod = "start", destroyMethod = "shutdown")
public TransactionProducer getTransactionProducer() {
//final BusinessService businessService = new BusinessService();
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, transGroupId);
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.AccessKey, tcpMqAccessKey);
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.SecretKey, tcpMqSecretKey);
// 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
properties.put(PropertyKeyConst.NAMESRV_ADDR, tcpNameSrvAddr);
return ONSFactory.createTransactionProducer(properties,
msg -> {
//消息 ID(有可能消息体一样,但消息 ID 不一样,当前消息属于半事务消息,所以消息 ID 在消息队列 RocketMQ 版控制台无法查询)。
String msgId = msg.getMsgID();
LogTrace.info("事务消息","检查事务状态id",msgId);
//消息体内容进行 crc32,也可以使用其它的方法如 MD5。
long crc32Id = HashUtil.crc32Code(msg.getBody());
//消息 ID 和 crc32Id 主要是用来防止消息重复。
//如果业务本身是幂等的,可以忽略,否则需要利用 msgId 或 crc32Id 来做幂等。
//如果要求消息绝对不重复,推荐做法是对消息体使用 crc32 或 MD5 来防止重复消息。
//业务自己的参数对象,这里只是一个示例,需要您根据实际情况来处理。
TransactionStatus transactionStatus = TransactionStatus.Unknow;
try {
//执行步骤4中查询本地事务状态
boolean isCommit = checkbusinessService.checkbusinessService();
LogTrace.info("事务消息","检查事务状态", String.valueOf(isCommit));
if (isCommit) {
//本地事务已成功则提交消息。
transactionStatus = TransactionStatus.CommitTransaction;
} else {
//本地事务已失败则回滚消息。
transactionStatus = TransactionStatus.RollbackTransaction;
}
} catch (Exception e) {
LogTrace.error("事务mq","Message Id:{}", msgId, e);
}
LogTrace.info("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
return transactionStatus;
});
}
}
TestService
/**
* 生产者执行逻辑服务
*/
@Transactional
public boolean execbusinessService(){
pointGovInfoMapper.insert(new PointGovInfo().setGovName("zzzzz").setDelFlag(ConstantKit.NOMAL)
.setAreaId(1)
.setCityId(1)
.setCreateTime(new Date())
.setGovType(1)
.setPhoneNum("govAccountRequest.getPhoneNum()")
.setProvinceId(1)
.setPhoneNum("111")
.setSplitRadio(BigDecimal.ZERO));
return true;
}
/**
* 生产者检查逻辑服务
*/
public boolean checkbusinessService(){
PointGovInfo pointGovInfo=pointGovInfoMapper.selectByEntity(new PointGovInfo().setGovName("xxxxx"));
return pointGovInfo!=null;
}
/**
* 消费者执行逻辑服务
*/
@Transactional
public boolean consumeTransactionService(PointGovInfo pointGovInfo){
pointGovInfoMapper.insert(pointGovInfo);
throw new RuntimeException();
// return true;
}
消费者
AliMqConsumerConfig
/**
* 消费者
*
*/
@Bean(initMethod = "start", destroyMethod = "shutdown")
@ConditionalOnBean(name = {"testService"})
public ConsumerBean getMerchantDearPowderConsumer(TestService testService) {
ConsumerBean consumerBean = new ConsumerBean();
Properties properties = new Properties();
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.AccessKey, tcpMqAccessKey);
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.SecretKey, tcpMqSecretKey);
// 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
properties.put(PropertyKeyConst.NAMESRV_ADDR, tcpNameSrvAddr);
properties.put(PropertyKeyConst.GROUP_ID, transGroupId);
consumerBean.setProperties(properties);
//组装订阅者消息
Map<Subscription, MessageListener> map = new HashMap<Subscription, MessageListener>();
Subscription consumeProduce = new Subscription();
consumeProduce.setTopic(transactionTopic);
map.put(consumeProduce, new MqTransactionListener(testService));
consumerBean.setSubscriptionTable(map);
return consumerBean;
}
MqTransactionListener
public class MqTransactionListener implements MessageListener {
private TestService testService;
public MqTransactionListener(TestService testService){
this.testService=testService;
}
@Override
public Action consume(Message message, ConsumeContext context) {
LogTrace.beginTrace();
try {
String msg = new String(message.getBody(), "UTF-8");
LogTrace.info("接收到事务消息", "msg", JSON.toJSONString(message));
if(StringUtils.isNotBlank(msg)){
PointGovInfo pointGovInfo = JSON.parseObject(msg, PointGovInfo.class);
boolean is=testService.consumeTransactionService(pointGovInfo);
LogTrace.info("接收到事务消息:","success","success");
//消费成功
return Action.CommitMessage;
}
//消费成功
return Action.CommitMessage;
} catch (Exception e){
//事务异常则返回,并且记录日志
LogTrace.error("接收到事务消息-异常",e);
return Action.ReconsumeLater;
}finally {
LogTrace.endTrace();
}
}
}