我有一个RIA Services数据服务,该服务具有以下几个函数调用:

public InvokeOperation<T> SomeFunc(
    SomeData data,
    Action<InvokeOperation<T>> callback,
    object userState)


我将如何与Reactive Extensions一起使用,以便我可以订阅回调并获得InvokeOperation结果?



更新:这是我当前对Enigmativity混合解决方案的实现。由于InvokeOperation UserState可能很有价值,因此我不仅需要实际的InvokeOperation值。应当指出,我根本没有测试错误处理。

public static class ObservableEx
{
      public static IObservable<InvokeOperation<T>> ObservableInvokeOperation<T, Tdat> (
         Func<Tdat, Action<InvokeOperation<T>>, object, InvokeOperation<T>> func,
         Tdat data,
         System.Reactive.Concurrency.IScheduler scheduler )
      {
         return
             Observable.Defer<InvokeOperation<T>>( () =>
                 FromCallbackPattern<Tdat, T>( func, scheduler )
                     .Invoke( data ) );
      }

      private static Func<P, IObservable<InvokeOperation<T>>> FromCallbackPattern<P, T> (
          Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
          IScheduler scheduler )
      {
         return p =>
         {
            var subject = new AsyncSubject<InvokeOperation<T>>();
            try
            {
               call( p, iot =>
               {
                  if ( iot.HasError )
                  {
                     subject.OnError( iot.Error );
                  }
                  else
                  {
                     subject.OnNext( iot );
                     subject.OnCompleted();
                  }
               }, p );
            }
            catch ( Exception ex )
            {
               subject.OnError( ex );
            }
            return subject.ObserveOn( scheduler );
         };
      }
}


使用给定功能

public InvokeOperation<int> SomeFunc(SomeData data, Action<InvokeOperation<int>> callback, object userState)

var myobs = ObservableEx.ObservableInvokeOperation<int, SomeData>( myRIAContext.SomeFunc, data, Scheduler.ThreadPool );


这对于与给定功能签名匹配的任何功能都非常有用。不幸的是,现在我遇到了一些变化,例如

Func<T1, Action<InvokeOperation<T>>, object>
Func<T1, T2, Action<InvokeOperation<T>>, object>


任何人都建议将其转换为能够处理我要抛出的任何InvokeOperation方法?

最佳答案

编辑1:请参阅下面的基于保罗·贝茨的答案和我的混合解决方案。

EDIT2:有关基于OP的更新的“第三代”解决方案,请参见下文。



回调有点难处理,我必须说,将其变为可观察的是一个不错的方法。

我有一种对我有用的方法。

基本方法是将SomeFunc操作转换为Func<T>,然后在其上调用Observable.Start。我将其包装在Observable.Create中以保持其清洁,并添加了错误处理。我已经完成了基本测试,但没有什么功能强大。

消耗代码如下所示:

var obs = service.SomeObservableFunc(new SomeData(), Scheduler.ThreadPool);
obs.Subscribe(t => { /* success */ }, ex => { /* error */ });


我假设您的RIA服务类是RiaService<T>,并构建了SomeObservableFunc扩展方法,如下所示:

    public static IObservable<T> SomeObservableFunc<T>(
        this RiaService<T> service,
        SomeData data,
        IScheduler scheduler)
    {
        return Observable.Create<T>(o =>
        {
            var error = (Exception)null;
            Func<T> call = () =>
            {
                var result = default(T);
                var mre = new ManualResetEvent(false);
                Action<InvokeOperation<T>> callback = iot =>
                {
                    try
                    {
                        if (iot.HasError)
                        {
                            error = iot.Error;
                        }
                        else
                        {
                            result = iot.Value;
                        }
                    }
                    catch (Exception ex)
                    {
                        error = ex;
                    }
                    finally
                    {
                        mre.Set();
                    }
                };
                try
                {
                    service.SomeFunc(data, callback, null);
                    mre.WaitOne();
                }
                catch (Exception ex)
                {
                    error = ex;
                }
                return result;
            };

            return Observable
                .Start(call, scheduler)
                .Subscribe(t =>
                {
                    try
                    {
                        if (error == null)
                        {
                            o.OnNext(t);
                        }
                        else
                        {
                            o.OnError(error);
                        }
                    }
                    catch (Exception ex)
                    {
                        o.OnError(ex);
                    }
                }, ex => o.OnError(ex), () =>
                {
                    if (error == null)
                    {
                        o.OnCompleted();
                    }
                });
        });
    }


喊出来,如果这对你有用。



编辑1

我喜欢Paul Betts的解决方案,因为它不使用ManualResetEvent,但是它没有编译,也没有处理RIA服务调用期间可能发生的内部错误,因此我创建了Follow混合解决方案。

我的扩展方法现在看起来像这样:

    public static IObservable<T> SomeObservableFunc<T>(
        this RiaService<T> service,
        SomeData data,
        IScheduler scheduler)
    {
        return
            Observable.Defer<T>(() =>
                FromCallbackPattern<SomeData, T>(service.SomeFunc, scheduler)
                    .Invoke(data));
    }


这使用了最初由Paul Betts创建的重新制作的FromCallbackPattern

    private static Func<P, IObservable<T>> FromCallbackPattern<P, T>(
        Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        IScheduler scheduler)
    {
        return p =>
        {
            var subject = new AsyncSubject<T>();
            try
            {
                call(p, iot =>
                {
                    if (iot.HasError)
                    {
                        subject.OnError(iot.Error);
                    }
                    else
                    {
                        subject.OnNext(iot.Value);
                        subject.OnCompleted();
                    }
                }, null);
            }
            catch (Exception ex)
            {
                subject.OnError(ex);
            }
            return subject.ObserveOn(scheduler);
        };
    }


它适用于我的测试代码,我认为这是一个更好的整体解决方案。



编辑2

此版本的解决方案旨在允许将不同数量的参数以及用户状态传递给FromCallbackPattern扩展方法。

我从这种通用的FromCallbackPattern扩展方法开始:

    public static IObservable<InvokeOperation<T>> FromCallbackPattern<T>(
        this Action<Action<InvokeOperation<T>>> call,
        IScheduler scheduler)
    {
        return Observable.Defer(() =>
        {
            var subject = new AsyncSubject<InvokeOperation<T>>();
            try
            {
                call(iot =>
                {
                    subject.OnNext(iot);
                    subject.OnCompleted();
                });
            }
            catch (Exception ex)
            {
                subject.OnError(ex);
            }
            return subject.ObserveOn(scheduler);
        });
    }


然后,我需要一系列私有的Reduce扩展方法,以减少对Action<Action<InvokeOperation<T>>>委托的各种服务调用:

    private static Action<Action<InvokeOperation<T>>> Reduce<T>(
        this Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        object state)
    {
        return a => call(a, state);
    }

    private static Action<Action<InvokeOperation<T>>> Reduce<P, T>(
        this Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        P p, object state)
    {
        return a => call(p, a, state);
    }

    private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, T>(
        this Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        P1 p1, P2 p2, object state)
    {
        return a => call(p1, p2, a, state);
    }

    private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, P3, T>(
        this Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        P1 p1, P2 p2, P3 p3, object state)
    {
        return a => call(p1, p2, p3, a, state);
    }

    private static Action<Action<InvokeOperation<T>>> Reduce<P1, P2, P3, P4, T>(
        this Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        P1 p1, P2 p2, P3 p3, P4 p4, object state)
    {
        return a => call(p1, p2, p3, p4, a, state);
    }


现在,我可以编写普通的FromCallbackPattern扩展方法:

    public static Func<object, IObservable<InvokeOperation<T>>> FromCallbackPattern<T>(
        this Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        IScheduler scheduler)
    {
        return o => call.Reduce(o).FromCallbackPattern(scheduler);
    }

    public static Func<P, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P, T>(
        this Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        IScheduler scheduler)
    {
        return (p, o) => call.Reduce(p, o).FromCallbackPattern(scheduler);
    }

    public static Func<P1, P2, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, T>(
        this Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        IScheduler scheduler)
    {
        return (p1, p2, o) => call.Reduce(p1, p2, o).FromCallbackPattern(scheduler);
    }

    public static Func<P1, P2, P3, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, P3, T>(
        this Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        IScheduler scheduler)
    {
        return (p1, p2, p3, o) => call.Reduce(p1, p2, p3, o).FromCallbackPattern(scheduler);
    }

    public static Func<P1, P2, P3, P4, object, IObservable<InvokeOperation<T>>> FromCallbackPattern<P1, P2, P3, P4, T>(
        this Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>> call,
        IScheduler scheduler)
    {
        return (p1, p2, p3, p4, o) => call.Reduce(p1, p2, p3, p4, o).FromCallbackPattern(scheduler);
    }


然后,最后是原始的SomeObservableFunc / ObservableInvokeOperation扩展方法(现在也重命名为FromCallbackPattern):

    public static IObservable<InvokeOperation<T>> FromCallbackPattern<T>(
        this RiaService<T> service,
        Func<RiaService<T>, Func<Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
        object state,
        IScheduler scheduler)
    {
        return Observable.Defer(() =>
            call(service).FromCallbackPattern(scheduler)
                .Invoke(state));
    }

    public static IObservable<InvokeOperation<T>> FromCallbackPattern<P, T>(
        this RiaService<T> service,
        Func<RiaService<T>, Func<P, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
        P p, object state,
        IScheduler scheduler)
    {
        return Observable.Defer(() =>
            call(service).FromCallbackPattern(scheduler)
                .Invoke(p, state));
    }

    public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, T>(
        this RiaService<T> service,
        Func<RiaService<T>, Func<P1, P2, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
        P1 p1, P2 p2, object state,
        IScheduler scheduler)
    {
        return Observable.Defer(() =>
            call(service).FromCallbackPattern(scheduler)
                .Invoke(p1, p2, state));
    }

    public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, P3, T>(
        this RiaService<T> service,
        Func<RiaService<T>, Func<P1, P2, P3, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
        P1 p1, P2 p2, P3 p3, object state,
        IScheduler scheduler)
    {
        return Observable.Defer(() =>
            call(service).FromCallbackPattern(scheduler)
                .Invoke(p1, p2, p3, state));
    }

    public static IObservable<InvokeOperation<T>> FromCallbackPattern<P1, P2, P3, P4, T>(
        this RiaService<T> service,
        Func<RiaService<T>, Func<P1, P2, P3, P4, Action<InvokeOperation<T>>, object, InvokeOperation<T>>> call,
        P1 p1, P2 p2, P3 p3, P4 p4, object state,
        IScheduler scheduler)
    {
        return Observable.Defer(() =>
            call(service).FromCallbackPattern(scheduler)
                .Invoke(p1, p2, p3, p4, state));
    }


显然,您需要用RIA服务类类型替换对RiaService<T>的引用。

这些方法可以这样调用:

IObservable<InvokeOperation<int>> obs1 =
    service
        .FromCallbackPattern(
            s => s.SomeFunc,
            new SomeData(),
            null, // user state
            Scheduler.ThreadPool);

IObservable<InvokeOperation<int>> obs2 =
    service
        .FromCallbackPattern(
            s => s.SomeOtherFunc,
            42, "Hello", 3.14159265,
            null, // user state
            Scheduler.ThreadPool);


!现在怎么样

关于system.reactive - 如何将带有InvokeOperation回调的此函数转换为Reactive Extensions?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/7802135/

10-10 04:11