GetNamespaceConnectionInfoSource

GetNamespaceConnectionInfoSource

我有以下方法:

public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    return m_namespaceManager
        .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
        .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
        .Merge(maxConcurrentCalls)
        .Where(IsValid)
        .Select(ToWorkItem)
        .Where(o => o != null);
}


它实现以下逻辑:


通过从名称空间管理器(IObservable<NamespaceConnectionInfo>)获取GetNamespaceConnectionInfoSource来输入monad。
当名称空间可用时,获取与特定名称空间(IObservable<DataManagementPolicy>)相对应的GetPolicySourceForNamespace。但是,使用Merge运算符可以限制同时调用GetPolicySourceForNamespace的次数。
筛选出错误的DataManagementPolicy记录(无法在SQL中完成)。
将看似良好的DataManagementPolicy记录转换为DataManagementWorkItem实例。有些可能显示为null,因此最后将其过滤掉。


GetNamespaceConnectionInfoSource在产生一定数量的有效NamespaceConnectionInfo对象之后可能会出错。到那时,完全有可能已经按照最终的可观察顺序生成了一定数量的DataManagementWorkItem对象。

我有一个单元测试,其中:


GetNamespaceConnectionInfoSource在产生25个名称空间之后抛出
GetPolicySourceForNamespace每个名称空间产生10个对象
并发限制为10


我也有兴趣检查最终可观察结果产生的项目,然后再进行故障处理:

var dm = DependencyResolver.Instance.GetInstance<IDataManagement>();
var workItems = new List<DataManagementWorkItem>();
try
{
    var obs = dm.GetWorkItemSource(10);
    obs.Subscribe(wi => workItems.Add(wi));
    await obs;
    Assert.Fail("An expected exception was not thrown");
}
catch (Exception exc)
{
    AssertTheRightException(exc);
}


workItems集合每次都有不同数量的项目。一次运行有69项,另一项-50项,另一项-18项。

我的解释是,当故障发生时,在处理的各个阶段中都有良好的NamespaceConnectionInfoDataManagementPolicy对象,所有这些对象都会由于故障而中止。每次的数量都是不同的,因为物料是异步生产的。

我的问题就在这里-我不希望他们被中止。我希望它们能够完成,以最终的可观察顺序生产,然后才传达故障。本质上,我想保留异常并在最后将其重新抛出。

我尝试对实现进行一些修改:

public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    Exception fault = null;
    return m_namespaceManager
        .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
        .Catch<NamespaceConnectionInfo, Exception>(exc =>
        {
            fault = exc;
            return Observable.Empty<NamespaceConnectionInfo>();
        })
        .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
        .Merge(maxConcurrentCalls)
        .Where(IsValid)
        .Select(ToWorkItem)
        .Where(o => o != null)
        .Finally(() =>
        {
            if (fault != null)
            {
                throw fault;
            }
        });
}


不用说-它没有用。 Finally似乎没有传播我实际上同意的任何异常。

那么,什么是实现我想要的正确方法?

编辑

与该问题无关,我发现用于收集生成的DataManagementWorkItem实例的测试代码是错误的。代替

    var obs = dm.GetWorkItemSource(10);
    obs.Subscribe(wi => workItems.Add(wi));
    await obs;


它应该是

    await dm.GetWorkItemSource(1).Do(wi => workItems.Add(wi));


区别在于,后者仅订阅了一次商品来源,而原始版本只订阅了两次:


Subscribe
await


它不会影响问题,但会破坏我的模拟代码。

澄清度

这更多的是澄清。每个名称空间产生一个由10个策略对象组成的序列。但是此过程是异步的-策略对象是顺序生成的,但是是异步生成的。在所有时间期间,将继续生成名称空间,因此,在发生故障之前给定25个名称空间,所生成的名称空间可以是三种可能的“状态”:


尚未为其生成任何策略对象,但是异步策略生成过程已经开始
已经产生了一些(但少于10个)策略对象
命名空间的所有10个策略对象均已生成


当名称空间产生中发生错误时,无论当前处于“良好”名称空间的“状态”如何,整个管道都将中止。

让我们看一下以下简单示例:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

namespace observables
{
    class Program
    {
        static void Main()
        {
            int count = 0;
            var obs = Observable
                .Interval(TimeSpan.FromMilliseconds(1))
                .Take(50)
                .Select(i =>
                {
                    if (25 == Interlocked.Increment(ref count))
                    {
                        throw new Exception("Boom!");
                    }
                    return i;
                })
                .Select(i => Observable.Defer(() => Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(10).Select(j => i * 1000 + j)))
                .Merge(10);

            var items = new HashSet<long>();
            try
            {
                obs.Do(i => items.Add(i)).GetAwaiter().GetResult();
            }
            catch (Exception exc)
            {
                Debug.WriteLine(exc.Message);
            }
            Debug.WriteLine(items.Count);
        }
    }
}


运行它时,通常会有以下输出:

Boom!
192


但是,它也可能显示191。但是,如果我们应用故障连接解决方​​案(即使在没有故障的情况下它也不起作用):

        int count = 0;
        var fault = new Subject<long>();
        var obs = Observable
            .Interval(TimeSpan.FromMilliseconds(1))
            .Take(50)
            .Select(i =>
            {
                if (25 == Interlocked.Increment(ref count))
                {
                    throw new Exception("Boom!");
                }
                return i;
            })
            .Catch<long, Exception>(exc =>
            {
                fault.OnError(exc);
                return Observable.Empty<long>();
            })
            .Select(i => Observable.Defer(() => Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(10).Select(j => i * 1000 + j)))
            .Merge(10)
            .Concat(fault);


然后,输出始终为240,因为我们让所有已经开始的异步过程完成了。

基于pmccloghrylaing的答案的尴尬解决方案

    public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
    {
        var fault = new Subject<DataManagementWorkItem>();
        bool faulted = false;
        return m_namespaceManager
            .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
            .Catch<NamespaceConnectionInfo, Exception>(exc =>
            {
                faulted = true;
                return Observable.Throw<NamespaceConnectionInfo>(exc);
            })
            .Finally(() =>
            {
                if (!faulted)
                {
                    fault.OnCompleted();
                }
            })
            .Catch<NamespaceConnectionInfo, Exception>(exc =>
            {
                fault.OnError(exc);
                return Observable.Empty<NamespaceConnectionInfo>();
            })
            .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
            .Merge(maxConcurrentCalls)
            .Where(IsValid)
            .Select(ToWorkItem)
            .Where(o => o != null)
            .Concat(fault);
    }


它既可以在名称空间产生故障时也可以在成功时起作用,但是看起来很尴尬。另外,多个订阅仍然共享故障。必须有一个更优雅的解决方案。

GetNamespaceConnectionInfoSource源代码

public IObservable<NamespaceConnectionInfo> GetNamespaceConnectionInfoSource(bool? isActive = null,
    bool? isWorkflowEnabled = null, bool? isScheduleEnabled = null, bool? drainAndDisable = null,
    IEnumerable<string> nsList = null, string @where = null, IList<SqlParameter> whereParameters = null)
{
    IList<SqlParameter> parameters;
    var sql = GetNamespaceConnectionInfoSqls.GetSql(isActive,
        isWorkflowEnabled, isScheduleEnabled, drainAndDisable, nsList, @where, whereParameters, out parameters);
    var sqlUtil = m_sqlUtilProvider.Get(m_siteSettings.ControlDatabaseConnString);
    return sqlUtil.GetSource(typeof(NamespaceConnectionInfo), sqlUtil.GetReaderAsync(sql, parameters)).Cast<NamespaceConnectionInfo>();
}

public IObservable<DbDataReader> GetReaderAsync(string query, IList<SqlParameter> parameters = null, CommandBehavior commandBehavior = CommandBehavior.Default)
{
    return Observable.FromAsync(async () =>
    {
        SqlCommand command = null;
        try
        {
            var conn = await GetConnectionAsync();
            command = GetCommand(conn, query, parameters);
            return (DbDataReader)await command.ExecuteReaderAsync(commandBehavior | CommandBehavior.CloseConnection);
        }
        finally
        {
            DisposeSilently(command);
        }
    });
}

public IObservable<object> GetSource(Type objectType, IObservable<DbDataReader> readerTask)
{
    return Observable.Create<object>(async (obs, ct) => await PopulateSource(objectType, await readerTask, true, obs, ct));
}

private static async Task PopulateSource(Type objectType, DbDataReader reader, bool disposeReader, IObserver<object> obs, CancellationToken ct)
{
    try
    {
        if (IsPrimitiveDataType(objectType))
        {
            while (await reader.ReadAsync(ct))
            {
                obs.OnNext(reader[0]);
            }
        }
        else
        {
            // Get all the properties in our Object
            var typeReflector = objectType.GetTypeReflector(TypeReflectorCreationStrategy.PREPARE_DATA_RECORD_CONSTRUCTOR);

            // For each property get the data from the reader to the object
            while (await reader.ReadAsync(ct))
            {
                obs.OnNext(typeReflector.DataRecordConstructor == null ?
                    ReadNextObject(typeReflector, reader) :
                    typeReflector.DataRecordConstructor(reader));
            }
        }
    }
    catch (OperationCanceledException)
    {
    }
    finally
    {
        if (disposeReader)
        {
            reader.Dispose();
        }
    }
}

最佳答案

调用m_namespaceManager.GetNamespaceConnectionInfoSource(true, drainAndDisable: false)返回一个IObservable<NamespaceConnectionInfo>。现在,任何可观察的合约是这样的:

OnNext*(OnError|OnCompleted)


这意味着您将得到零个或多个值,然后是一个错误或一个完成值,然后只有一个。

您无法从单个可观察对象中获得多个错误,并且在获得错误后也无法获取值。

如果您的可观察对象确实返回了多个错误,则表明它违反了正常的接收合同。

因此,鉴于此,给定现有代码,您不可能将错误延迟到可观察对象的结尾,因为错误是可观察对象的结尾。

您可以做的是更改在GetNamespaceConnectionInfoSource中生成值的方式,以使其在将它们合并回一个之前生成多个序列调用.Materialize()。这意味着您将只有一个IObservable<Notification<NamespaceConnectionInfo>>,并且在整个流中可能会有多个错误和补全。然后,可以在处理错误之前对该流进行分组并处理值。但这全都取决于对GetNamespaceConnectionInfoSource的更改,并且由于您尚未发布源代码,因此我无法为您提供正确的代码。

为了帮助理解这一点,请看以下代码:

var xs = new [] { 1, 2, 3, 0, 4, 0, 5 }.ToObservable();

xs
    .Select(x =>
    {
        if (x == 0)
            throw new NotSupportedException();
        else
            return x;
    })
    .Subscribe(
        x => Console.WriteLine(x),
        ex => Console.WriteLine(ex.ToString()));


它产生此:

1
2
3
System.NotSupportedException: Specified method is not supported.
   at UserQuery.<Main>b__0(Int32 x) in query_ioaahp.cs:line 45
   at System.Reactive.Linq.ObservableImpl.Select`2._.OnNext(TSource value)


45根本不会产生。

现在看这段代码:

xs
    .Select(x =>
        Observable
            .Start(() =>
            {
                if (x == 0)
                    throw new NotSupportedException();
                else
                    return x;
            })
            .Materialize())
    .Merge()
    .Where(x => x.Kind != NotificationKind.OnCompleted)
    .Subscribe(
        x => Console.WriteLine(String.Format(
            "{0} {1}",
            x.Kind,
            x.HasValue ? x.Value.ToString() : "")),
        ex => Console.WriteLine(ex.ToString()));


这将产生以下结果:

OnNext 1
OnNext 4
OnError
OnError
OnNext 5
OnNext 3
OnNext 2


由于引入了并行性,因此出现了故障。

但是现在您可以处理所有错误。

关于c# - 如何在IObservable管道中保留异常并在最后将其重新抛出?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/31735887/

10-11 02:19