抱歉,如果标题不是很清楚,我想不出更好的…
我接收到一个IObservable<char>形式的用户输入,我想将其转换为IObservable<char[]>,方法是每次用户停止输入超过1秒时对字符进行分组。例如,如果输入如下:

h
e
l
l
o
(pause)
w
o
r
l
d
(pause)
!
(pause)

我希望可以观察到的输出是:
['h', 'e', 'l', 'l', 'o']
['w', 'o', 'r', 'l', 'd']
['!']

我怀疑解决方法相当简单,但我找不到正确的方法…我试图使用BufferGroupByUntilThrottle和其他一些方法,但都没有成功。
任何想法都欢迎!
编辑:我有些东西几乎可以用了:
        _input.Buffer(() => _input.Delay(TimeSpan.FromSeconds(1)))
              .ObserveOnDispatcher()
              .Subscribe(OnCompleteInput);

但每次键入新字符时,都需要重置延迟…

最佳答案

BufferThrottle就足够了,如果你的源是热的。要使其变热,您可以使用.Publish().RefCount()来确保最终只订阅一个源代码。

IObservable<IList<T>> BufferWithInactivity<T>(this IObservable<T> source,
                                              TimeSpan dueTime)
{
    if (source == null) throw new ArgumentNullException("source");
    //defer dueTime checking to Throttle
    var hot = source.Publish().RefCount();
    return hot.Buffer(() => hot.Throttle(dueTime));
}

09-07 00:20