我这里有一个简单的程序,可以显示不同单词中字母的数量。它按预期工作。
static void Main(string[] args) {
var word = new Subject<string>();
var wordPub = word.Publish().RefCount();
var length = word.Select(i => i.Length);
var report =
wordPub
.GroupJoin(length,
s => wordPub,
s => Observable.Empty<int>(),
(w, a) => new { Word = w, Lengths = a })
.SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}"));
word.OnNext("Apple");
word.OnNext("Banana");
word.OnNext("Cat");
word.OnNext("Donkey");
word.OnNext("Elephant");
word.OnNext("Zebra");
Console.ReadLine();
}
输出为:
Apple 5
Banana 6
Cat 3
Donkey 6
Elephant 8
Zebra 5
我使用Publish()。RefCount()是因为“wordpub”两次包含在“report”中。没有它,当发出一个单词时,报告的第一部分将被回调通知,然后报告的另一部分将被通知,将通知加倍。那是会发生的事情。输出最终有11个项目而不是6个项目。至少我认为这是正在发生的事情。我认为在这种情况下使用Publish()。RefCount()可以同时更新报表的两个部分。
但是,如果我将length函数更改为ALSO,则使用这样的已发布源:
var length = wordPub.Select(i => i.Length);
那么输出是这样的:
Apple 5
Apple 6
Banana 6
Cat 3
Banana 3
Cat 6
Donkey 6
Elephant 8
Donkey 8
Elephant 5
Zebra 5
长度函数为什么也不能使用相同的发布源?
最佳答案
这是一个巨大的挑战!
如此微妙的条件使这种情况发生了。
抱歉,需要长解释,但请您多多包涵!
TL; DR
对已发布源的订阅将按顺序进行处理,但是在直接进行未发布源的任何其他订阅之前。也就是说,您可以跳过队列!
使用GroupJoin
订阅顺序对于确定何时打开和关闭窗口很重要。
我首先要担心的是,您正在发表一个主题的书。
这应该是无人操作。Subject<T>
没有订阅费用。
因此,当您删除Publish().RefCount()
时:
var word = new Subject<string>();
var wordPub = word;//.Publish().RefCount();
var length = word.Select(i => i.Length);
那么你会遇到同样的问题。
因此,然后我看一下
GroupJoin
(因为我的直觉表明Publish().Refcount()
是一个红色鲱鱼)。对我来说,仅凭眼球就很难合理化,因此我也依靠一种简单的调试方法,多年来我已经使用了数十次-
Trace
或Log
扩展方法。public interface ILogger
{
void Log(string input);
}
public class DumpLogger : ILogger
{
public void Log(string input)
{
//LinqPad `Dump()` extension method.
// Could use Console.Write instead.
input.Dump();
}
}
public static class ObservableLoggingExtensions
{
private static int _index = 0;
public static IObservable<T> Log<T>(this IObservable<T> source, ILogger logger, string name)
{
return Observable.Create<T>(o =>
{
var index = Interlocked.Increment(ref _index);
var label = $"{index:0000}{name}";
logger.Log($"{label}.Subscribe()");
var disposed = Disposable.Create(() => logger.Log($"{label}.Dispose()"));
var subscription = source
.Do(
x => logger.Log($"{label}.OnNext({x.ToString()})"),
ex => logger.Log($"{label}.OnError({ex})"),
() => logger.Log($"{label}.OnCompleted()")
)
.Subscribe(o);
return new CompositeDisposable(subscription, disposed);
});
}
}
当我将日志记录添加到您提供的代码中时,它看起来像这样:
var logger = new DumpLogger();
var word = new Subject<string>();
var wordPub = word.Publish().RefCount();
var length = word.Select(i => i.Length);
var report =
wordPub.Log(logger, "lhs")
.GroupJoin(word.Select(i => i.Length).Log(logger, "rhs"),
s => wordPub.Log(logger, "lhsDuration"),
s => Observable.Empty<int>().Log(logger, "rhsDuration"),
(w, a) => new { Word = w, Lengths = a })
.SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
report.Subscribe(i => ($"{i.Word} {i.Length}").Dump("OnNext"));
word.OnNext("Apple");
word.OnNext("Banana");
word.OnNext("Cat");
word.OnNext("Donkey");
word.OnNext("Elephant");
word.OnNext("Zebra");
然后,这将在我的日志中输出类似以下内容的内容
使用Publish()。RefCount()进行日志
0001lhs.Subscribe()
0002rhs.Subscribe()
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Banana 6
...
但是,当我删除用法
Publish().RefCount()
时,新的日志输出如下:仅记录主题
0001lhs.Subscribe()
0002rhs.Subscribe()
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Apple 6
OnNext
Banana 6
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
...
这给了我们一些见解,但是,当问题真正变得明显时,便是我们开始使用订阅的逻辑列表来注释日志。
在带有RefCount的原始(工作)代码中,我们的注释可能看起来像这样
//word.Subsribers.Add(wordPub)
0001lhs.Subscribe() //wordPub.Subsribers.Add(0001lhs)
0002rhs.Subscribe() //word.Subsribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe() //wordPub.Subsribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe() //wordPub.Subsribers.Add(0005lhsDuration)
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose() //wordPub.Subsribers.Remove(0003lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Banana 6
因此,在此示例中,当执行
word.OnNext("Banana");
时,观察者链按此顺序链接但是,wordPub有子订阅!
所以真正的订阅列表看起来像
如果仅注释主题日志,那么我们将看到微妙之处在哪里
0001lhs.Subscribe() //word.Subsribers.Add(0001lhs)
0002rhs.Subscribe() //word.Subsribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe() //word.Subsribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe() //word.Subsribers.Add(0005lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Apple 6
OnNext
Banana 6
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
因此,在此示例中,当执行
word.OnNext("Banana");
时,观察者链按此顺序链接1. 0001lhs
2. 0002rhs
3. 0003lhsDuration
4. 0005lhsDuration
由于
0003lhsDuration
订阅在0002rhs
之后被激活,因此它将看不到“香蕉”值来终止窗口,直到将rhs发送给该值之后,才在仍然打开的窗口中产生该值。ew
正如@ francezu13k50指出的那样,解决您的问题的简单明了的解决方案是只使用
word.Select(x => new { Word = x, Length = x.Length });
,但是正如我认为您已经为我们提供了实际问题的简化版本(赞赏),我知道为什么这不合适。但是,由于我不知道您真正的问题所在是什么,因此我不确定向您提供什么解决方案的建议,除了您的当前代码中有一个之外,现在您应该知道它为什么以这种方式工作。
关于system.reactive - 对Publish()。Refcount()的行为感到困惑,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38367304/