我有两个Subject
,一个是带有ID的人对象流,另一个是表示谁与谁成为朋友的ID的外部参照流。这是一些简化的代码。
class Program {
static void Main() {
// Set up observables
var people = new Subject<Person>();
var friendMap = new Subject<FriendMap>();
var friendNotices = from p1 in people
from p2 in people
from pair in friendMap
where p1.Id == pair.Id1 && p2.Id == pair.Id2
select p1.Name + " befriended " + p2.Name;
// Subscribe to log
friendNotices.Subscribe(Console.WriteLine);
// Add people
people.OnNext(new Person(1, "Alice"));
people.OnNext(new Person(2, "Bob"));
// Add relationships
friendMap.OnNext(new FriendMap(1, 2)); // => "Alice befriended Bob"
friendMap.OnNext(new FriendMap(2, 1)); // Doesn't show up!
}
}
class Person {
public Person(int id, string name) {
Id = id;
Name = name;
}
public string Name;
public int Id;
}
class FriendMap {
public FriendMap(int id1, int id2) {
Id1 = id1;
Id2 = id2;
}
public int Id1;
public int Id2;
}
我遇到的问题是有时添加外部参照不会导致
friendNotice
事件。特别是,如果在具有Id1的人之前创建具有Id2的人,这似乎会失败。这是Rx中的错误还是我的代码中的错误?无论哪种方式,我如何才能使它正常工作?
(在我的应用程序中,“成为朋友”是不可交换的-爱丽丝与鲍勃成为好友的关系与鲍勃与艾丽斯成为好友的关系不同,因此在我的案例中“仅交换ID并重试”不是一个可用的解决方案)。
最佳答案
这里的问题是对SelectMany函数的工作方式有误解(这是linq理解from ... from ...
映射到的运算符)。
此构造从源流中获取每个元素,并将其投影到目标流上。它通过在目标流上为源的每个元素创建订阅来为投影提供服务。
让我们使用一些伪代码来检查一下。仅考虑查询的这一部分:
from p1 in people
from p2 in people
目前,让我们将人员流分为人员A和人员B:
from p1 in peopleA
from p2 in peopleB
如果我们现在致电:
peopleA.OnNext(Alice);
实际发生的情况是,将在peopleB上创建一个新的订阅,以服务于Alice在peopleB上的投影。此时,peopleB中没有任何元素-因此不会发生投影。
现在,如果我们调用:
peopleB.OnNext(Tom);
来自Alice-> peopleB的投影将运行,并且将输出(Alice,Tom)。
现在致电:
peopleB.OnNext(Dick);
现在,从Alice-> peopleB的投影继续进行,因此将输出(Alice,Dick)。
现在致电:
peopleA.OnNext(Bob);
现在,新的订阅已在Bob的peopleB上开始-但在peopleB发出之前不会输出任何内容。
现在致电:
peopleB.OnNext(Harry);
通过同时运行Alice和Bob订阅,我们将获得(Alice,Harry)和(Bob,Harry);
一个简单的示例供您尝试:
var source = new Subject<string>();
var source2 = new Subject<string>();
var res = from s in source
from t in source2
select s + " " + t;
res.Subscribe(Console.WriteLine);
source.OnNext("A");
source2.OnNext("1");
source2.OnNext("2");
source.OnNext("B");
source2.OnNext("3");
将给出输出:
A 1
A 2
A 3
B 3
回到“自我SelectMany”。现在,一切开始变得有些棘手。关键部分是正在设置的订阅不会捕获触发其设置的“当前”项目。因此,让我们标记SelectMany A和B的每个部分:
from p1 in people (call this A)
from p2 in people (call this B)
当我们打电话给:
people.OnNext(Alice);
在A上对Alice的订阅将在B上进行-但由于在Alice发出后进行了订阅,所以它不会被她捕获,也不会输出任何内容。
现在我们调用:
people.OnNext(Bob);
对Alice的订阅将在B上看到Bob,这将导致输出(Alice,Bob)。将在B上创建Bob在A上的订阅,但是由于错过了Bob的输出,因此不会再输出任何内容。
这就是您所看到的。发出的唯一组合是(爱丽丝,鲍勃)。
您可以通过在设置新订阅时让人员主题重播其内容来解决此问题。像这样修改样本的第一部分:
// Set up observables
var people = new Subject<Person>();
var peopleR = people.Replay().RefCount();
var friendMap = new Subject<FriendMap>();
var friendNotices = from p1 in peopleR
from p2 in peopleR
from pair in friendMap
where p1.Id == pair.Id1 && p2.Id == pair.Id2
select p1.Name + " befriended " + p2.Name;
...但是如果流长期运行,这是不可取的,因为您正在将所有内容都缓存在内存中。
就您的特定问题而言,我认为您最好采用其他方法-但是在不知道您要实现的目标的情况下很难进行说明,这已经是很长的篇幅!一种方法是对FriendMap条目进行简单的订阅,然后查找朋友的姓名以输出所需的消息。 (我可能在更大的范围内缺少某些东西!)。
希望您能通过自己的方法了解问题所在。
关于c# - 当C#Reactive Framework中的源流和目标流相同时,SelectMany如何工作?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/16978592/