我有一个RIA Services数据服务,该服务具有以下几个函数调用:
public InvokeOperation<T> SomeFunc(
SomeData data,
Action<InvokeOperation<T>> callback,
object userState)
我将如何与Reactive Extensions一起使用,以便我可以订阅回调并获得InvokeOperation结果?
更新:这是我当前对Enigmativity混合解决方案的实现。由于InvokeOperation UserState可能很有价值,因此我不仅需要实际的InvokeOperation值。应当指出,我根本没有测试错误处理。
public static class ObservableEx
{
public static IObservable<InvokeOperation<T>> ObservableInvokeOperation<T, Tdat> (
Func<Tdat, Action<InvokeOperation<T>>, object, InvokeOperation<T>> func,
Tdat data,
System.Reactive.Concurrency.IScheduler scheduler )
{
return
Observable.Defer<InvokeOperation<T>>( () =>
FromCallbackPattern<Tdat, T>( func, scheduler )
.Invoke( data ) );
}
private static Func<P, IObservable<InvokeOperation<T>>> FromCallbackPattern<P, T> (
Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler )
{
return p =>
{
var subject = new AsyncSubject<InvokeOperation<T>>();
try
{
call( p, iot =>
{
if ( iot.HasError )
{
subject.OnError( iot.Error );
}
else
{
subject.OnNext( iot );
subject.OnCompleted();
}
}, p );
}
catch ( Exception ex )
{
subject.OnError( ex );
}
return subject.ObserveOn( scheduler );
};
}
}
使用给定功能
public InvokeOperation<int> SomeFunc(SomeData data, Action<InvokeOperation<int>> callback, object userState)
var myobs = ObservableEx.ObservableInvokeOperation<int, SomeData>( myRIAContext.SomeFunc, data, Scheduler.ThreadPool );
这对于与给定功能签名匹配的任何功能都非常有用。不幸的是,现在我遇到了一些变化,例如
Func<T1, Action<InvokeOperation<T>>, object>
Func<T1, T2, Action<InvokeOperation<T>>, object>
任何人都建议将其转换为能够处理我要抛出的任何InvokeOperation方法?
最佳答案
编辑1:请参阅下面的基于保罗·贝茨的答案和我的混合解决方案。
EDIT2:有关基于OP的更新的“第三代”解决方案,请参见下文。
回调有点难处理,我必须说,将其变为可观察的是一个不错的方法。
我有一种对我有用的方法。
基本方法是将SomeFunc
操作转换为Func<T>
,然后在其上调用Observable.Start
。我将其包装在Observable.Create
中以保持其清洁,并添加了错误处理。我已经完成了基本测试,但没有什么功能强大。
消耗代码如下所示:
var obs = service.SomeObservableFunc(new SomeData(), Scheduler.ThreadPool);
obs.Subscribe(t => { /* success */ }, ex => { /* error */ });
我假设您的RIA服务类是
RiaService<T>
,并构建了SomeObservableFunc
扩展方法,如下所示: public static IObservable<T> SomeObservableFunc<T>(
this RiaService<T> service,
SomeData data,
IScheduler scheduler)
{
return Observable.Create<T>(o =>
{
var error = (Exception)null;
Func<T> call = () =>
{
var result = default(T);
var mre = new ManualResetEvent(false);
Action<InvokeOperation<T>> callback = iot =>
{
try
{
if (iot.HasError)
{
error = iot.Error;
}
else
{
result = iot.Value;
}
}
catch (Exception ex)
{
error = ex;
}
finally
{
mre.Set();
}
};
try
{
service.SomeFunc(data, callback, null);
mre.WaitOne();
}
catch (Exception ex)
{
error = ex;
}
return result;
};
return Observable
.Start(call, scheduler)
.Subscribe(t =>
{
try
{
if (error == null)
{
o.OnNext(t);
}
else
{
o.OnError(error);
}
}
catch (Exception ex)
{
o.OnError(ex);
}
}, ex => o.OnError(ex), () =>
{
if (error == null)
{
o.OnCompleted();
}
});
});
}
喊出来,如果这对你有用。
编辑1
我喜欢Paul Betts的解决方案,因为它不使用
ManualResetEvent
,但是它没有编译,也没有处理RIA服务调用期间可能发生的内部错误,因此我创建了Follow混合解决方案。我的扩展方法现在看起来像这样:
public static IObservable<T> SomeObservableFunc<T>(
this RiaService<T> service,
SomeData data,
IScheduler scheduler)
{
return
Observable.Defer<T>(() =>
FromCallbackPattern<SomeData, T>(service.SomeFunc, scheduler)
.Invoke(data));
}
这使用了最初由Paul Betts创建的重新制作的
FromCallbackPattern
: private static Func<P, IObservable<T>> FromCallbackPattern<P, T>(
Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return p =>
{
var subject = new AsyncSubject<T>();
try
{
call(p, iot =>
{
if (iot.HasError)
{
subject.OnError(iot.Error);
}
else
{
subject.OnNext(iot.Value);
subject.OnCompleted();
}
}, null);
}
catch (Exception ex)
{
subject.OnError(ex);
}
return subject.ObserveOn(scheduler);
};
}
它适用于我的测试代码,我认为这是一个更好的整体解决方案。
编辑2
此版本的解决方案旨在允许将不同数量的参数以及用户状态传递给
FromCallbackPattern
扩展方法。我从这种通用的
FromCallbackPattern
扩展方法开始: public static IObservable<InvokeOperation<T>> FromCallbackPattern<T>(
this Action<Action<InvokeOperation<T>>> call,
IScheduler scheduler)
{
return Observable.Defer(() =>
{
var subject = new AsyncSubject<InvokeOperation<T>>();
try
{
call(iot =>
{
subject.OnNext(iot);
subject.OnCompleted();
});
}
catch (Exception ex)
{
subject.OnError(ex);
}
return subject.ObserveOn(scheduler);
});
}
然后,我需要一系列私有的
Reduce
扩展方法,以减少对Action<Action<InvokeOperation<T>>>
委托的各种服务调用: private static Action<Action<InvokeOperation<T>>> Reduce<T>(
this Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
object state)
{
return a => call(a, state);
}
private static Action<Action<InvokeOperation<T>>> Reduce<P, T>(
this Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P p, object state)
{
return a => call(p, a, state);
}
private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, T>(
this Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P1 p1, P2 p2, object state)
{
return a => call(p1, p2, a, state);
}
private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, P3, T>(
this Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P1 p1, P2 p2, P3 p3, object state)
{
return a => call(p1, p2, p3, a, state);
}
private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, P3, P4, T>(
this Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
P1 p1, P2 p2, P3 p3, P4 p4, object state)
{
return a => call(p1, p2, p3, p4, a, state);
}
现在,我可以编写普通的
FromCallbackPattern
扩展方法: public static Func<object, IObservable<InvokeOperation<T>>> FromCallbackPattern<T>(
this Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return o => call.Reduce(o).FromCallbackPattern(scheduler);
}
public static Func<P, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P, T>(
this Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p, o) => call.Reduce(p, o).FromCallbackPattern(scheduler);
}
public static Func<P1, P2, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, T>(
this Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p1, p2, o) => call.Reduce(p1, p2, o).FromCallbackPattern(scheduler);
}
public static Func<P1, P2, P3, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, P3, T>(
this Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p1, p2, p3, o) => call.Reduce(p1, p2, p3, o).FromCallbackPattern(scheduler);
}
public static Func<P1, P2, P3, P4, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, P3, P4, T>(
this Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
IScheduler scheduler)
{
return (p1, p2, p3, p4, o) => call.Reduce(p1, p2, p3, p4, o).FromCallbackPattern(scheduler);
}
然后,最后是原始的
SomeObservableFunc
/ ObservableInvokeOperation
扩展方法(现在也重命名为FromCallbackPattern
): public static IObservable<InvokeOperation<T>> FromCallbackPattern<T>(
this RiaService<T> service,
Func<RiaService<T>, Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(state));
}
public static IObservable<InvokeOperation<T>> FromCallbackPattern<P, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P p, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p, state));
}
public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P1 p1, P2 p2, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p1, p2, state));
}
public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, P3, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P1 p1, P2 p2, P3 p3, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p1, p2, p3, state));
}
public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, P3, P4, T>(
this RiaService<T> service,
Func<RiaService<T>, Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
P1 p1, P2 p2, P3 p3, P4 p4, object state,
IScheduler scheduler)
{
return Observable.Defer(() =>
call(service).FromCallbackPattern(scheduler)
.Invoke(p1, p2, p3, p4, state));
}
显然,您需要用RIA服务类类型替换对
RiaService<T>
的引用。这些方法可以这样调用:
IObservable<InvokeOperation<int>> obs1 =
service
.FromCallbackPattern(
s => s.SomeFunc,
new SomeData(),
null, // user state
Scheduler.ThreadPool);
IObservable<InvokeOperation<int>> obs2 =
service
.FromCallbackPattern(
s => s.SomeOtherFunc,
42, "Hello", 3.14159265,
null, // user state
Scheduler.ThreadPool);
!现在怎么样
关于system.reactive - 如何将带有InvokeOperation回调的此函数转换为Reactive Extensions?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/7802135/