原始问题

我有一个场景,我有多个IObservable序列,我想与Merge结合在一起然后听。但是,如果其中之一产生错误,我不希望它使其他流的所有崩溃,以及重新订阅该序列(这是一个“持久”的序列)。

我通过在合并之前将Retry()附加到流中来做到这一点,即:

IEnumerable<IObservable<int>> observables = GetObservables();

observables
    .Select(o => o.Retry())
    .Merge()
    .Subscribe(/* Do subscription stuff */);

但是,当我要对此进行测试时,就会出现问题。我要测试的是,如果IObservable中的observables之一生成了OnError,则其他Subject<int>仍应能够通过其发送值,并且应该对其进行处理

我以为我只用IObservable中的两个observables代表两个OnError(new Exception());一个发送OnNext(1),另一个发送Subject<int>。但是,似乎Retry()将为新订阅重播所有先前的值(实际上是IObservable),从而使测试陷入无限循环。

我试图通过创建手动Subject来解决此问题,该代码在第一次订阅时产生错误,随后在空序列中出现,但是感觉很hacky:
var i = 0;
var nErrors = 2;
var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create<int>(o => {
    i++;
    if (i < nErrors) {
        return Observable.Throw<int>(new Exception()).Subscribe(o);
    } else {
        return Observable.Empty<int>().Subscribe(o);
    }
});

我是在使用Retry()还是以错误的方式考虑Retry()?还有其他想法吗?您将如何解决这种情况?

更新

好的,这是我想要和认为Subject可以使用的大理石图。
o = message, X = error.
------o---o---X
               \
     Retry() -> \---o---o---X
                             \
                   Retry() -> \...

我的问题可能更多是因为我没有一个很好的股票类来进行前期测试,因为Subject希望重播以前的所有错误。

更新2

这是一个测试用例,显示了我对Subject重播其值的含义。如果我说的是冷漠的话,我是否正确使用了该术语?我知道ojit_code是一种创建可观察的热点的方法,但是这种行为对我来说仍然感觉“冷”。
var onNext = false;
var subject = new Subject<int>();

subject.Retry().Subscribe(x => onNext = true);
subject.OnError(new Exception());
subject.OnNext(1);

Assert.That(onNext, Is.True);

最佳答案

根据您更新的要求(您想重试失败的可观察对象,而不仅仅是想忽略它们),我们可以提出一个可行的解决方案。

首先,重要的是要了解冷的可观察对象(在每个订阅上都重新创建)和热的可观察对象(与订阅无关,都存在)之间的区别。您不能Retry()一个可观察的热点,因为它不知道如何重新创建基础事件。也就是说,如果出现可观察到的热门错误,那么它将永远消失。
Subject会创建一个可观察的热点,这意味着您可以在没有订阅者的情况下调用OnNext,它将按预期运行。要将热的可观测值转换为冷的可观测值,可以使用Observable.Defer,它将包含该可观测值的“订阅创建”逻辑。

综上所述,这是修改后的原始代码以实现此目的:

var success = new Subject<int>();
var error = new Subject<int>();

var observables = new List<IObservable<int>> { Observable.Defer(() => {success = new Subject<int>(); return success.AsObservable();}),
                                               Observable.Defer(() => {error = new Subject<int>(); return error.AsObservable();}) };

observables
.Select(o => o.Retry())
.Merge()
.Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done"));

和测试(类似于之前):
success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
error.OnNext(-1);
success.OnCompleted();
error.OnCompleted();

和预期的输出:
1
2
-1
done

当然,您需要根据可观察的基础对该概念进行重大修改。使用主题进行测试与使用主题进行真实测试是不同的。

我还想指出这一评论:



是不正确的-Subject不会以这种方式运行。基于Retry重新创建订阅,并且订阅在某个时候创建​​错误这一事实,您的代码还有其他方面导致无限循环。

原始答案(用于完成)

问题是Retry()不会执行您想要的操作。从这里:

http://msdn.microsoft.com/en-us/library/ff708141(v=vs.92).aspx



这意味着Retry将不断尝试并重新连接到基础可观察对象,直到成功并且不会引发错误。

我的理解是,您实际上希望将可观察对象中的异常忽略而不是重试。这将执行您想要的操作:
observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(/* subscription code */);

这将使用Catch捕获带有异常的可观察对象,并在该点处将其替换为空的可观察对象。

这是使用主题的完整测试:
var success = new Subject<int>();
var error = new Subject<int>();

var observables = new List<IObservable<int>> { success.AsObservable(), error.AsObservable() };

observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(Observer.Create<int>(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done")));

success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
success.OnCompleted();

这产生了,如预期的那样:
1
2
done

关于c# - 非重播热可观察,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/10944121/

10-11 17:28