我这里有一个简单的程序,可以显示不同单词中字母的数量。它按预期工作。

static void Main(string[] args) {
    var word = new Subject<string>();
    var wordPub = word.Publish().RefCount();
    var length = word.Select(i => i.Length);
    var report =
        wordPub
        .GroupJoin(length,
            s => wordPub,
            s => Observable.Empty<int>(),
            (w, a) => new { Word = w, Lengths = a })
        .SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
    report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}"));
    word.OnNext("Apple");
    word.OnNext("Banana");
    word.OnNext("Cat");
    word.OnNext("Donkey");
    word.OnNext("Elephant");
    word.OnNext("Zebra");
    Console.ReadLine();
}

输出为:
Apple 5
Banana 6
Cat 3
Donkey 6
Elephant 8
Zebra 5

我使用Publish()。RefCount()是因为“wordpub”两次包含在“report”中。没有它,当发出一个单词时,报告的第一部分将被回调通知,然后报告的另一部分将被通知,将通知加倍。那是会发生的事情。输出最终有11个项目而不是6个项目。至少我认为这是正在发生的事情。我认为在这种情况下使用Publish()。RefCount()可以同时更新报表的两个部分。

但是,如果我将length函数更改为ALSO,则使用这样的已发布源:
var length = wordPub.Select(i => i.Length);

那么输出是这样的:
Apple 5
Apple 6
Banana 6
Cat 3
Banana 3
Cat 6
Donkey 6
Elephant 8
Donkey 8
Elephant 5
Zebra 5

长度函数为什么也不能使用相同的发布源?

最佳答案

这是一个巨大的挑战!
如此微妙的条件使这种情况发生了。
抱歉,需要长解释,但请您多多包涵!

TL; DR

对已发布源的订阅将按顺序进行处理,但是在直接进行未发布源的任何其他订阅之前。也就是说,您可以跳过队列!
使用GroupJoin订阅顺序对于确定何时打开和关闭窗口很重要。

我首先要担心的是,您正在发表一个主题的书。
这应该是无人操作。Subject<T>没有订阅费用。

因此,当您删除Publish().RefCount()时:

var word = new Subject<string>();
var wordPub = word;//.Publish().RefCount();
var length = word.Select(i => i.Length);

那么你会遇到同样的问题。

因此,然后我看一下GroupJoin(因为我的直觉表明Publish().Refcount()是一个红色鲱鱼)。
对我来说,仅凭眼球就很难合理化,因此我也依靠一种简单的调试方法,多年来我已经使用了数十次-TraceLog扩展方法。
public interface ILogger
{
    void Log(string input);
}
public class DumpLogger : ILogger
{
    public void Log(string input)
    {
        //LinqPad `Dump()` extension method.
        //  Could use Console.Write instead.
        input.Dump();
    }
}


public static class ObservableLoggingExtensions
{
    private static int _index = 0;

    public static IObservable<T> Log<T>(this IObservable<T> source, ILogger logger, string name)
    {
        return Observable.Create<T>(o =>
        {
            var index = Interlocked.Increment(ref _index);
            var label = $"{index:0000}{name}";
            logger.Log($"{label}.Subscribe()");
            var disposed = Disposable.Create(() => logger.Log($"{label}.Dispose()"));
            var subscription = source
                .Do(
                    x => logger.Log($"{label}.OnNext({x.ToString()})"),
                    ex => logger.Log($"{label}.OnError({ex})"),
                    () => logger.Log($"{label}.OnCompleted()")
                )
                .Subscribe(o);

            return new CompositeDisposable(subscription, disposed);
        });
    }
}

当我将日志记录添加到您提供的代码中时,它看起来像这样:
var logger = new DumpLogger();

var word = new Subject<string>();
var wordPub = word.Publish().RefCount();
var length = word.Select(i => i.Length);
var report =
    wordPub.Log(logger, "lhs")
    .GroupJoin(word.Select(i => i.Length).Log(logger, "rhs"),
        s => wordPub.Log(logger, "lhsDuration"),
        s => Observable.Empty<int>().Log(logger, "rhsDuration"),
        (w, a) => new { Word = w, Lengths = a })
    .SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
report.Subscribe(i => ($"{i.Word} {i.Length}").Dump("OnNext"));
word.OnNext("Apple");
word.OnNext("Banana");
word.OnNext("Cat");
word.OnNext("Donkey");
word.OnNext("Elephant");
word.OnNext("Zebra");

然后,这将在我的日志中输出类似以下内容的内容

使用Publish()。RefCount()进行日志
0001lhs.Subscribe()
0002rhs.Subscribe()
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()

    OnNext
    Apple 5

0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()

    OnNext
    Banana 6
...

但是,当我删除用法Publish().RefCount()时,新的日志输出如下:

仅记录主题
0001lhs.Subscribe()
0002rhs.Subscribe()
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()

    OnNext
    Apple 5

0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()

    OnNext
    Apple 6

    OnNext
    Banana 6

0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
...

这给了我们一些见解,但是,当问题真正变得明显时,便是我们开始使用订阅的逻辑列表来注释日志。

在带有RefCount的原始(工作)代码中,我们的注释可能看起来像这样
//word.Subsribers.Add(wordPub)

0001lhs.Subscribe()             //wordPub.Subsribers.Add(0001lhs)
0002rhs.Subscribe()             //word.Subsribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()     //wordPub.Subsribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()

    OnNext
    Apple 5

0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()     //wordPub.Subsribers.Add(0005lhsDuration)
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()       //wordPub.Subsribers.Remove(0003lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()

    OnNext
    Banana 6

因此,在此示例中,当执行word.OnNext("Banana");时,观察者链按此顺序链接
  • wordPub
  • 0002rhs

  • 但是,wordPub有子订阅!
    所以真正的订阅列表看起来像
  • wordPub
  • 0001lhs
  • 0003lhsDuration
  • 0005lhsDuration
  • 0002rhs

  • 如果仅注释主题日志,那么我们将看到微妙之处在哪里
    0001lhs.Subscribe()                 //word.Subsribers.Add(0001lhs)
    0002rhs.Subscribe()                 //word.Subsribers.Add(0002rhs)
    0001lhs.OnNext(Apple)
    0003lhsDuration.Subscribe()         //word.Subsribers.Add(0003lhsDuration)
    0002rhs.OnNext(5)
    0004rhsDuration.Subscribe()
    0004rhsDuration.OnCompleted()
    0004rhsDuration.Dispose()
    
        OnNext
        Apple 5
    
    0001lhs.OnNext(Banana)
    0005lhsDuration.Subscribe()         //word.Subsribers.Add(0005lhsDuration)
    0002rhs.OnNext(6)
    0006rhsDuration.Subscribe()
    0006rhsDuration.OnCompleted()
    0006rhsDuration.Dispose()
    
        OnNext
        Apple 6
    
        OnNext
        Banana 6
    
    0003lhsDuration.OnNext(Banana)
    0003lhsDuration.Dispose()
    

    因此,在此示例中,当执行word.OnNext("Banana");时,观察者链按此顺序链接
    1. 0001lhs
    2. 0002rhs
    3. 0003lhsDuration
    4. 0005lhsDuration
    

    由于0003lhsDuration订阅在0002rhs之后被激活,因此它将看不到“香蕉”值来终止窗口,直到将rhs发送给该值之后,才在仍然打开的窗口中产生该值。

    ew

    正如@ francezu13k50指出的那样,解决您的问题的简单明了的解决方案是只使用word.Select(x => new { Word = x, Length = x.Length });,但是正如我认为您已经为我们提供了实际问题的简化版本(赞赏),我知道为什么这不合适。
    但是,由于我不知道您真正的问题所在是什么,因此我不确定向您提供什么解决方案的建议,除了您的当前代码中有一个之外,现在您应该知道它为什么以这种方式工作。

    关于system.reactive - 对Publish()。Refcount()的行为感到困惑,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/38367304/

    10-09 22:06