我有两个Subject,一个是带有ID的人对象流,另一个是表示谁与谁成为朋友的ID的外部参照流。这是一些简化的代码。

class Program {
    static void Main() {
        // Set up observables
        var people = new Subject<Person>();
        var friendMap = new Subject<FriendMap>();
        var friendNotices = from p1 in people
                            from p2 in people
                            from pair in friendMap
                            where p1.Id == pair.Id1 && p2.Id == pair.Id2
                            select p1.Name + " befriended " + p2.Name;

        // Subscribe to log
        friendNotices.Subscribe(Console.WriteLine);

        // Add people
        people.OnNext(new Person(1, "Alice"));
        people.OnNext(new Person(2, "Bob"));

        // Add relationships
        friendMap.OnNext(new FriendMap(1, 2)); // => "Alice befriended Bob"
        friendMap.OnNext(new FriendMap(2, 1)); // Doesn't show up!
    }
}

class Person {
    public Person(int id, string name) {
        Id = id;
        Name = name;
    }
    public string Name;
    public int Id;
}

class FriendMap {
    public FriendMap(int id1, int id2) {
        Id1 = id1;
        Id2 = id2;
    }
    public int Id1;
    public int Id2;
}


我遇到的问题是有时添加外部参照不会导致friendNotice事件。特别是,如果在具有Id1的人之前创建具有Id2的人,这似乎会失败。

这是Rx中的错误还是我的代码中的错误?无论哪种方式,我如何才能使它正常工作?

(在我的应用程序中,“成为朋友”是不可交换的-爱丽丝与鲍勃成为好友的关系与鲍勃与艾丽斯成为好友的关系不同,因此在我的案例中“仅交换ID并重试”不是一个可用的解决方案)。

最佳答案

这里的问题是对SelectMany函数的工作方式有误解(这是linq理解from ... from ...映射到的运算符)。

此构造从源流中获取每个元素,并将其投影到目标流上。它通过在目标流上为源的每个元素创建订阅来为投影提供服务。

让我们使用一些伪代码来检查一下。仅考虑查询的这一部分:

from p1 in people
from p2 in people


目前,让我们将人员流分为人员A和人员B:

from p1 in peopleA
from p2 in peopleB


如果我们现在致电:

peopleA.OnNext(Alice);


实际发生的情况是,将在peopleB上创建一个新的订阅,以服务于Alice在peopleB上的投影。此时,peopleB中没有任何元素-因此不会发生投影。

现在,如果我们调用:

peopleB.OnNext(Tom);


来自Alice-> peopleB的投影将运行,并且将输出(Alice,Tom)。

现在致电:

peopleB.OnNext(Dick);


现在,从Alice-> peopleB的投影继续进行,因此将输出(Alice,Dick)。

现在致电:

peopleA.OnNext(Bob);


现在,新的订阅已在Bob的peopleB上开始-但在peopleB发出之前不会输出任何内容。

现在致电:

peopleB.OnNext(Harry);


通过同时运行Alice和Bob订阅,我们将获得(Alice,Harry)和(Bob,Harry);

一个简单的示例供您尝试:

var source = new Subject<string>();
var source2 = new Subject<string>();

var res = from s in source
          from t in source2
          select s + " " + t;

res.Subscribe(Console.WriteLine);

source.OnNext("A");
source2.OnNext("1");
source2.OnNext("2");
source.OnNext("B");
source2.OnNext("3");


将给出输出:

A 1
A 2
A 3
B 3


回到“自我SelectMany”。现在,一切开始变得有些棘手。关键部分是正在设置的订阅不会捕获触发其设置的“当前”项目。因此,让我们标记SelectMany A和B的每个部分:

from p1 in people (call this A)
from p2 in people (call this B)


当我们打电话给:

people.OnNext(Alice);


在A上对Alice的订阅将在B上进行-但由于在Alice发出后进行了订阅,所以它不会被她捕获,也不会输出任何内容。

现在我们调用:

people.OnNext(Bob);


对Alice的订阅将在B上看到Bob,这将导致输出(Alice,Bob)。将在B上创建Bob在A上的订阅,但是由于错过了Bob的输出,因此不会再输出任何内容。

这就是您所看到的。发出的唯一组合是(爱丽丝,鲍勃)。

您可以通过在设置新订阅时让人员主题重播其内容来解决此问题。像这样修改样本的第一部分:

    // Set up observables
    var people = new Subject<Person>();
    var peopleR = people.Replay().RefCount();
    var friendMap = new Subject<FriendMap>();
    var friendNotices = from p1 in peopleR
                        from p2 in peopleR
                        from pair in friendMap
                        where p1.Id == pair.Id1 && p2.Id == pair.Id2
                        select p1.Name + " befriended " + p2.Name;


...但是如果流长期运行,这是不可取的,因为您正在将所有内容都缓存在内存中。

就您的特定问题而言,我认为您最好采用其他方法-但是在不知道您要实现的目标的情况下很难进行说明,这已经是很长的篇幅!一种方法是对FriendMap条目进行简单的订阅,然后查找朋友的姓名以输出所需的消息。 (我可能在更大的范围内缺少某些东西!)。

希望您能通过自己的方法了解问题所在。

关于c# - 当C#Reactive Framework中的源流和目标流相同时,SelectMany如何工作?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/16978592/

10-17 00:03