前言
asp.net core版本选择2.2,只是因为个人习惯了vs2017,代码以及设计皆可移植到vs2019,用asp.net core 3.0以及以上运行起来
项目类似选择web api,基础设施选择entity frame core + Masstransit + aspectCore
先赘述一下思路业务,中间通讯以及容错/重试交给masstransit,部分流程的解耦交给aspectCore来完成,这部分包括,错误后通过masstransit发布给错误处理的模块,最后落盘到ef core
整个过程,业务处理的层之间可以通过Masstransit的通讯,可采用rpc的模式,也可以同异步的发布订阅通讯
整体设计是可单体可分布式的理念,可以根据项目变化,可以自行配置,拆分成完成单体到分布式的过程,完成业务分解,但是对于业务使用而言,都是一样的,无感知。
刚开始的文章会有很多代码段说明,属于前置知识的铺垫,可能会有些啰嗦,后续文章会忽略掉无关的大片代码段。
业务演示,我们选择传统的银行转账业务以及电商的支付到下单。
整体crud的设计图
包括了业务层和基础设施层的设计
当前文章的设计图如下
整体采用Mq异步的发布订阅,订阅之间也通过发布订阅通知,完成最终一致性
业务实体
public class BaseModel
{
[Key]
public int Id { get; set; }
public DateTime CreateTime { get; set; }
}
public class UserInfo: BaseModel
{ public string NickName { get; set; }
public decimal Money { get; set; }
public DateTime LastOptions { get; set; }
}
public class PayOrder: BaseModel
{
public int SourceId { get; set; }
public int TargetId { get; set; }
public decimal Money { get; set; }
}
配置EF Core
public class TransactionContexts:DbContext
{
public DbSet<PayOrder> PayOrders { get; set; }
public DbSet<UserInfo> UserInfos { get; set; } public TransactionContexts(DbContextOptions<TransactionContexts> options):base(options)
{ }
}
配置数据源偷懒就用InMemory了
services.AddDbContext<TransactionContexts>(build=> {
build.UseInMemoryDatabase("TransactionContexts");
});
实例编写
建立一个交易的Command
internal class PayOrderCommand
{
public int SourceId { get; set; }
public int TargetId { get; set; }
public decimal Money { get; set; }
}
一个交易的Event
internal class PayOrderEvent: PayOrderCommand
{ }
Command对外,Event对内,主要是用于项目区分外部流程和内部流程的区别,代码本身是没有硬编码要求的
构建一个交易的服务,对外公开一个转账的接口
public interface ITransactionService
{
void TransferAccounts(int sourceId, int targetId, decimal money);
}
这个接口完成Publish/Subscribe模式的交易
Publish端
internal class TransactionService: ITransactionService
{
private IBusControl busControl; public TransactionService(IBusControl busControl)
{
this.busControl = busControl;
} public async void TransferAccounts(int sourceId, int targetId, decimal money)
{
await busControl.Publish(
new PayOrderCommand { SourceId = sourceId, TargetId = targetId, Money = money }
);
}
}
很常见的发布一个命令
Subscribe端
internal class TransactionConsumer :
IConsumer<PayOrderCommand>,
IConsumer<PayOrderEvent>
{
private TransactionContexts transactionContexts; public TransactionConsumer(TransactionContexts transactionContexts)
{
this.transactionContexts = transactionContexts;
} public async Task Consume(ConsumeContext<PayOrderCommand> context)
{
var value = context.Message; await Console.Out.WriteLineAsync($"PayOrderCommand Before:{DateTime.Now} SourceId:{value.SourceId} TargetId:{value.TargetId} Money:{value.Money}"); await transactionContexts.PayOrders.AddAsync(new PayOrder
{
Id = ,
SourceId = value.SourceId,
TargetId = value.TargetId,
Money = value.Money
});
await transactionContexts.SaveChangesAsync(); await context.Publish(new PayOrderEvent
{
SourceId = value.SourceId,
TargetId = value.TargetId,
Money = value.Money
}); await Console.Out.WriteLineAsync($"PayOrderCommand After:{DateTime.Now}");
} public async Task Consume(ConsumeContext<PayOrderEvent> context)
{
var value = context.Message; await Console.Out.WriteLineAsync($"PayOrderEvent Before:{DateTime.Now} SourceId:{value.SourceId} TargetId:{value.TargetId} Money:{value.Money}"); var source = transactionContexts.UserInfos.First(user => user.Id == value.SourceId); if (source.Money < value.Money)
throw new Exception(); var target = transactionContexts.UserInfos.First(user => user.Id == value.TargetId); source.Money -= value.Money;
target.Money += value.Money; transactionContexts.UserInfos.Update(source);
transactionContexts.UserInfos.Update(target); await transactionContexts.SaveChangesAsync(); await Console.Out.WriteLineAsync($"PayOrderEvent After:{DateTime.Now}");
}
}
配置依赖注入流程
MassTransit的通讯选择MassTransit.RabbitMQ,这个库,也支持很多MQ,个人图方便就选的rabbitmq,后期要更换,修改一下依赖注入的配置即可
services.AddScoped<TransactionConsumer>(); services.AddMassTransit(c =>
{
c.AddConsumer<TransactionConsumer>();
c.AddBus(serviceProvider =>
{
return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), hst =>
{
hst.Username("guest");
hst.Password("guest");
}); cfg.ReceiveEndpoint("Transaction", config =>
{
config.ConfigureConsumer<TransactionConsumer>(serviceProvider);
});
});
});
});
这样就完成了引入Masstransit做数据通讯部分,Masstransit支持Rpc模式,也支持Publish/Subscribe,后续的文章会混搭Rpc模式和发布订阅的模式,主要根据业务场景的选择做调整
编写演示例子
在Configure里面配置初始化数据
using (var serviceScoped = app.ApplicationServices.CreateScope())
{
var serviceProvider = serviceScoped.ServiceProvider;
var context = serviceProvider.GetRequiredService<TransactionContexts>(); context.UserInfos.Add(new UserInfo
{
Id = ,
NickName = "Form",
CreateTime = DateTime.Now,
LastOptions = DateTime.Now,
Money =
});
context.UserInfos.Add(new UserInfo
{
Id = ,
NickName = "To",
CreateTime = DateTime.Now,
LastOptions = DateTime.Now,
Money =
}); context.SaveChanges();
}
#endregion
给Configure方法增加一个注入的接口
public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime)
{
//。。。
var busControl = app.ApplicationServices.GetRequiredService<IBusControl>(); lifetime.ApplicationStarted.Register(busControl.Start);
lifetime.ApplicationStopped.Register(busControl.Stop);
}
依旧是省事儿的Configure方法里面写的一个管道
app.Run(async (context) =>
{
var serviceProvider = context.RequestServices; var bus = serviceProvider.GetRequiredService<IBusControl>(); await bus.Publish(new PayOrderCommand
{
SourceId = 1,
TargetId = 2,
Money = 2000
}); await context.Response.WriteAsync("Hello World!");
});
后话
写在默认的index管道,会触发一个有意思的BUG,会两次执行,但是主键是唯一的,这样重复执行会抛出异常,提示主键已存在
这个例子是先铺垫一下,很多有意思的实现还没开展
1、某个业务需要执行过程和下游强一致性,要么一起完成,要么当场失败
2、某个业务完成最终一致性,这个过程,如果失败需要重试/限流/熔断
3、某个业务完成最终一致性,失败了则触发补偿
先铺垫一下,后空余时间逐一编写示例演示
现在的代码一点都不优雅,还有很强的面向于具体实现的,后续会逐步高度抽象化,从相对麻烦的代码,变成纯crud拿来主义,业务层 和后续代码的流程无感
打个小广告
如果有技术交流可以加NCC的群 24791014、436035237,我在群里,有任何关于asp.net core/Masstransit的问题或者建议都可以与我交流,非常欢迎
示例代码: