由于我在这里很冷,所以我多次订阅“分组”,为什么我不需要在这里发布?我希望它在运行时会带来不需要的结果,但令我惊讶的是,无论有没有发布,它都可以使用。这是为什么?
var subject = new List<string>
{
"test",
"test",
"hallo",
"test",
"hallo"
}.ToObservable();
subject
.GroupBy(x => x)
.SelectMany(grouped => grouped.Scan(0, (count, _) => ++count)
.Zip(grouped, (count, chars) => new { Chars = chars, Count = count }))
.Subscribe(result => Console.WriteLine("You typed {0} {1} times",
result.Chars, result.Count));
// I Would have expect that I need to use Publish like that
//subject
// .GroupBy(x => x)
// .SelectMany(grouped => grouped.Publish(sharedGroup =>
// sharedGroup.Scan(0, (count, _) => ++count)
// .Zip(sharedGroup, (count, chars) =>
// new { Chars = chars, Count = count })))
// .Subscribe(result => Console.WriteLine("You typed {0} {1} times",
// result.Chars, result.Count));
Console.ReadLine();
编辑
保罗已经注意到,由于我们两次订阅了潜在的冷观测,因此我们应该对序列进行两次检查。但是,我没有运气使这种效果可见。我试图插入调试行,但是例如,它仅打印一次“性能”。
var subject = new List<Func<string>>
{
() =>
{
Console.WriteLine("performing");
return "test";
},
() => "test",
() => "hallo",
() => "test",
() => "hallo"
}.ToObservable();
subject
.Select(x => x())
.GroupBy(x => x)
.SelectMany(grouped => grouped.Scan(0, (count, _) => ++count)
.Zip(grouped, (count, chars) => new { Chars = chars, Count = count }))
.Subscribe(result => Console.WriteLine("You typed {0} {1} times",
result.Chars, result.Count));
我想知道我们是否可以使这种效果可见,即我们正在处理感冒并且没有使用
Observable
。在另一步骤中,我想看看Publish()
如何使效果消失。编辑2
正如Paul所建议的,我创建了一个自定义的
Publish()
用于调试。但是,如果您在它的IObservable<string>
方法中设置一个断点,您会注意到它只会被命中一次。class Program
{
static void Main(string[] args)
{
var subject = new MyObservable();
subject
.GroupBy(x => x)
.SelectMany(grouped => grouped.Scan(0, (count, _) => ++count)
.Zip(grouped, (count, chars) => new { Chars = chars, Count = count }))
.Subscribe(result => Console.WriteLine("You typed {0} {1} times",
result.Chars, result.Count));
Console.ReadLine();
}
}
class MyObservable : IObservable<string>
{
public IDisposable Subscribe(IObserver<string> observer)
{
observer.OnNext("test");
observer.OnNext("test");
observer.OnNext("hallo");
observer.OnNext("test");
observer.OnNext("hallo");
return Disposable.Empty;
}
}
所以对我来说,这个问题仍然悬而未决。为什么在这个冷的
Subscribe()
上我不需要这里的Publish
? 最佳答案
您仅使用一次基于列表的源,因此不会在此处看到重复的订阅效果。回答问题的关键是以下观察:
从GroupBy本身流出的IGroupedObservable 对象本身就是伪装的主题。
在内部,GroupBy保留一个Dictionary >。每当有消息进入时,它就会使用相应的密钥发送到主题中。您正在两次订阅分组对象,这很安全,因为主题使生产者与消费者脱钩。