原始问题
我有一个场景,我有多个IObservable
序列,我想与Merge
结合在一起然后听。但是,如果其中之一产生错误,我不希望它使其他流的所有崩溃,以及重新订阅该序列(这是一个“持久”的序列)。
我通过在合并之前将Retry()
附加到流中来做到这一点,即:
IEnumerable<IObservable<int>> observables = GetObservables();
observables
.Select(o => o.Retry())
.Merge()
.Subscribe(/* Do subscription stuff */);
但是,当我要对此进行测试时,就会出现问题。我要测试的是,如果
IObservable
中的observables
之一生成了OnError
,则其他Subject<int>
仍应能够通过其发送值,并且应该对其进行处理我以为我只用
IObservable
中的两个observables
代表两个OnError(new Exception())
;一个发送OnNext(1)
,另一个发送Subject<int>
。但是,似乎Retry()
将为新订阅重播所有先前的值(实际上是IObservable
),从而使测试陷入无限循环。我试图通过创建手动
Subject
来解决此问题,该代码在第一次订阅时产生错误,随后在空序列中出现,但是感觉很hacky:var i = 0;
var nErrors = 2;
var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create<int>(o => {
i++;
if (i < nErrors) {
return Observable.Throw<int>(new Exception()).Subscribe(o);
} else {
return Observable.Empty<int>().Subscribe(o);
}
});
我是在使用
Retry()
还是以错误的方式考虑Retry()
?还有其他想法吗?您将如何解决这种情况?更新
好的,这是我想要和认为
Subject
可以使用的大理石图。o = message, X = error.
------o---o---X
\
Retry() -> \---o---o---X
\
Retry() -> \...
我的问题可能更多是因为我没有一个很好的股票类来进行前期测试,因为
Subject
希望重播以前的所有错误。更新2
这是一个测试用例,显示了我对
Subject
重播其值的含义。如果我说的是冷漠的话,我是否正确使用了该术语?我知道ojit_code是一种创建可观察的热点的方法,但是这种行为对我来说仍然感觉“冷”。var onNext = false;
var subject = new Subject<int>();
subject.Retry().Subscribe(x => onNext = true);
subject.OnError(new Exception());
subject.OnNext(1);
Assert.That(onNext, Is.True);
最佳答案
根据您更新的要求(您想重试失败的可观察对象,而不仅仅是想忽略它们),我们可以提出一个可行的解决方案。
首先,重要的是要了解冷的可观察对象(在每个订阅上都重新创建)和热的可观察对象(与订阅无关,都存在)之间的区别。您不能Retry()
一个可观察的热点,因为它不知道如何重新创建基础事件。也就是说,如果出现可观察到的热门错误,那么它将永远消失。Subject
会创建一个可观察的热点,这意味着您可以在没有订阅者的情况下调用OnNext
,它将按预期运行。要将热的可观测值转换为冷的可观测值,可以使用Observable.Defer
,它将包含该可观测值的“订阅创建”逻辑。
综上所述,这是修改后的原始代码以实现此目的:
var success = new Subject<int>();
var error = new Subject<int>();
var observables = new List<IObservable<int>> { Observable.Defer(() => {success = new Subject<int>(); return success.AsObservable();}),
Observable.Defer(() => {error = new Subject<int>(); return error.AsObservable();}) };
observables
.Select(o => o.Retry())
.Merge()
.Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done"));
和测试(类似于之前):
success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
error.OnNext(-1);
success.OnCompleted();
error.OnCompleted();
和预期的输出:
1
2
-1
done
当然,您需要根据可观察的基础对该概念进行重大修改。使用主题进行测试与使用主题进行真实测试是不同的。
我还想指出这一评论:
是不正确的-
Subject
不会以这种方式运行。基于Retry
重新创建订阅,并且订阅在某个时候创建错误这一事实,您的代码还有其他方面导致无限循环。原始答案(用于完成)
问题是
Retry()
不会执行您想要的操作。从这里:http://msdn.microsoft.com/en-us/library/ff708141(v=vs.92).aspx
这意味着
Retry
将不断尝试并重新连接到基础可观察对象,直到成功并且不会引发错误。我的理解是,您实际上希望将可观察对象中的异常忽略而不是重试。这将执行您想要的操作:
observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(/* subscription code */);
这将使用
Catch
捕获带有异常的可观察对象,并在该点处将其替换为空的可观察对象。这是使用主题的完整测试:
var success = new Subject<int>();
var error = new Subject<int>();
var observables = new List<IObservable<int>> { success.AsObservable(), error.AsObservable() };
observables
.Select(o => o.Catch((Func<Exception,IObservable<int>>)(e => Observable.Empty<int>())))
.Merge()
.Subscribe(Observer.Create<int>(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("done")));
success.OnNext(1);
error.OnError(new Exception("test"));
success.OnNext(2);
success.OnCompleted();
这产生了,如预期的那样:
1
2
done
关于c# - 非重播热可观察,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/10944121/