我有以下方法:
public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
return m_namespaceManager
.GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
.Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
.Merge(maxConcurrentCalls)
.Where(IsValid)
.Select(ToWorkItem)
.Where(o => o != null);
}
它实现以下逻辑:
通过从名称空间管理器(
IObservable<NamespaceConnectionInfo>
)获取GetNamespaceConnectionInfoSource
来输入monad。当名称空间可用时,获取与特定名称空间(
IObservable<DataManagementPolicy>
)相对应的GetPolicySourceForNamespace
。但是,使用Merge
运算符可以限制同时调用GetPolicySourceForNamespace
的次数。筛选出错误的
DataManagementPolicy
记录(无法在SQL中完成)。将看似良好的
DataManagementPolicy
记录转换为DataManagementWorkItem
实例。有些可能显示为null
,因此最后将其过滤掉。GetNamespaceConnectionInfoSource
在产生一定数量的有效NamespaceConnectionInfo
对象之后可能会出错。到那时,完全有可能已经按照最终的可观察顺序生成了一定数量的DataManagementWorkItem
对象。我有一个单元测试,其中:
GetNamespaceConnectionInfoSource
在产生25个名称空间之后抛出GetPolicySourceForNamespace
每个名称空间产生10个对象并发限制为10
我也有兴趣检查最终可观察结果产生的项目,然后再进行故障处理:
var dm = DependencyResolver.Instance.GetInstance<IDataManagement>();
var workItems = new List<DataManagementWorkItem>();
try
{
var obs = dm.GetWorkItemSource(10);
obs.Subscribe(wi => workItems.Add(wi));
await obs;
Assert.Fail("An expected exception was not thrown");
}
catch (Exception exc)
{
AssertTheRightException(exc);
}
workItems
集合每次都有不同数量的项目。一次运行有69项,另一项-50项,另一项-18项。我的解释是,当故障发生时,在处理的各个阶段中都有良好的
NamespaceConnectionInfo
和DataManagementPolicy
对象,所有这些对象都会由于故障而中止。每次的数量都是不同的,因为物料是异步生产的。我的问题就在这里-我不希望他们被中止。我希望它们能够完成,以最终的可观察顺序生产,然后才传达故障。本质上,我想保留异常并在最后将其重新抛出。
我尝试对实现进行一些修改:
public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
Exception fault = null;
return m_namespaceManager
.GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
.Catch<NamespaceConnectionInfo, Exception>(exc =>
{
fault = exc;
return Observable.Empty<NamespaceConnectionInfo>();
})
.Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
.Merge(maxConcurrentCalls)
.Where(IsValid)
.Select(ToWorkItem)
.Where(o => o != null)
.Finally(() =>
{
if (fault != null)
{
throw fault;
}
});
}
不用说-它没有用。
Finally
似乎没有传播我实际上同意的任何异常。那么,什么是实现我想要的正确方法?
编辑
与该问题无关,我发现用于收集生成的
DataManagementWorkItem
实例的测试代码是错误的。代替 var obs = dm.GetWorkItemSource(10);
obs.Subscribe(wi => workItems.Add(wi));
await obs;
它应该是
await dm.GetWorkItemSource(1).Do(wi => workItems.Add(wi));
区别在于,后者仅订阅了一次商品来源,而原始版本只订阅了两次:
由
Subscribe
由
await
它不会影响问题,但会破坏我的模拟代码。
澄清度
这更多的是澄清。每个名称空间产生一个由10个策略对象组成的序列。但是此过程是异步的-策略对象是顺序生成的,但是是异步生成的。在所有时间期间,将继续生成名称空间,因此,在发生故障之前给定25个名称空间,所生成的名称空间可以是三种可能的“状态”:
尚未为其生成任何策略对象,但是异步策略生成过程已经开始
已经产生了一些(但少于10个)策略对象
命名空间的所有10个策略对象均已生成
当名称空间产生中发生错误时,无论当前处于“良好”名称空间的“状态”如何,整个管道都将中止。
让我们看一下以下简单示例:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
namespace observables
{
class Program
{
static void Main()
{
int count = 0;
var obs = Observable
.Interval(TimeSpan.FromMilliseconds(1))
.Take(50)
.Select(i =>
{
if (25 == Interlocked.Increment(ref count))
{
throw new Exception("Boom!");
}
return i;
})
.Select(i => Observable.Defer(() => Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(10).Select(j => i * 1000 + j)))
.Merge(10);
var items = new HashSet<long>();
try
{
obs.Do(i => items.Add(i)).GetAwaiter().GetResult();
}
catch (Exception exc)
{
Debug.WriteLine(exc.Message);
}
Debug.WriteLine(items.Count);
}
}
}
运行它时,通常会有以下输出:
Boom!
192
但是,它也可能显示191。但是,如果我们应用故障连接解决方案(即使在没有故障的情况下它也不起作用):
int count = 0;
var fault = new Subject<long>();
var obs = Observable
.Interval(TimeSpan.FromMilliseconds(1))
.Take(50)
.Select(i =>
{
if (25 == Interlocked.Increment(ref count))
{
throw new Exception("Boom!");
}
return i;
})
.Catch<long, Exception>(exc =>
{
fault.OnError(exc);
return Observable.Empty<long>();
})
.Select(i => Observable.Defer(() => Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(10).Select(j => i * 1000 + j)))
.Merge(10)
.Concat(fault);
然后,输出始终为240,因为我们让所有已经开始的异步过程完成了。
基于pmccloghrylaing的答案的尴尬解决方案
public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
var fault = new Subject<DataManagementWorkItem>();
bool faulted = false;
return m_namespaceManager
.GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
.Catch<NamespaceConnectionInfo, Exception>(exc =>
{
faulted = true;
return Observable.Throw<NamespaceConnectionInfo>(exc);
})
.Finally(() =>
{
if (!faulted)
{
fault.OnCompleted();
}
})
.Catch<NamespaceConnectionInfo, Exception>(exc =>
{
fault.OnError(exc);
return Observable.Empty<NamespaceConnectionInfo>();
})
.Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
.Merge(maxConcurrentCalls)
.Where(IsValid)
.Select(ToWorkItem)
.Where(o => o != null)
.Concat(fault);
}
它既可以在名称空间产生故障时也可以在成功时起作用,但是看起来很尴尬。另外,多个订阅仍然共享故障。必须有一个更优雅的解决方案。
GetNamespaceConnectionInfoSource源代码
public IObservable<NamespaceConnectionInfo> GetNamespaceConnectionInfoSource(bool? isActive = null,
bool? isWorkflowEnabled = null, bool? isScheduleEnabled = null, bool? drainAndDisable = null,
IEnumerable<string> nsList = null, string @where = null, IList<SqlParameter> whereParameters = null)
{
IList<SqlParameter> parameters;
var sql = GetNamespaceConnectionInfoSqls.GetSql(isActive,
isWorkflowEnabled, isScheduleEnabled, drainAndDisable, nsList, @where, whereParameters, out parameters);
var sqlUtil = m_sqlUtilProvider.Get(m_siteSettings.ControlDatabaseConnString);
return sqlUtil.GetSource(typeof(NamespaceConnectionInfo), sqlUtil.GetReaderAsync(sql, parameters)).Cast<NamespaceConnectionInfo>();
}
public IObservable<DbDataReader> GetReaderAsync(string query, IList<SqlParameter> parameters = null, CommandBehavior commandBehavior = CommandBehavior.Default)
{
return Observable.FromAsync(async () =>
{
SqlCommand command = null;
try
{
var conn = await GetConnectionAsync();
command = GetCommand(conn, query, parameters);
return (DbDataReader)await command.ExecuteReaderAsync(commandBehavior | CommandBehavior.CloseConnection);
}
finally
{
DisposeSilently(command);
}
});
}
public IObservable<object> GetSource(Type objectType, IObservable<DbDataReader> readerTask)
{
return Observable.Create<object>(async (obs, ct) => await PopulateSource(objectType, await readerTask, true, obs, ct));
}
private static async Task PopulateSource(Type objectType, DbDataReader reader, bool disposeReader, IObserver<object> obs, CancellationToken ct)
{
try
{
if (IsPrimitiveDataType(objectType))
{
while (await reader.ReadAsync(ct))
{
obs.OnNext(reader[0]);
}
}
else
{
// Get all the properties in our Object
var typeReflector = objectType.GetTypeReflector(TypeReflectorCreationStrategy.PREPARE_DATA_RECORD_CONSTRUCTOR);
// For each property get the data from the reader to the object
while (await reader.ReadAsync(ct))
{
obs.OnNext(typeReflector.DataRecordConstructor == null ?
ReadNextObject(typeReflector, reader) :
typeReflector.DataRecordConstructor(reader));
}
}
}
catch (OperationCanceledException)
{
}
finally
{
if (disposeReader)
{
reader.Dispose();
}
}
}
最佳答案
调用m_namespaceManager.GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
返回一个IObservable<NamespaceConnectionInfo>
。现在,任何可观察的合约是这样的:
OnNext*(OnError|OnCompleted)
这意味着您将得到零个或多个值,然后是一个错误或一个完成值,然后只有一个。
您无法从单个可观察对象中获得多个错误,并且在获得错误后也无法获取值。
如果您的可观察对象确实返回了多个错误,则表明它违反了正常的接收合同。
因此,鉴于此,给定现有代码,您不可能将错误延迟到可观察对象的结尾,因为错误是可观察对象的结尾。
您可以做的是更改在
GetNamespaceConnectionInfoSource
中生成值的方式,以使其在将它们合并回一个之前生成多个序列调用.Materialize()
。这意味着您将只有一个IObservable<Notification<NamespaceConnectionInfo>>
,并且在整个流中可能会有多个错误和补全。然后,可以在处理错误之前对该流进行分组并处理值。但这全都取决于对GetNamespaceConnectionInfoSource
的更改,并且由于您尚未发布源代码,因此我无法为您提供正确的代码。为了帮助理解这一点,请看以下代码:
var xs = new [] { 1, 2, 3, 0, 4, 0, 5 }.ToObservable();
xs
.Select(x =>
{
if (x == 0)
throw new NotSupportedException();
else
return x;
})
.Subscribe(
x => Console.WriteLine(x),
ex => Console.WriteLine(ex.ToString()));
它产生此:
1
2
3
System.NotSupportedException: Specified method is not supported.
at UserQuery.<Main>b__0(Int32 x) in query_ioaahp.cs:line 45
at System.Reactive.Linq.ObservableImpl.Select`2._.OnNext(TSource value)
值
4
和5
根本不会产生。现在看这段代码:
xs
.Select(x =>
Observable
.Start(() =>
{
if (x == 0)
throw new NotSupportedException();
else
return x;
})
.Materialize())
.Merge()
.Where(x => x.Kind != NotificationKind.OnCompleted)
.Subscribe(
x => Console.WriteLine(String.Format(
"{0} {1}",
x.Kind,
x.HasValue ? x.Value.ToString() : "")),
ex => Console.WriteLine(ex.ToString()));
这将产生以下结果:
OnNext 1
OnNext 4
OnError
OnError
OnNext 5
OnNext 3
OnNext 2
由于引入了并行性,因此出现了故障。
但是现在您可以处理所有错误。
关于c# - 如何在IObservable管道中保留异常并在最后将其重新抛出?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/31735887/