本文介绍了Akka.Net Streams-缓冲区引发错误时,源停止提取元素的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我一直在为Akka使用。 NET,并在尝试结合使用缓冲区和限制方法时注意到此错误:

I've been playing a little with the streams extension package for Akka.Net and noticed this error at attempting to combine buffer and throttle methods:

using (var system = ActorSystem.Create("test-system"))
using (var materializer = system.Materializer(GetSettings(system)))
{
            int index = 0;
            var sink = Sink.ActorRefWithAck<KeyValue>(
                system.ActorOf<Writer>(), 
                new OnInitMessage(), 
                new OnAcknowledgeMessage(), 
                OnComplete.Instance, 
                exception => new OnError(exception));

            ServiceBusSource
                .Create(client, message =>
                {
                    var json = new StreamReader(message.GetBody<Stream>(), Encoding.UTF8).ReadToEnd();
                    var result = JsonConvert.DeserializeObject<KeyValue>(json);

                    message.Complete();

                    return result;
                })
                .WithLogger(system, entity => $"{entity.Key} => {entity.Value}")
                .Buffer(1, OverflowStrategy.Fail)
                .Throttle(1, TimeSpan.FromSeconds(5), 3, ThrottleMode.Shaping)
                .ToMaterialized(sink, Keep.Right)
                .Run(materializer);

            Console.ReadLine();
}

我正在使用
这些是我要引用的软件包:

I'm using ServiceBusSource from AlpakkaThese are the packages I'm referencing:


  • Akka.Streams:1.3.1

  • Akka.Streams.Azure.ServiceBus:0.1.0

  • WindowsAzure.ServiceBus:4.1。 3

故意使它失败,以便查看但是,在缓冲区策略失败后,流完成并且不再提取任何元素。

I'm intentionally making it fail in order to see how behaves BUT, after failing from buffer's strategy, the stream completes and no more elements are being pulled.

KeyValue.cs

public class KeyValue
{
    public int Id { get; set; }

    public string Key { get; set; }

    public string Value { get; set; }

    public DateTime Produced { get; set; }

    public DateTime Emitted { get; set; }

    public override string ToString()
    {
        return $"[{Produced}] - [{Emitted}] => {Id} {Key}:{Value}";
    }
}

GetSettings 方法:

ActorMaterializerSettings GetSettings(ActorSystem system)
        {
            return ActorMaterializerSettings.Create(system)
                .WithSupervisionStrategy(cause =>
                {
                    system.Log.Error(cause, "Failed");
                    return Directive.Resume;
                });
        }


推荐答案

有几种处理方式流中的错误-大多数错误在中描述:

There are several ways of handling errors inside of a stream - most of them described in docs:


  1. 使用恢复进行错误的后备事件。

  2. 使用 RecoverWithRetries 允许在发生错误时重定向到其他流。

  3. 使用 Restart.WithBackoff 在指数补偿延迟后重建重试流。

  4. 使用 WithSupervisionStrategy -这是一个非常有限的选择,因为它仅适用于明确引用它的阶段(如文档中所述)。

  1. Use Recover to make a fallback event from error.
  2. Use RecoverWithRetries to allow to redirect to a different stream upon error.
  3. Use Restart.WithBackoff to rebuild a retry stream after exponential backoff delay.
  4. Use WithSupervisionStrategy - which is a very limited option, as it works only on stages that refer to it explicitly (as explained in docs).

您的情况是设计使然-当您使用 OverflowStrategy.Fail 表示一旦达到溢出,就会产生一个错误。大部分akka阶段的反应都是在出现故障后立即关闭流。

Your case is by design - when you use OverflowStrategy.Fail it means, that once overflow is reached, an error will be produced. Reaction of most of the akka stages is to close stream immediately upon failure.

这篇关于Akka.Net Streams-缓冲区引发错误时,源停止提取元素的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-23 22:37