lms框架的分布式事务解决方案采用的TCC事务模型。在开发过程中参考和借鉴了hmily。使用AOP的编程思想,在rpc通信过程中通过拦截器的方式对全局事务或是分支事务进行管理和协调。
本文通过lms.samples 订单接口给大家介绍lms框架分布式事务的基本使用。
lms分布式事务的使用
在lms框架中,在应用服务接口通过[Transaction]
特性标识该接口是一个分布式事务接口(应用接口层需要安装包Silky.Lms.Transaction
)。应用服务接口的实现必须需要通过 [TccTransaction(ConfirmMethod = "ConfirmMethod", CancelMethod = "CancelMethod")]
特性指定Confirm阶段和Cancel阶段的方法(需要再应用层安装包Silky.Lms.Transaction.Tcc
)。
在一个分布式事务处理过程中,会存在如下两种角色的事务。
事务角色
- 全局事务
在Lms框架中,第一个执行的事务被认为是全局事务(事务角色为TransactionRole.Start
)。换句话说,在一个业务处理过程中,执行的第一个被标识为TccTransaction
(应用接口需要被标识为Transaction
)的方法为全局事务。
当然,全局事务也作为事务的一特殊的事务参与者,在全局事务开始后,作为事务参与者注册到事务上下文中。
- 分支事务
在开始的一个分布式事务中,参与rpc通信,且被特性[Transaction]
标识的应用服务,被认为是分支事务(事务角色为:TransactionRole.Participant
)。
事务的执行
在开启一个全局事务之后,在全局事务的
try
过程中,首先将全局事务作为一个事务参与者添加到事务上下文中。如果遇到一个分支事务,那么首先会调用分支事务的try
方法。如果try
方法执行成功,那么分支事务作为一个事务参与者被注册到事务上下文中,并且分支的事务状态为变更为trying
。如果在全局事务的try方法执行过程中发生异常,那么全局事务的
Cancel
方法和被加入事务上下文且状态为trying
的分支事务参与者的Cancel
方法将会被调用,在Cancel
方法中实现数据回滚。也就是说,全局事务的Cancel
不管try
方法是否执行成功,全局事务的Cancel
方法都会被执行。分支事务只有被加入到事务上下文,且状态为trying
(分支事务已经执行过try
方法),那么分支事务的Cancel
方法才会被执行。全局事务的try方法执行成功,那么全局事务的
Confirm
和各个分支事务的Confirm
方法将会得到执行。换句话说,所有全局事务(事务主分支)以及分支事务的try方法都执行成功,才会依次执行所有事务参与者的
Confirm
方法,如果分布式事务的try
阶段执行失败,那么主分支事务的Cancel
方法一定会被调用;而分支事务看是否有被添加到事务上下文中且已经执行成功try
阶段的方法,只有这样的分支事务才会调用Cancel
方法。如果分支事务存在分支事务的情况下,这种业务场景会相对特殊,这个时候的分支事务相对于它的分支事务就是一个特殊的全局事务。它会在特殊的
try
阶段执行孙子辈的分支事务的try
和confirm
(成功)或是try
和cancel
(失败)。并且会将执行成功与否返回给父分支事务(全局事务)。
分布式事务案例-- lms.samples订单接口
下面,我们通过lms.samples的订单接口来熟悉通过lms框架如何实现分布式事务。
lms.samples 订单接口的业务流程介绍
在上一篇博文通过lms.samples熟悉lms微服务框架的使用,给大家介绍了lms.samples样例项目的基本情况。本文通过大家熟悉的一个订单接口,熟悉lms的分布式事务是如何使用。
下面,给大家梳理一下订单接口的业务流程。
判断和锁定订单产品库存: 在下订单之前需要判断是否存在相应的产品,产品的剩余数量是否足够,如果产品数量足够的话,扣减产品库存,锁定订单的库存数量(分支事务)
创建一个订单记录,订单状态为NoPay(全局事务)
判断用户的账号是否存在,账户余额是否充足,如果账户余额充足的话,则需要锁定订单金额,创建一个账户流水记录。
如果1,2,3都成功,释放产品锁定的订单库存
如果1,2,3都成功,释放账号锁定的金额,修改账号流水记录相关状态
如果1,2,3都成功,修改订单状态为Payed
如果在步骤1就出现异常(例如:产品的库存不足或是rpc通信失败,或是访问数据库出现异常等),库存分支事务(
DeductStockCancel
)和账号分支事务(DeductBalanceCancel
)指定的Cancel
方法都不会被执行。但是全局事务指定的Cancel
方法(OrderCreateCancel
)会被调用如果在步骤2就出现异常(下订单访问数据库出现异常),库存分支事务指定的
Cancel
方法(DeductStockCancel
)以及全局事务指定的Cancel
方法(OrderCreateCancel
)会被调用,账号分支事务指定(DeductBalanceCancel
)的Cancel
方法都不会被执行。如果在步骤3就出现异常(用户的账号余额不足,访问数据库出现异常等),那么库存分支事务(
DeductStockCancel
)和账号分支事务指定(DeductBalanceCancel
)全局事务指定的Cancel
方法(OrderCreateCancel
)都会被调用。
全局事务--订单接口
通过lms分布式事务的使用节点的介绍,我们知道在服务之间的rpc通信调用中,执行的第一个被标识为Transaction
的应用方法即为全局事务(即:事务的开始)。
首先, 我们需要在订单应用接口中通过[Transaction]
来标识这是一个分布式事务的应用接口。
[Transaction]
Task<GetOrderOutput> Create(CreateOrderInput input);
其次,在应用接口的实现通过[TccTransaction]
特性指定ConfirmMethod
方法和CancelMethod
。
- 指定的
ConfirmMethod
和CancelMethod
必须为public
类型,但是不需要在应用接口中声明。 - 全局事务的
ConfirmMethod
和CancelMethod
必定有一个会被执行,如果try方法(Create
)执行成功,那么执行ConfirmMethod
方法,执行失败,那么则会执行CancelMethod
。 - 可以将
try
、confirm
、cancel
阶段的方法放到领域服务中实现。 - 全局事务可以通过
RpcContext
的Attachments
向分支事务或是confirm
、cancel
阶段的方法传递Attachment参数。但是分支事务不能够通过RpcContext
的Attachments
向全局事务传递Attachment参数。
/// <summary>
/// try阶段的方法
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
[TccTransaction(ConfirmMethod = "OrderCreateConfirm", CancelMethod = "OrderCreateCancel")]
public async Task<GetOrderOutput> Create(CreateOrderInput input)
{
return await _orderDomainService.Create(input); //具体的业务放到领域层实现
}
// confirm阶段的方法
public async Task<GetOrderOutput> OrderCreateConfirm(CreateOrderInput input)
{
var orderId = RpcContext.GetContext().GetAttachment("orderId");
var order = await _orderDomainService.GetById(orderId.To<long>());
order.Status = OrderStatus.Payed;
order = await _orderDomainService.Update(order);
return order.MapTo<GetOrderOutput>();
}
// cancel阶段的方法
public async Task OrderCreateCancel(CreateOrderInput input)
{
var orderId = RpcContext.GetContext().GetAttachment("orderId");
// 如果不为空证明已经创建了订单
if (orderId != null)
{
// 是否保留订单可以根据具体的业务来确定。
// await _orderDomainService.Delete(orderId.To<long>());
var order = await _orderDomainService.GetById(orderId.To<long>());
order.Status = OrderStatus.UnPay;
await _orderDomainService.Update(order);
}
}
下订单的具体业务(订单try阶段的实现)
public async Task<GetOrderOutput> Create(CreateOrderInput input)
{
// 扣减库存
var product = await _productAppService.DeductStock(new DeductStockInput()
{
Quantity = input.Quantity,
ProductId = input.ProductId
}); // rpc调用,DeductStock被特性[Transaction]标记,是一个分支事务
// 创建订单
var order = input.MapTo<Domain.Orders.Order>();
order.Amount = product.UnitPrice * input.Quantity;
order = await Create(order);
RpcContext.GetContext().SetAttachment("orderId", order.Id); //分支事务或是主分支事务的confirm或是cancel阶段可以从RpcContext获取到Attachment参数。
//扣减账户余额
var deductBalanceInput = new DeductBalanceInput()
{OrderId = order.Id, AccountId = input.AccountId, OrderBalance = order.Amount};
var orderBalanceId = await _accountAppService.DeductBalance(deductBalanceInput); // rpc调用,DeductStock被特性[Transaction]标记,是一个分支事务
if (orderBalanceId.HasValue)
{
RpcContext.GetContext().SetAttachment("orderBalanceId", orderBalanceId.Value);//分支事务或是主分支事务的confirm或是cancel阶段可以从RpcContext获取到Attachment参数。
}
return order.MapTo<GetOrderOutput>();
}
分支事务--扣减库存
首先,需要在应用接口层标识这个是一个分布式事务接口。
// 标识这个是一个分布式事务接口
[Transaction]
// 执行成功,清除缓存数据
[RemoveCachingIntercept("GetProductOutput","Product:Id:{0}")]
// 该接口不对集群外部发布
[Governance(ProhibitExtranet = true)]
Task<GetProductOutput> DeductStock(DeductStockInput input);
其次,应用接口的实现指定Confirm
阶段和Cancel
阶段的方法。
[TccTransaction(ConfirmMethod = "DeductStockConfirm", CancelMethod = "DeductStockCancel")]
public async Task<GetProductOutput> DeductStock(DeductStockInput input)
{
var product = await _productDomainService.GetById(input.ProductId);
if (input.Quantity > product.Stock)
{
throw new BusinessException("订单数量超过库存数量,无法完成订单");
}
product.LockStock += input.Quantity;
product.Stock -= input.Quantity;
product = await _productDomainService.Update(product);
return product.MapTo<GetProductOutput>();
}
public async Task<GetProductOutput> DeductStockConfirm(DeductStockInput input)
{
//Confirm阶段的具体业务放在领域层实现
var product = await _productDomainService.DeductStockConfirm(input);
return product.MapTo<GetProductOutput>();
}
public Task DeductStockCancel(DeductStockInput input)
{
//Cancel阶段的具体业务放在领域层实现
return _productDomainService.DeductStockCancel(input);
}
分支事务--扣减账户余额
首先,需要在应用接口层标识这个是一个分布式事务接口。
[Governance(ProhibitExtranet = true)]
[RemoveCachingIntercept("GetAccountOutput","Account:Id:{0}")]
[Transaction]
Task<long?> DeductBalance(DeductBalanceInput input);
其次,应用接口的实现指定Confirm
阶段和Cancel
阶段的方法。
[TccTransaction(ConfirmMethod = "DeductBalanceConfirm", CancelMethod = "DeductBalanceCancel")]
public async Task<long?> DeductBalance(DeductBalanceInput input)
{
var account = await _accountDomainService.GetAccountById(input.AccountId);
if (input.OrderBalance > account.Balance)
{
throw new BusinessException("账号余额不足");
}
return await _accountDomainService.DeductBalance(input, TccMethodType.Try);
}
public Task DeductBalanceConfirm(DeductBalanceInput input)
{
return _accountDomainService.DeductBalance(input, TccMethodType.Confirm);
}
public Task DeductBalanceCancel(DeductBalanceInput input)
{
return _accountDomainService.DeductBalance(input, TccMethodType.Cancel);
}
第三, 领域层的业务实现
public async Task<long?> DeductBalance(DeductBalanceInput input, TccMethodType tccMethodType)
{
var account = await GetAccountById(input.AccountId);
//涉及多张表,所有每一个阶段的都放到一个本地事务中执行
var trans = await _repository.BeginTransactionAsync();
BalanceRecord balanceRecord = null;
switch (tccMethodType)
{
case TccMethodType.Try:
account.Balance -= input.OrderBalance;
account.LockBalance += input.OrderBalance;
balanceRecord = new BalanceRecord()
{
OrderBalance = input.OrderBalance,
OrderId = input.OrderId,
PayStatus = PayStatus.NoPay
};
await _repository.InsertAsync(balanceRecord);
RpcContext.GetContext().SetAttachment("balanceRecordId",balanceRecord.Id);
break;
case TccMethodType.Confirm:
account.LockBalance -= input.OrderBalance;
var balanceRecordId1 = RpcContext.GetContext().GetAttachment("orderBalanceId")?.To<long>();
if (balanceRecordId1.HasValue)
{
balanceRecord = await _repository.GetByIdAsync<BalanceRecord>(balanceRecordId1.Value);
balanceRecord.PayStatus = PayStatus.Payed;
await _repository.UpdateAsync(balanceRecord);
}
break;
case TccMethodType.Cancel:
account.Balance += input.OrderBalance;
account.LockBalance -= input.OrderBalance;
var balanceRecordId2 = RpcContext.GetContext().GetAttachment("orderBalanceId")?.To<long>();
if (balanceRecordId2.HasValue)
{
balanceRecord = await _repository.GetByIdAsync<BalanceRecord>(balanceRecordId2.Value);
balanceRecord.PayStatus = PayStatus.Cancel;
await _repository.UpdateAsync(balanceRecord);
}
break;
}
await _repository.UpdateAsync(account);
await trans.CommitAsync();
// 将受影响的缓存数据移除。
await _accountCache.RemoveAsync($"Account:Name:{account.Name}");
return balanceRecord?.Id;
}
订单接口测试
前提
存在如下账号和产品:
模拟库存不足
请求参数:
{
"accountId": 1,
"productId": 1,
"quantity": 11
}
响应:
{
"data": null,
"status": 1000,
"statusCode": "BusinessError",
"errorMessage": "订单数量超过库存数量,无法完成订单",
"validErrors": null
}
数据库变化
查看数据库,并没有生成订单信息,账户余额和产品库存也没有修改:
测试结果:
库存和账户余额均为变化,也未创建订单信息
达到期望
模拟账号余额不足
请求参数:
{
"accountId": 1,
"productId": 1,
"quantity": 9
}
响应:
{
"data": null,
"status": 1000,
"statusCode": "BusinessError",
"errorMessage": "账号余额不足",
"validErrors": null
}
数据库变化
新增了一个产品订单,订单状态为未支付状态
产品库存和账户余额并未变更
测试结果:
创建了一个新的订单,状态为未支付,用户账号余额,产品订单均未变化。
达到测试期望
正常下订单
{
"accountId": 1,
"productId": 1,
"quantity": 2
}
响应:
{
"data": {
"id": 2,
"accountId": 1,
"productId": 1,
"quantity": 2,
"amount": 20,
"status": 1
},
"status": 200,
"statusCode": "Success",
"errorMessage": null,
"validErrors": null
}
数据库变化
- 创建了一个订单,该订单状态为已支付
- 库存扣减成功
- 账户金额扣减成功,并且创建了一个流水记录
测试结果:
创建了一个新的订单,状态为支付,用户账号余额,产品订单均被扣减,且也创建了交易流水记录。
达到期望结果。
开源地址
github: https://github.com/liuhll/lms