我正在从事件构建一个可观察对象,该事件输出文本行以确认记录,该记录由两条空行分开。例如:

xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx


xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx


xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxx


我希望观察者将输出分成两行,以便订阅者以块的形式获取数据。

如何使用RxJS完成此操作?它似乎没有执行这项工作的功能。

我可以订阅观察者,积累价值并重新发散自己,但我相信有一个我看不到的更优雅的解决方案。

最佳答案

buffersample以及scan的组合也可以使用。基本上,您将输入(即行)累积在缓冲区中。每次可观察对象发出值时,使用sample释放此缓冲区。然后,将其设置为使observable每次检测到两个连续的\n时都会发出一个值。这可以通过scan实现。请注意,这要求您的source$必须是可观察的热点。

因此,您可以完成以下代码,并最终使我们保持更新:

var detect_two_lines = function (acc, new_line)){
  // if new_line and last line of acc are both \n
  // then acc.arr_lines = [], acc.found = true
  // else acc.arr_lines.push(new_line), acc.found = false
}
var identity = function(x){return x}
var sample$ = source$.scan(detect_two_lines, {arr_lines:[], found:false})
                     .pluck('found')
                     .filter(identity);
var results$ = source$.buffer(sample$);

09-26 13:41