前言

根据业务处理部分,单体马上就能得知错误与否,快速做出处理,而分布式系统,会因为各种原因,无法如同单体一样立刻处理,所以这个时候需要 处理异常 的,做 补偿转移人工干预

当然也可以直接在消费端做重试/限流和熔断,但是个人理解,不建议,处理失败的转移到低优先顺序的队列,由专门处理失败消费的部分来处理问题,在实际操作中,可以是单独的服务器,不影响业务流程线,处理失败则发送消息,人为干预。


消费/执行错误的处理流程

基于asp.net core 从零搭建自己的业务框架(三)-LMLPHP

无论的消费订阅还是Rpc远程调用,一旦处理失败,都应当有 转移错误补偿以及人工干预 的异常处理

Masstransit默认是有重试的,所以可以根据获取到的重试次数和错误信息,选择是 转移 还是直接 人工干预补偿则一般应用于支付相关服务,失败则立刻退款/人工处理,这种失败了,重试还是失败


实例编写

基于发布订阅的失败处理

        private async Task Execute(ConsumeContext context, ExceptionInfo exceptionInfo)
{
await Console.Out.WriteLineAsync($"Type:{exceptionInfo.ExceptionType} Message:{exceptionInfo.Message}"); if (exceptionInfo.InnerException != null)
{
await Execute(context, exceptionInfo.InnerException);
}
} public async Task Consume(ConsumeContext<Fault<PayOrderEvent>> context)
{
var retryCount = context.GetRetryCount(); if (retryCount == )
{
if (EndpointConvention.TryGetDestinationAddress<PayOrderEvent>(out Uri endPointUrl))
{
await context.Forward(endPointUrl);
}
}
else
{
var exceptions = context.Message.Exceptions; foreach (var exceptionInfo in exceptions)
{
await Execute(context, exceptionInfo);
}
}
}

这是一段很简单的业务逻辑,PayOrderEvent 的消息消费失败后,则会进入这个专门处理 PayOrderEvent 消费失败的方法

当重试处理第三次,获取错误信息,尝试处理错误信息,当重试第五次则转移队列

基于Rpc的消息异常处理

以上这段代码,只适用于发布订阅,Masstransit的Rpc模式则 无法捕获异常,只能在代码段上做try catch处理,或者引入AspectCore做捕获到异常

基于Rpc模式的异常捕获

基于代码而言try catch足以,但是出于项目管理的角度,应该采用AOP(Aspect-Oriented Programming)做代码的职责分离,预留出足够多的通用层,这里AOP部分采用Lemon的作品 AspectCore

AOP特性部分实现如下

    public class RpcConsumerAttribute : AbstractInterceptorAttribute
{
public override async Task Invoke(AspectContext context, AspectDelegate next)
{
try
{
await next(context);
}
catch
{
var _arg = context.Parameters[]; if (_arg is ConsumeContext consume)
{
var consumeType = consume.GetType().GetGenericArguments()[];
var property = consume.GetType().GetProperty("Message").GetReflector();
var arg = property.GetValue(consume);
var argType = arg.GetType(); var faultType = typeof(FaultMessage<>).MakeGenericType(argType);
var constructor = faultType.GetConstructor(new[] { argType });
var constructorInfo = constructor.GetReflector();
var instance = constructorInfo.Invoke(arg); var busControl = context.ServiceProvider.GetRequiredService<IBusControl>();
await busControl.Publish(instance);
}
}
}
}

简单的try catch,在异常处理部分使用反射获取出未正常执行的实体,然后发布 FaultMessage<Message> ,其中 Message 是未正常处理的实体类型

其中反射里携带的 GetReflector 取自于Lemon的 AspectCore.Extensions.Reflection ,原理是构建Emit代码,构建委托,然后换成下来,从单纯的反射,变成调用委托执行反射代码,避免的传统的反射调用的嵌套反射执行

    public class FaultMessage<TEntity>
where TEntity:class
{
public TEntity Entity { get; } public FaultMessage(TEntity entity)
{
Entity = entity;
}
}

处理业务部分代码如下

        [RpcConsumer]
public virtual async Task Consume(ConsumeContext<PayOrderEvent> context)
{
//... 业务代码段不再赘述
} public Task Consume(ConsumeContext<FaultMessage<PayOrderEvent>> context)
{
var message = context.Message;
var sourceMessage = message.Entity; return Task.CompletedTask;
}

调用部分遵循Rpc调用即可

                var entity = new PayOrderEvent
{
SourceId = ,
TargetId = ,
Money =
};
var response = await busControl.Request<PayOrderEvent, PayOrderResponse>(entity);

消费 PayOrderEvent 的 部分,只要正常的Ef Core原封不动的写数据即可,执行两次,造成 主键冲突,则被 RpcConsumerAttribute 捕获异常,触发发布到 FaultMessage<PayOrderEvent> 的消费端即可


后话

引入 AspectCore 的这部分,踩了一些雷,感谢社区的 茶姨以及柠檬的帮助 深感个人实力非常薄弱,很多东西未能深入理解,学到用时方恨少

打个小广告

如果有技术交流可以加NCC的群 24791014、436035237,我在群里,有任何关于asp.net core/Masstransit的问题或者建议都可以与我交流,非常欢迎

示例代码:

https://github.com/htrlq/Crud.Sample

05-26 15:19