使用 Rx,我想观察一个遗留对象,它同时公开了 GetItems 方法和事件 NewItem

GetItems 被调用时,它会同步返回缓存中所有项目的列表。它还将生成一个异步获取项目,这些项目将在收到时通过 NewItem 事件发布。

如何通过制定 LINQ 查询以捕获两个结果集,以一致的方式观察这两个源(同步 + 异步)?生产顺序并不重要。

最佳答案

让我看看我是否理解您的遗留对象。我假设它是一个看起来像这样的泛型类型:

public class LegacyObject<T>
{
    public IEnumerable<T> GetItems();
    public event EventHandler<NewItemEventArgs<T>> NewItem;
}

使用这样的新项目事件参数:
public class NewItemEventArgs<T> : System.EventArgs
{
    public T NewItem { get; private set; }
    public NewItemEventArgs(T newItem)
    {
        this.NewItem = newItem;
    }
}

现在,我为 .ToObservable() 创建了一个 LegacyObject<T> 扩展方法:
public static IObservable<T> ToObservable<T>(
    this LegacyObject<T> @this)
{
    return Observable.Create<T>(o =>
    {
        var gate = new object();
        lock (gate)
        {
            var list = new List<T>();
            var subject = new Subject<T>();
            var newItems = Observable
                .FromEventPattern<NewItemEventArgs<T>>(
                    h => @this.NewItem += h,
                    h => @this.NewItem -= h)
                .Select(ep => ep.EventArgs.NewItem);
            var inner = newItems.Subscribe(ni =>
            {
                lock (gate)
                {
                    if (!list.Contains(ni))
                    {
                        list.Add(ni);
                        subject.OnNext(ni);
                    }
                }
            });
            list.AddRange(@this.GetItems());
            var outer = list.ToArray().ToObservable()
                .Concat(subject).Subscribe(o);
            return new CompositeDisposable(inner, outer);
        }
    });
}

此方法为每个订阅者创建一个新的 observable - 这是编写此类扩展方法时正确的做法。

它创建一个 gate 对象来锁定对内部列表的访问。

因为您说调用 GetItems 的行为会产生异步函数以获取新项目,所以我确保在调用 NewItem 之前创建了 GetItems 订阅。
inner 订阅检查新项目是否在列表中,如果它不在列表中,则只对主题调用 OnNext

调用 GetItems 并将值通过 AddRange 添加到内部列表中。

NewItem 事件开始在另一个线程上触发之前,项目不会被添加到列表中的可能性不大,但也是可能的。这就是为什么对列表的访问存在锁定的原因。 inner 订阅将在尝试将项目添加到列表之前等待它获得锁定,这将在初始项目添加到列表后发生。

最后将内部列表变成一个 observable,与主题连接,o 观察者订阅这个 observable。

使用 IDisposable 将两个订阅作为单个 CompositeDisposable 返回。

这就是 ToObservable 方法。

现在,我通过在遗留对象上创建一个构造函数来测试这一点,该构造函数让我同时传入可枚举和可观察的值。当调用 GetItems 并且 observable 驱动 NewItem 事件时返回可枚举。

所以,我的测试代码如下所示:
var tester = new Subject<int>();
var legacy = new LegacyObject<int>(new [] { 1, 2, 3, }, tester);

var values = legacy.ToObservable();

values.Subscribe(v => Console.WriteLine(v));

tester.OnNext(3);
tester.OnNext(4);
tester.OnNext(4);
tester.OnNext(5);

写入控制台的值是:
1
2
3
4
5

让我知道这是否满足您的需求。

关于c# - 观察同步和异步结果,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/7707458/

10-13 03:15