# 二阶段消息
# 概述
本文提出的二阶段消息,可以完美替代现有的事务消息或本地消息表架构。
下面我们以跨行转账作为例子,给大家详解这种新架构。业务场景介绍如下:
我们需要跨行从 A 转给 B 30 元,我们先进行可能失败的转出操作 TransOut,即进行 A 扣减 30 元。如果 A 因余额不足扣减失败,那么转账直接失败,返回错误;如果扣减成功,那么进行下一步转入操作,因为转入操作没有余额不足的问题,可以假定转入操作一定会成功。
# HTTP 接入
二阶段消息完成上述任务的核心代码如下所示:
msg := dtmcli.NewMsg(DtmServer, gid). | |
Add(busi.Busi+"/TransIn", &TransReq{Amount: 30}) | |
err := msg.DoAndSubmitDB(busi.Busi+"/QueryPreparedB", db, func(tx *sql.Tx) error { | |
return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS") | |
}) |
这部分代码中
- 首先生成一个 DTM 的 msg 全局事务,传递 dtm 的服务器地址和全局事务 id
- 给 msg 添加一个分支业务逻辑,这里的业务逻辑为余额转入操作 TransIn,然后带上这个服务需要传递的数据,金额 30 元
- 然后调用 msg 的 DoAndSubmitDB,这个函数保证业务成功执行和 msg 全局事务提交,要么同时成功,要么同时失败
- 第一个参数为回查 URL,详细含义稍后说
- 第二个参数为 sql.DB,是业务访问的数据库对象
- 第三个参数是业务函数,我们这个例子中的业务是给 A 扣减 30 元余额
# 成功流程
DoAndSubmitDB 是如何保证业务成功执行与 msg 提交的原子性的呢?请看如下的时序图:
一般情况下,时序图中的 5 个步骤会正常完成,整个业务按照预期进行,全局事务完成。这里面有个新的内容需要解释一下,就是 msg 的提交是按照两个阶段发起的,第一阶段调用 Prepare,第二阶段调用 Commit,DTM 收到 Prepare 调用后,不会调用分支事务,而是等待后续的 Submit。只有收到了 Submit,开始分支调用,最终完成全局事务。
# 提交后宕机流程
在分布式系统中,各类的宕机和网络异常都是需要考虑的,下面我们来看看可能发生的问题:
首先我们要达到的最重要目标是业务成功执行和 msg 事务是原子操作,因此首先看如果在业务完成提交后,发送 Submit 消息前出现了宕机故障会怎么样,新架构如何保证原子性?
我们来看看这种情况下的时序图:
如果在 dtm 收到 Prepare 调用后,AP 在事务提交前,遇见故障宕机,那么数据库会检测到 AP 的连接断开,自动回滚本地事务。
后续 dtm 轮询取出已经超时的,只 Prepare 但没有 Submit 的全局事务,进行回查。回查服务发现本地事务已回滚,返回结果给 dtm。dtm 收到已回滚的结果后,将全局事务标记为失败,并结束该全局事务。
# .vs 本地消息表
上述的问题也可以采用本地消息表方案(方案详情参考分布式事务最经典的七种解决方案),来保证数据的最终一致性。如果采用本地消息表,需要的工作包括:
- 在本地事务中执行本地业务逻辑,将消息插入消息表并最后提交
- 编写轮询任务,将本地消息表的消息,发给消息队列
- 消费消息,并将消息发给相应的处理服务
# .vs 事务消息
上述的问题也可以采用 RocketMQ 的事务消息方案(方案详情参考分布式事务最经典的七种解决方案),来保证数据的最终一致性。如果采用事务消息,需要的工作包括:
- 发送半消息,开启本地事务,提交本地事务,发送 commit 消息到 RocketMQ
- 消费超时的半消息,对于收到的超时半消息,查询本地数据库,然后进行 Commit/Rollback
- 消费已提交的消息,并将消息发送给处理服务
# 更多的优点
对比于前面讲述的队列方案,二阶段消息还有很多额外的优点:
- 二阶段消息整个暴露的接口,完全与队列无关,只跟实际的业务和服务调用相关,对开发人员更加友好
- 二阶段消息不用考虑消息队列消息堆积及其他故障等问题,因为二阶段消息只依赖 dtm,开发人员可以认为 dtm 与系统中其他一个普通无状态服务一样,只依赖背后的存储 Mysql/Redis。
- 消息队列是异步的,而二阶段消息同时支持异步和同步,默认异步,只需要打开 msg.WaitResult=true,那么可以同步等待下游服务完成
- 二阶段消息还支持同时指定多个下游服务
# 二阶段消息的应用
二阶段消息能够大幅降低消息最终一致性解决方案的难度,已获得广泛的应用,下面是两个典型的应用。
- 秒杀系统:该架构下单机可以轻松扛住上万个订单请求,并且保证库存数量和订单数量准确匹配
- 缓存一致性:通过二阶段消息,可以轻松保证 DB 与缓存的一致性,大大优于队列或订阅 binlog 的方案
# 回查原理剖析
前面的时序图中,以及接口中都出现了回查服务,在二阶段消息中,是复制粘贴代码自动处理的,而 RocketMQ 的事务消息,则是手动处理的。那么自动处理的原理是什么?
要进行回查,首先要在业务数据库实例中,建立一张独立的表,里面保存全局事务 id。在处理业务事务时,会把 gid 写入到这张表。
当我们用 gid 回查时,如果能够在表中查到 gid,那么说明本地事务已提交,这样就可以返回 dtm,告知本地事务已提交。
当我们用 gid 回查时,没有在表中查到 gid,那么说明本地事务未提交,此时可能的结果是两个,一是事务还在进行中,二是事务已回滚。我查了许多关于 RocketMQ 的资料,未找到有效的解决方案。搜到所有解决方案是,如果未查到结果,那么什么都不做,等待下一次回查,如果 2 分钟或者更久的回查,一直都是查不到的,那么认为本地事务已回滚。
上述这种方案有很大的问题:
- 两分钟还查不到 gid,并不能认为本地事务已回滚,极端情况下,可能发生数据库故障(例如进程或磁盘卡住了),持续时间超过 2 分钟,最后数据又提交了,那么这个时候,数据就不是最终一致了,就需要人工介入处理了
- 如果一个本地事务,已经回滚了,但是回查操作,还会在两分钟之内,按照 10s 左右的时间间隔,不断的进行轮询,会给服务器造成不必要的压力
而 dtm 的二阶段消息方案,则彻底解决了这部分的问题。dtm 的二阶段消息工作过程如下:
- 在处理本地事务时,会将 gid 插入到 dtm_barrier.barrier 表中,同时带上插入原因为 committed。该表有一个唯一索引,主要字段为 gid。
- 当进行回查时,二阶段消息的操作不是直接查 gid 是否存在,而是再 insert ignore 一条带有相同 gid 的数据,同时带上插入原因为 rollbacked。此时如果表中如果已有 gid 的记录,那么新的插入操作就会被 ignore,否则数据会被插入。
- 然后再用 gid 查询表中的记录,如果查到记录的 reason 为 committed,那么说明本地事务已提交;如果查到记录的 reason 为 rollbacked,那么说明本地事务已回滚。
那么对比 RocketMQ 回查时的常见方案,二阶段消息是如何区分出进行中和已回滚呢?其中的技巧在于回查时插入的数据,如果回查时,数据库的事务还在进行中,那么插入操作就会被进行中的事务阻塞,因为插入操作会等待事务中持有的锁。如果插入操作正常返回,那么数据库中的本地事务,必定已结束,必然是已提交或已回滚。
# 普通消息
二阶段消息不仅可以替换本地消息表方案,也能够替换普通消息方案。如果直接调用 Submit,那么就与普通消息方案近似,但是提供了更灵活简单的接口。
假设一个这样的应用场景,界面上有一个参加活动的按钮,如果参加活动,会赠与两本电子书的永久权限。这种情况下,可以再这个按钮的服务端中,类似这样处理:
msg := dtmcli.NewMsg(DtmServer, gid). | |
Add(busi.Busi+"/AuthBook", &Req{UID: 1, BookID: 5}). | |
Add(busi.Busi+"/AuthBook", &Req{UID: 1, BookID: 6}) | |
err := msg.Submit() |
这种方式也提供了异步接口,而不用依赖消息消息队列。在微服务的许多场景中,可以替换原有的异步消息架构。