在以下代码中,如果我正确理解了RX中的联接,则应该看到发生以下警报:
西方
测试
测试西*
完成
我得到了我期望的4条警报中的3条...为什么我也没有收到“ Test-West”?
public partial class MainWindow : Window
{
public MainWindow()
{
InitializeComponent();
var loginInitial = new LoginInitial();
var loginCheckList = new LoginCheckList();
var result1 = from x in loginInitial.Status
from y in loginCheckList.Status
where x == "Test" && y == "West"
select new { x, y };
result1.Subscribe(x => MessageBox.Show(x.x + "-" + x.y));
var result2 = from x in loginInitial.Status
where x == "Test"
select x;
result2.Subscribe(x => MessageBox.Show(x));
var result3 = from x in loginCheckList.Status
where x == "West"
select x;
result3.Subscribe(x => MessageBox.Show(x));
var task1 = Task.Factory.StartNew(() =>
{
for (int i = 0; i < 10000000; i++)
{
if (i == 9000000)
loginInitial.Status.Publish("9000000");
if (i == 9000001)
loginInitial.Status.Publish("Test");
}
});
var task2 = Task.Factory.StartNew(() =>
{
for (int i = 0; i < 1000000; i++)
{
if (i == 800000)
loginInitial.Status.Publish("800000");
if (i == 800001)
loginCheckList.Status.Publish("West");
}
});
Task.WaitAll(task1, task2);
MessageBox.Show("Done");
}
}
public class LoginInitial
{
public PublishObservable<string> Status = new PublishObservable<string>();
}
public class LoginCheckList
{
public PublishObservable<string> Status = new PublishObservable<string>();
}
public class PublishObservable<T> : IObservable<T>
{
private IList<IObserver<T>> _observers = new List<IObserver<T>>();
public void Publish(T value)
{
lock (_observers)
{
foreach (var observer in _observers)
{
observer.OnNext(value);
}
}
}
public void Complete()
{
lock (_observers)
{
foreach (var observer in _observers)
{
observer.OnCompleted();
}
}
}
public IDisposable Subscribe(IObserver<T> observer)
{
lock (_observers)
{
_observers.Add(observer);
}
return null;
}
}
最佳答案
Tomas Petricek几乎可以解释为什么会这样。我将以一个解决方案为例。
除了调整result1以使用CombineLatest
(还需要使用扩展方法语法而不是linq语法)之外,我还将实现更改为使用Subject
,这将消除创建自己的。我还通过IObservable
观察到的合并geach结果,将使用多个订阅的实现更改为单个订阅。
public partial class MainWindow : Window
{
public MainWindow()
{
InitializeComponent();
var loginInitial = new Subject<String>();
var loginCheckList = new Subject<String>();
var result1 = loginInitial.CombineLatest(loginCheckList,
(x, y) => new Tuple<string, string>(x, y))
.Where(latest => latest.Item1 == "Test" && latest.Item2 == "West")
.Select(latest => string.Format("{0} - {1}", latest.Item1, latest.Item2));
var result2 = from x in loginInitial
where x == "Test"
select x;
var result3 = from x in loginCheckList
where x == "West"
select x;
Observable.Merge(result1, result2, result3)
.Subscribe(Console.WriteLine);
var task1 = Task.Factory.StartNew(() =>
{
for (int i = 0; i < 10000000; i++)
{
if (i == 9000000)
loginInitial.OnNext("9000000");
if (i == 9000001)
loginInitial.OnNext("Test");
}
});
var task2 = Task.Factory.StartNew(() =>
{
for (int i = 0; i < 1000000; i++)
{
if (i == 800000)
loginInitial.OnNext("800000");
if (i == 800001)
loginCheckList.OnNext("West");
}
});
Task.WaitAll(task1, task2);
Console.WriteLine("Done");
}
}
注意1-我在这里使用了
Observable.Merge
,但是您也可以根据需要将其更改为使用CombineLatest
。请查看RxAs页面上Zip和CombineLatest的大理石图,以更好地了解它们的行为。注2-我可能会更改result2和result3以使用扩展方法语法,以便在一个方法中不会混合使用多种方法。它的方式没什么问题,但我更希望在可能的情况下使用一种语法的一致性。
关于c# - 在2个IObservable上的响应式(Reactive)linq表达式,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/5162308/