本文介绍了RxJs 将流拆分为多个流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如何根据分组方法将一个永无止境的流拆分为多个结束流?

How can I split a never ending stream into multiple ending streams based on a grouping method?

--a--a-a-a-a-b---b-b--b-c-c---c-c-d-d-d-e...>

进入这些可观察对象

--a--a-a-a-a-|
             b---b-b--b-|
                        c-c---c-c-|
                                  d-d-d-|
                                        e...>

如你所见,a是开头的,在我收到b后,我将不再得到a,所以它应该结束.这就是为什么普通的 groupBy 不好.

As you can see, the a is at the beginning, and after I receive b, i will no longer get a so it should be ended. That's why the normal groupBy is not good.

推荐答案

您可以使用 windowshare 源 Observable.bufferCount(2, 1) 还有一个小技巧:

You can use window and share the source Observable. There's also a little trick with bufferCount(2, 1):

const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();

source
    .bufferCount(2, 1) // delay emission by one item
    .map(arr => arr[0])
    .window(source
        .bufferCount(2, 1) // keep the previous and current item
        .filter(([oldValue, newValue]) => oldValue !== newValue)
    )
    .concatMap(obs => obs.toArray())
    .subscribe(console.log);

这会打印(因为 toArray()):

[ 'a', 'a', 'a', 'a', 'a' ]
[ 'b', 'b', 'b', 'b' ]
[ 'c', 'c', 'c', 'c' ]
[ 'd', 'd', 'd' ]
[ 'e' ]

这个解决方案的问题是订阅source 的顺序.我们需要 window 通知程序在第一个 bufferCount 之前订阅.否则,首先将一个项目进一步推入,然后使用 .filter(([oldValue, newValue]) ...) 检查它是否与前一个不同.

The problem with this solution is the order of subscriptions to source. We need the window notifier to subscribe before the first bufferCount. Otherwise an item is first pushed further and then is checked whether it's different than the previous one with .filter(([oldValue, newValue]) ...).

这意味着需要在 window 之前延迟发射(这是第一个 .bufferCount(2, 1).map(arr => arr[0]).

This means that be need to delay emission by one before window (that's the first .bufferCount(2, 1).map(arr => arr[0]).

或者使用 publish() 自己控制订阅顺序可能更容易:

Or maybe it's easier to control the order of subscriptions myself with publish():

const str = 'a-a-a-a-a-b-b-b-b-c-c-c-c-d-d-d-e';
const source = Observable.from(str.split('-'), Rx.Scheduler.async).share();

const connectable = source.publish();

connectable
    .window(source
        .bufferCount(2, 1) // keep the previous and current item
        .filter(([oldValue, newValue]) => oldValue !== newValue)
    )
    .concatMap(obs => obs.toArray())
    .subscribe(console.log);

connectable.connect();

输出是一样的.

这篇关于RxJs 将流拆分为多个流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-12 23:10