有花时间去研究masstransit的saga,英文水平不过关,始终无法实现上手他的代码编排的业务,遗憾。
本文通过rabbit和sqlserver实现下单,更新库存,更新产品,模拟数据最终一致性。
项目结构如下,reportService可有可无,这里就相当一个链条,只要两节走通了后面可以接龙,本文有用到不省略。流程:orderservice=>eComm=>reportservice 。
下面先看看order的配置,通过控制器新增订单同时发布订单信息到order_exchange交换机,Key是"order.created,这样就把订单推送到了队列,等到库存服务获取订单去更新库存。
// POST api/<OrderController> [HttpPost] public async Task Post([FromBody] OrderDetail orderDetail) { var id = await orderCreator.Create(orderDetail); publisher.Publish(JsonConvert.SerializeObject(new OrderRequest { OrderId = id, ProductId = orderDetail.ProductId, Quantity = orderDetail.Quantity }), "order.created", null); }
更新库存的代码,然后再发送消息告诉order服务,这里有哪个try包裹,如果这里有失败会触发catch,发送减库存失败的消息。order服务消费到这条消息就会执行相应的删除订单操作。代码如下:
using Ecomm.DataAccess; using Ecomm.Models; using Microsoft.Extensions.Hosting; using Newtonsoft.Json; using Plain.RabbitMQ; using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace Ecomm { public class OrderCreatedListener : IHostedService { private readonly IPublisher publisher; private readonly ISubscriber subscriber; private readonly IInventoryUpdator inventoryUpdator; public OrderCreatedListener(IPublisher publisher, ISubscriber subscriber, IInventoryUpdator inventoryUpdator) { this.publisher = publisher; this.subscriber = subscriber; this.inventoryUpdator = inventoryUpdator; } public Task StartAsync(CancellationToken cancellationToken) { subscriber.Subscribe(Subscribe); return Task.CompletedTask; } private bool Subscribe(string message, IDictionary<string, object> header) { var response = JsonConvert.DeserializeObject<OrderRequest>(message); try { inventoryUpdator.Update(response.ProductId, response.Quantity).GetAwaiter().GetResult(); publisher.Publish(JsonConvert.SerializeObject( new InventoryResponse { OrderId = response.OrderId, IsSuccess = true } ), "inventory.response", null); } catch (Exception) { publisher.Publish(JsonConvert.SerializeObject( new InventoryResponse { OrderId = response.OrderId, IsSuccess = false } ), "inventory.response", null); } return true; } public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } } }
using Ecomm.Models; using Microsoft.Extensions.Hosting; using Newtonsoft.Json; using Plain.RabbitMQ; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; namespace OrderService { public class InventoryResponseListener : IHostedService { private readonly ISubscriber subscriber; private readonly IOrderDeletor orderDeletor; public InventoryResponseListener(ISubscriber subscriber, IOrderDeletor orderDeletor) { this.subscriber = subscriber; this.orderDeletor = orderDeletor; } public Task StartAsync(CancellationToken cancellationToken) { subscriber.Subscribe(Subscribe); return Task.CompletedTask; } private bool Subscribe(string message, IDictionary<string, object> header) { var response = JsonConvert.DeserializeObject<InventoryResponse>(message); if (!response.IsSuccess) { orderDeletor.Delete(response.OrderId).GetAwaiter().GetResult(); } return true; } public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } } }
上面的代码是整个服务的核心业务,也很简单就是队列相互通信相互确认操作是否顺利,失败就执行回归操作,而这里我们都会写好对应补偿代码:
using Dapper; using System; using System.Collections.Generic; using System.Data.SqlClient; using System.Linq; using System.Threading.Tasks; namespace OrderService { public class OrderDeletor : IOrderDeletor { private readonly string connectionString; public OrderDeletor(string connectionString) { this.connectionString = connectionString; } public async Task Delete(int orderId) { using var connection = new SqlConnection(connectionString); connection.Open(); using var transaction = connection.BeginTransaction(); try { await connection.ExecuteAsync("DELETE FROM OrderDetail WHERE OrderId = @orderId", new { orderId }, transaction: transaction); await connection.ExecuteAsync("DELETE FROM [Order] WHERE Id = @orderId", new { orderId }, transaction: transaction); transaction.Commit(); } catch { transaction.Rollback(); } } } }
库存服务里有发布产品的接口,这里没有做过多的处理,只是把产品新增放到队列,供后面的ReportService服务获取,该服务拿到后会执行产品数量扣除:
using Microsoft.Extensions.Hosting; using Newtonsoft.Json; using Plain.RabbitMQ; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace ReportService { public class ReportDataCollector : IHostedService { private const int DEFAULT_QUANTITY = 100; private readonly ISubscriber subscriber; private readonly IMemoryReportStorage memoryReportStorage; public ReportDataCollector(ISubscriber subscriber, IMemoryReportStorage memoryReportStorage) { this.subscriber = subscriber; this.memoryReportStorage = memoryReportStorage; } public Task StartAsync(CancellationToken cancellationToken) { subscriber.Subscribe(Subscribe); return Task.CompletedTask; } private bool Subscribe(string message, IDictionary<string, object> header) //private bool ProcessMessage(string message, IDictionary<string, object> header) { if (message.Contains("Product")) { var product = JsonConvert.DeserializeObject<Product>(message); if (memoryReportStorage.Get().Any(r => r.ProductName == product.ProductName)) { return true; } else { memoryReportStorage.Add(new Report { ProductName = product.ProductName, Count = DEFAULT_QUANTITY }); } } else { var order = JsonConvert.DeserializeObject<Order>(message); if(memoryReportStorage.Get().Any(r => r.ProductName == order.Name)) { memoryReportStorage.Get().First(r => r.ProductName == order.Name).Count -= order.Quantity; } else { memoryReportStorage.Add(new Report { ProductName = order.Name, Count = DEFAULT_QUANTITY - order.Quantity }); } } return true; } public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } } }
到这里整个流程大概如此。只要理清楚了订单和库存更新这里的业务,后面差不多一样,可以无限递归。代码文末有链接供下载。
这里有一个地方的代码如下,新增库存的时候同时发布消息。假如新增完订单后面崩掉了,这里是个原子操作最佳。
[HttpPost] public async Task Post([FromBody] OrderDetail orderDetail) { var id = await orderCreator.Create(orderDetail); publisher.Publish(JsonConvert.SerializeObject(new OrderRequest { OrderId = id, ProductId = orderDetail.ProductId, Quantity = orderDetail.Quantity }), "order.created", null); }
很遗憾masstransit的saga还没有整明白,那就上cap,完成业务一致性。加了点cap代码因为之前是dapper,所以加了dbcontext和cap相关代码有点小乱。核心代码如下:
using DotNetCore.CAP; using MediatR; using OrderService.Command; using System.Threading; using Ecomm.Models; using System.Collections.Generic; namespace OrderService.Handler { public class InsertOrderDetailHandler : IRequestHandler<InsertOrderDetailCommand, InsertOrderDetailModel> { private readonly OrderDbContext context; private readonly ICapPublisher cap; public InsertOrderDetailHandler(OrderDbContext context, ICapPublisher cap) { this.context = context; this.cap = cap; } public async System.Threading.Tasks.Task<InsertOrderDetailModel> Handle(InsertOrderDetailCommand request, CancellationToken cancellationToken) { using(var trans =context.Database.BeginTransaction(cap)) { var order = context.Orders.Add(new Order { UpdatedTime = System.DateTime.Today, UserId = request.UserId, UserName = request.UserName }); var orderDetail = context.OrderDetails.Add(new OrderDetail { OrderId = order.Entity.Id, ProductId = request.ProductId, Quantity = request.Quantity, ProductName = request.ProductName, }); context.SaveChanges(); cap.Publish<OrderRequest>("order.created", new OrderRequest { OrderId = order.Entity.Id, ProductId = orderDetail.Entity.ProductId, Quantity = orderDetail.Entity.Quantity }, new Dictionary<string,string>()) ; trans.Commit(); return new InsertOrderDetailModel { OrderDetailid = orderDetail.Entity.Id, OrderId = order.Entity.Id, Success = true }; } } } }
到这里差不多要结束了,这里的代码都可以调试运行的。因为加了cap,order服务有两套rabbitmq的配置,有冗余,而且有点坑。调试的时候注意,Plain.RabbitMQ支持的交换机不是持久化的,而cap是持久化的,所以有点不兼容。第一次运行可以先确保Plain.RabbitMQ正常,再删掉交换机,cap跑起来了再建持久化交换机,这样cap消息就会被rabbitmq接收,后面就会被库存服务消费。因为我这里cap不会自动绑定队列,Plain.RabbitMQ是可以的。所以需要新建交换机后再绑定队列。而且这里队列以Plain.RabbitMQ生成的名字来绑定。要不然又可能会调试踩坑无法出坑。 用cap不注意你连消息队列都看不到,看到了队列也看不到消费数据,这点不知道是我不会还是cap有什么难的配置。结束。。。
上例项目demo:
liuzhixin405/SimpleOrders_Next (github.com)
超简单微服务demo