我正在从事件构建一个可观察对象,该事件输出文本行以确认记录,该记录由两条空行分开。例如:
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
我希望观察者将输出分成两行,以便订阅者以块的形式获取数据。
如何使用RxJS完成此操作?它似乎没有执行这项工作的功能。
我可以订阅观察者,积累价值并重新发散自己,但我相信有一个我看不到的更优雅的解决方案。
最佳答案
buffer
和sample
以及scan
的组合也可以使用。基本上,您将输入(即行)累积在缓冲区中。每次可观察对象发出值时,使用sample
释放此缓冲区。然后,将其设置为使observable每次检测到两个连续的\n
时都会发出一个值。这可以通过scan
实现。请注意,这要求您的source$
必须是可观察的热点。
因此,您可以完成以下代码,并最终使我们保持更新:
var detect_two_lines = function (acc, new_line)){
// if new_line and last line of acc are both \n
// then acc.arr_lines = [], acc.found = true
// else acc.arr_lines.push(new_line), acc.found = false
}
var identity = function(x){return x}
var sample$ = source$.scan(detect_two_lines, {arr_lines:[], found:false})
.pluck('found')
.filter(identity);
var results$ = source$.buffer(sample$);