我正在使用EasyNetQ,需要重试原始队列中的失败消息。问题是:即使我成功地增加了TriedCount变量(在每个msg的正文中),当EasyNetQ在异常发生后将消息发布到默认错误队列时,更新的TriedCount也不在msg中!大概是因为它只是将原始消息转储到错误队列,而没有使用方的更改。

更新的TriedCount适用于进程内重新发布,但不适用于通过EasyNetQ Hosepipe或EasyNetQ Management Client重新发布的情况。 Hosepipe生成的文本文件没有更新TriedCount。

public interface IMsgHandler<T> where T: class, IMessageType
{
    Task InvokeMsgCallbackFunc(T msg);
    Func<T, Task> MsgCallbackFunc { get; set; }
    bool IsTryValid(T msg, string refSubscriptionId); // Calls callback only
                                                      // if Retry is valid
}

public interface IMessageType
{
    int MsgTypeId { get; }

    Dictionary<string, TryInfo> MsgTryInfo {get; set;}

}

public class TryInfo
{
    public int TriedCount { get; set; }

    /*Other information regarding msg attempt*/
}

public bool SubscribeAsync<T>(Func<T, Task> eventHandler, string subscriptionId)
{
    IMsgHandler<T> currMsgHandler = new MsgHandler<T>(eventHandler, subscriptionId);
    // Using the msgHandler allows to add a mediator between EasyNetQ and the actual callback function
    // The mediator can transmit the retried msg or choose to ignore it
    return _defaultBus.SubscribeAsync<T>(subscriptionId, currMsgHandler.InvokeMsgCallbackFunc).Queue != null;
}

我还尝试通过Management API(粗糙代码)重新发布自己:
var client = new ManagementClient("http://localhost", "guest", "guest");
var vhost = client.GetVhostAsync("/").Result;
var errQueue = client.GetQueueAsync("EasyNetQ_Default_Error_Queue",
vhost).Result;
var crit = new GetMessagesCriteria(long.MaxValue,
Ackmodes.ack_requeue_true);
var errMsgs = client.GetMessagesFromQueueAsync(errQueue,
crit).Result;

foreach (var errMsg in errMsgs)
{
    var pubRes = client.PublishAsync(client.GetExchangeAsync(errMsg.Exchange, vhost).Result,
                                 new PublishInfo(errMsg.RoutingKey, errMsg.Payload)).Result;
        }

这有效,但仅再次发布到错误队列,而不发布在原始队列上。另外,在此阶段,我不知道如何在邮件正文中添加/更新重试信息。

我已经探索了this库来向邮件添加 header ,但是我没有看到正文中的计数是否未更新, header 中的计数如何/为什么被更新。

有什么方法可以在不依靠Advanced总线的情况下保持TriedCount(在这种情况下,我可能会使用RabbitMQ .Net客户端本身)?

最佳答案

为了以防万一,可以最终实现我自己的IErrorMessageSerializer(而不是实现整个IConsumerErrorStrategy,这似乎是一个过大的选择)。我在主体(而不是 header )中添加重试信息的原因是EasyNetQ不能处理 header 中的复杂类型(无论如何都不是开箱即用的)。因此,使用字典可以为不同的使用者提供更多控制。我在创建总线时注册了自定义序列化器,如下所示:

_defaultBus = RabbitHutch.CreateBus(currentConnString, serviceRegister => serviceRegister.Register<IErrorMessageSerializer>(serviceProvider => new RetryEnabledErrorMessageSerializer<IMessageType>(givenSubscriptionId)));

并像这样实现了Serialize方法:
 public class RetryEnabledErrorMessageSerializer<T> : IErrorMessageSerializer where T : class, IMessageType
 {
        public string Serialize(byte[] messageBody)
        {
             string stringifiedMsgBody = Encoding.UTF8.GetString(messageBody);
             var objectifiedMsgBody = JObject.Parse(stringifiedMsgBody);

             // Add/update RetryInformation into objectifiedMsgBody here
             // I have a dictionary that saves <key:consumerId, val: TryInfoObj>

             return JsonConvert.SerializeObject(objectifiedMsgBody);
        }
  }

实际的重试由一个简单的控制台应用程序/Windows服务通过EasyNetQ Management API定期进行:
            var client = new ManagementClient(AppConfig.BaseAddress, AppConfig.RabbitUsername, AppConfig.RabbitPassword);
            var vhost = client.GetVhostAsync("/").Result;
            var aliveRes = client.IsAliveAsync(vhost).Result;
            var errQueue = client.GetQueueAsync(Constants.EasyNetQErrorQueueName, vhost).Result;
            var crit = new GetMessagesCriteria(long.MaxValue, Ackmodes.ack_requeue_false);
            var errMsgs = client.GetMessagesFromQueueAsync(errQueue, crit).Result;
            foreach (var errMsg in errMsgs)
            {
                var innerMsg = JsonConvert.DeserializeObject<Error>(errMsg.Payload);
                var pubInfo = new PublishInfo(innerMsg.RoutingKey, innerMsg.Message);
                pubInfo.Properties.Add("type", innerMsg.BasicProperties.Type);
                pubInfo.Properties.Add("correlation_id", innerMsg.BasicProperties.CorrelationId);
                pubInfo.Properties.Add("delivery_mode", innerMsg.BasicProperties.DeliveryMode);
                var pubRes = client.PublishAsync(client.GetExchangeAsync(innerMsg.Exchange, vhost).Result,
                     pubInfo).Result;
            }

我的使用者本身就知道是否启用了重试功能,从而赋予了它更多的控制权,因此它可以选择处理重试的msg或忽略它。一旦忽略,则味精显然将不会再尝试;这就是EasyNetQ的工作方式。

关于c# - EasyNetQ-如何重试失败的消息并在邮件正文/标题中保留RetryCount?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/54680549/

10-13 02:24