我正在创建一个游戏,其中有一个可观察的事件流 X,代表制造商交付的产品。还有一些外部事件(我们称之为变形金刚)以各种方式和不同时间段影响制造的性能。我想用其他 observable 来表示它,它发出一个函数来转换 X,并且应该应用于每个 X,直到 Transformer 的 OnComplete。变形金刚的数量是未知的——它们是由于用户操作(如设备购买)或随机生成(如设备故障)而创建的。

我想我需要一个 IObservable<IObservable<Func<X,X>>>,我必须使用 JoinZip ( IObservable<X> ,其他什么?)与 Observable.CombineLatest 一起执行此操作。你能帮我解决这个问题吗? IEnumerable<IObservable<T>> 几乎是我需要的,但它需要一个 List<List<T>>

如果我的描述不清楚,这里有一个大理石图:

用更抽象的术语来说,我需要的非常类似于矩阵的转置,但不是 IObservable<IObservable<T>> 我有 ojit_code 。

最佳答案

哇,那真是令人费解,但我想我有一些可行的方法。首先,我创建了一个扩展方法来将 IObservable<IObservable<Func<T, T>> 转换为 IObservable<IEnumerable<Func<T, T>> 。扩展方法的运行假设每个 observable 在完成之前只会产生一个 Func<T, T>

public static class MoreReactiveExtensions
{
    public static IObservable<IEnumerable<Func<T, T>>> ToTransformations<T>(this IObservable<IObservable<Func<T, T>>> source)
    {
        return
            Observable
            // Yield an empty enumerable first.
            .Repeat(Enumerable.Empty<Func<T, T>>(), 1)
            // Then yield an updated enumerable every time one of
            // the transformation observables yields a value or completes.
            .Concat(
                source
                .SelectMany((x, i) =>
                    x
                    .Materialize()
                    .Select(y => new
                        {
                            Id = i,
                            Notification = y
                        }))
                .Scan(
                    new List<Tuple<int, Func<T, T>>>(),
                    (acc, x) =>
                    {
                        switch(x.Notification.Kind)
                        {
                            // If an observable compeleted then remove
                            // its corresponding function from the accumulator.
                            case NotificationKind.OnCompleted:
                                acc =
                                    acc
                                    .Where(y => y.Item1 != x.Id)
                                    .ToList();
                                break;
                            // If an observable yield a new Func then add
                            // it to the accumulator.
                            case NotificationKind.OnNext:
                                acc = new List<Tuple<int, Func<T, T>>>(acc)
                                    {
                                        Tuple.Create(x.Id, x.Notification.Value)
                                    };
                                break;
                            // Do something with exceptions here.
                            default:
                                // Do something here
                                break;
                        }
                        return acc;
                    })
                // Select an IEnumerable<Func<T, T>> here.
                .Select(x => x.Select(y => y.Item2)));
    }
}

然后,给定以下变量:
IObservable<IObservable<Func<int, int>>> transformationObservables
IObservable<int> products`

我这样使用它:
var transformations =
    transformationObservables
    .ToTransformations()
    .Publish()
    .RefCount();

IObservable<int> transformedProducts=
    transformations
    .Join(
        products,
        t => transformations,
        i => Observable.Empty<int>(),
        (t, i) => t.Aggregate(i, (ii, tt) => tt.Invoke(ii)))

根据我的测试,结果似乎是正确的。

关于system.reactive - 如果其他 Observable 发出映射函数,则转换 Observable,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/29881665/

10-11 02:58