我想要做的声明如下所示:// Checks input source for timeouts, based on the number of elements received// from clock since the last one received from source.// The two selectors are used to generate output elements.public static IObservable<R> TimeoutDetector<T1,T2,R>( this IObservable<T1> source, IObservable<T2> clock, int countForTimeout, Func<R> timedOutSelector, Func<T1, R> okSelector)大理石图形在ascii中很困难,但这里有:source --o---o--o-o----o-------------------o---clock ----x---x---x---x---x---x---x---x---x---output --^---^--^-^----^-----------!-------^---我尝试过寻找可以将Observable和source组合在一起的现有clock函数,但是大多数组合函数都依赖于接收“每个”中的一个(And,),或者它们从“缺失”(Zip)中返回“先前”值,或者它们与我需要的值(CombineLatest,Amb,GroupJoin,,Join,Merge)。 SelectMany看起来很近,但是我不想将源吞吐量限制为时钟速率。所以现在我被困在尝试填补这里的巨大空白:return new AnonymousObservable<R>(observer =>{ //One observer, two observables??});抱歉,“您尝试了什么”部分在这里有点薄弱:假设我已经尝试过了!我不要求完整的实现,只是:是否有内置功能可以帮助我错过的功能?我该如何构建一个基于lambda的观察器,该观察器订阅两个可观察对象? (adsbygoogle = window.adsbygoogle || []).push({}); 最佳答案 我知道您并没有要求完整的实现,但是我认为这是一个解决方案:public static IObservable<TR> TimeoutDetector<T1, T2, TR>( this IObservable<T1> source, IObservable<T2> clock, int countForTimeout, Func<TR> timedOutSelector, Func<T1, TR> okSelector){ return source .Select(i => clock.Take(countForTimeout).LastAsync()) .Switch().Select(_ => timedOutSelector()) .Merge(source.Select(okSelector));}它的工作方式如下-我注意到您的输出是由okSelector投影的源,并与超时事件合并。所以我专注于产生超时事件,因为其余的很容易。这个想法是在每次发射源时创建一个倒数,并在每个时钟脉冲上递减此倒数。如果源发出信号,我们将终止倒计时,否则当倒计时达到0时,我们将产生一个timedOut事件。分解:将每个源元素投影到采用第一个countForTimeout元素的流中-请注意,时钟流必须是“热”可观察的,因为我们在每个countDown事件上都对其进行订阅。时钟流变热是很正常的。如果发生任何事件,我们将超时。Switch将丢弃除最新倒计时流之外的所有内容。使用Select投影到timedOut事件。现在,只需合并源事件。这是我使用的单元测试,其设计非常类似于您的大理石图(nuget rx-testing和nunit,用于编译所需的库): [Test] public void AKindOfTimeoutTest() { var scheduler = new TestScheduler(); var clockStream = scheduler.CreateHotObservable( OnNext(100, Unit.Default), OnNext(200, Unit.Default), OnNext(300, Unit.Default), OnNext(400, Unit.Default), OnNext(500, Unit.Default), OnNext(600, Unit.Default), OnNext(750, Unit.Default), /* make clock funky! */ OnNext(800, Unit.Default), OnNext(900, Unit.Default)); var sourceStream = scheduler.CreateColdObservable( OnNext(50, 1), OnNext(150, 2), OnNext(250, 3), OnNext(275, 4), OnNext(400, 5), OnNext(900, 6)); Func<int> timedOutSelector = () => 0; Func<int, int> okSelector = i => i; var results = scheduler.CreateObserver<int>(); sourceStream.TimeoutDetector(clockStream, 3, timedOutSelector, okSelector) .Subscribe(results); scheduler.Start(); results.Messages.AssertEqual( OnNext(50, 1), OnNext(150, 2), OnNext(250, 3), OnNext(275, 4), OnNext(400, 5), OnNext(750, 0), OnNext(900, 6)); }}尝试回答您的特定问题:问:是否有内置功能可以帮助我错过的功能?答:可能扫描是关键。问:我该如何构建一个基于lambda的观察器,该观察器订阅两个可观察对象?答:不太清楚这是什么意思...有很多组合流的方法,您提到了大多数。关于c# - 使用一个可观察的时钟作为时钟来测试另一个是否超时,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/19829974/ (adsbygoogle = window.adsbygoogle || []).push({});
10-10 09:09