本文介绍了Rx.Subject 丢失事件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

谁能解释一下这 3 个变体之间的区别?

Can anybody explain what the differents between these 3 variants?

http://jsfiddle.net/8vx2g3fr/2/

  1. 首先作为 expect 工作,所有事件都被处理.
  2. 但是第二个输掉了最后一个事件(3)
  3. 第三次输掉第二场比赛 (2)

你能帮我理解问题是什么以及如何让第三个变体处理所有事件吗?

Could you please help me to understand what the issue is and how to make the third variant process all events?

let bs = new Rx.Subject();
bs
    .subscribe(v=>{
        console.log("in", v);
        if (v % 2 == 0) {
            setTimeout(()=>{
                console.log(" out", v, "->" , v + 1);
                bs.next(v+1);
            }, 0);
        }
    });

bs.next(0);
bs.next(2);

输出:

in 0
in 2
 out 0 -> 1
in 1
 out 2 -> 3
in 3

2

let bs2 = new Rx.Subject();
bs2
    .subscribe(v=>{
        console.log("in", v);
        if (v % 2 == 0) {
            Rx.Observable.interval(0).take(1)
                .map(()=>{console.log(" out", v, "->" , v + 1);return v+1;})
                .subscribe(bs2);
        }
    });

bs2.next(0);
bs2.next(2);

输出:

in 0
in 2
 out 0 -> 1
in 1
 out 2 -> 3

3

let bs3 = new Rx.Subject();
bs3
    .switchMap(v=>{
        console.log("in", v);
        if (v % 2 == 0) {
            return Rx.Observable.interval(0).take(1)
                .map(()=>{console.log(" out", v, "->" , v + 1);return v+1;});
        }

    return Rx.Observable.empty();
    }).subscribe(bs3);

bs3.next(0);
bs3.next(2);

输出:

in 0
in 2
 out 2 -> 3
in 3

推荐答案

这实际上都是预期的行为.

This all is in fact expected behavior.

令人困惑的是,当您多次重复使用 Subject 和诸如 take() 之类的运算符时会发生什么.

The confusing thing is what happens when you reuse Subjectand an operator such as take() multiple times.

Operator take(1) 只需要一个值并发送 complete 通知.由于 .subscribe(bs2),此通知由 Subject 接收.现在是最重要的部分.
Subject 收到 completeerror 通知时,它会将自己标记为已停止.这意味着它永远不会发送任何项目或通知,这是 Rx 中正确和预期的行为.通知 completeerror 必须是最后一次发送.

Operator take(1) takes just a single value and send complete notification. This notification is received by the Subject because of .subscribe(bs2). Now comes the most important part.
When a Subject receives a complete or error notification it marks itself as stopped. This means it will never remit any items or notifications which is correct and expected behavior in Rx. Notifications complete or error have to be the last emissions.

所以 Subject 由第一个 take(1) 完成,它由值 0 触发(bs2.next(0) 调用).

So the Subject is completed by the first take(1) which is triggered by value 0 (the bs2.next(0) call).

然后当值 2 触发 Observable.interval(0).take(1) 的第二次运行时,它被 Subject 接收但它会被自动忽略,因为 Subject 已经被标记为已停止.

Then when value 2 triggers the second run of the Observable.interval(0).take(1) it's received by the Subject but it's automatically ignored because the Subject is already marked as stopped.

第三个演示中的过程完全相同.

The process in you third demo is exactly the same.

你可以在Subject.ts的源码中看到:

You can see it in the source code in Subject.ts:

https://github.com/ReactiveX/rxjs/blob/master/src/Subject.ts#L56

这篇关于Rx.Subject 丢失事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-18 12:24