我正在创建一个游戏,其中有一个可观察的事件流 X,代表制造商交付的产品。还有一些外部事件(我们称之为变形金刚)以各种方式和不同时间段影响制造的性能。我想用其他 observable 来表示它,它发出一个函数来转换 X,并且应该应用于每个 X,直到 Transformer 的 OnComplete。变形金刚的数量是未知的——它们是由于用户操作(如设备购买)或随机生成(如设备故障)而创建的。
我想我需要一个 IObservable<IObservable<Func<X,X>>>
,我必须使用 Join
将 Zip
( IObservable<X>
,其他什么?)与 Observable.CombineLatest
一起执行此操作。你能帮我解决这个问题吗? IEnumerable<IObservable<T>>
几乎是我需要的,但它需要一个 List<List<T>>
。
如果我的描述不清楚,这里有一个大理石图:
用更抽象的术语来说,我需要的非常类似于矩阵的转置,但不是 IObservable<IObservable<T>>
我有 ojit_code 。
最佳答案
哇,那真是令人费解,但我想我有一些可行的方法。首先,我创建了一个扩展方法来将 IObservable<IObservable<Func<T, T>>
转换为 IObservable<IEnumerable<Func<T, T>>
。扩展方法的运行假设每个 observable 在完成之前只会产生一个 Func<T, T>
。
public static class MoreReactiveExtensions
{
public static IObservable<IEnumerable<Func<T, T>>> ToTransformations<T>(this IObservable<IObservable<Func<T, T>>> source)
{
return
Observable
// Yield an empty enumerable first.
.Repeat(Enumerable.Empty<Func<T, T>>(), 1)
// Then yield an updated enumerable every time one of
// the transformation observables yields a value or completes.
.Concat(
source
.SelectMany((x, i) =>
x
.Materialize()
.Select(y => new
{
Id = i,
Notification = y
}))
.Scan(
new List<Tuple<int, Func<T, T>>>(),
(acc, x) =>
{
switch(x.Notification.Kind)
{
// If an observable compeleted then remove
// its corresponding function from the accumulator.
case NotificationKind.OnCompleted:
acc =
acc
.Where(y => y.Item1 != x.Id)
.ToList();
break;
// If an observable yield a new Func then add
// it to the accumulator.
case NotificationKind.OnNext:
acc = new List<Tuple<int, Func<T, T>>>(acc)
{
Tuple.Create(x.Id, x.Notification.Value)
};
break;
// Do something with exceptions here.
default:
// Do something here
break;
}
return acc;
})
// Select an IEnumerable<Func<T, T>> here.
.Select(x => x.Select(y => y.Item2)));
}
}
然后,给定以下变量:
IObservable<IObservable<Func<int, int>>> transformationObservables
IObservable<int> products`
我这样使用它:
var transformations =
transformationObservables
.ToTransformations()
.Publish()
.RefCount();
IObservable<int> transformedProducts=
transformations
.Join(
products,
t => transformations,
i => Observable.Empty<int>(),
(t, i) => t.Aggregate(i, (ii, tt) => tt.Invoke(ii)))
根据我的测试,结果似乎是正确的。
关于system.reactive - 如果其他 Observable 发出映射函数,则转换 Observable,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/29881665/