前几天我一直在尝试编写一个 Rx 查询来处理来自源的事件流并检查某些 ID 的缺失。缺席被定义为有一系列时间窗口(例如,从 9:00 到 17:00 的所有日子),在此期间最多应该有 20 分钟没有 ID 出现在流中。更复杂的是,应根据 ID 定义缺勤时间。
例如,假设三种事件 A、B 和 C 出现在组合的事件流(A、A、B、C、A、C、B 等)中,可以定义为
我想我需要首先通过 GroupBy 将流划分为单独的事件,然后使用缺席规则处理生成的单独流。我已经在 Microsoft Rx forums 上仔细考虑了这一点(非常感谢 Dave)并且我有一些工作代码来生成规则和进行缺席检查,但是我很挣扎,例如,如何将它与分组结合起来。
所以,没有进一步的发言,到目前为止被黑的代码:
//Some sample data bits representing the events.
public class FakeData
{
public int Id { get; set; }
public string SomeData { get; set; }
}
//Note the Now part in DateTime to zero the clock time and have only the date. The purpose is to create start-end pairs of times, e.g. 9:00-17:00.
//The alarm start and end time points should match themselves pairwise, could be pairs of values...
var maxDate = DateTime.Now.Date.AddHours(17).AddMinutes(0).AddSeconds(0).AddDays(14);
var startDate = DateTime.Now.Date.AddHours(9).AddMinutes(0).AddSeconds(0);
var alarmStartPeriods = Enumerable.Range(0, 1 + (maxDate - startDate).Days).Select(d => new DateTimeOffset(startDate.AddDays(d))).ToList();
var alarmEndPeriods = Enumerable.Range(0, 1 + (maxDate - startDate).Days).Select(d => new DateTimeOffset(startDate.AddDays(d)).AddHours(5)).ToList();
以及在不将它们分组的情况下进行缺勤检查的查询,这是我的症结之一。 也许我应该将时间点分组并添加一个 ID 并在查询中使用生成的三元组...
dataSource = from n in Observable.Interval(TimeSpan.FromMilliseconds(100))
select new FakeData
{
Id = new Random().Next(1, 5),
SomeData = DateTimeOffset.Now.ToString()
};
var startPointOfTimeChanges = alarmStartPeriods.ToObservable();
var endPointOfTimeChanges = alarmEndPeriods.ToObservable();
var durations = startPointOfTimeChanges.CombineLatest(endPointOfTimeChanges, (start, end) => new { start, end });
var maximumInactivityTimeBeforeAlarmSignal = TimeSpan.FromMilliseconds(250);
timer = (from duration in durations
select (from _ in Observable.Timer(DateTime.Now)
from x in dataSource.Throttle(maximumInactivityTimeBeforeAlarmSignal).TakeUntil(duration.end)
select x)).Switch();
timer.Subscribe(x => Debug.WriteLine(x.SomeData));
问题:
我能想到的其他问题会很好(自娱自乐:)):
我能想到的其他选项是明确使用
System.Threading.Timers
和 ConcurrentDictionary
,但需要继续学习!关于詹姆斯的输入答案,这里有一个快速解释器,它是如何工作的以及我打算如何使用它。
首先,observable 在第一个事件到来之前什么都不做。因此,如果监控应该立即开始,则需要添加其他一些 Rx 功能或触发一个虚拟事件。没问题,我相信。
其次,对于任何新的 ID,都会从 alarmInterval 获取一个新的超时变量。这里的新人甚至是一个已经缺席太久并触发警报的人。
我认为这很有效,因为人们可以订阅这个 observable 并做一些有副作用的事情。一些例子就像设置一个标志,发送一个信号以及什么业务规则有一个。此外,保持适当的锁定等,应该很容易根据预定义的警报规则提供新的时间跨度,并具有单独的缺勤期和时间窗口。
我将不得不研究与此相关的其他概念,以便更好地掌握事物。但我的主要担忧对此感到满意。生活很好,很好。 :-)
最佳答案
已编辑 - 改进了代码,简化了 SelectMany
以使用 TakeLast
。
我写了一个关于检测断开连接的客户端的 blog post - 如果您将帖子中的 timeToHold 变量替换为下面的 alarmInterval 之类的函数以根据客户端 ID 获取 throttle 时间跨度,那么它也适用于您的场景。
例如。:
// idStream is an IObservable<int> of the input stream of IDs
// alarmInterval is a Func<int, TimeSpan> that gets the interval given the ID
var idAlarmStream = idStream
.GroupByUntil(key => key, grp => grp.Throttle(alarmInterval(grp.Key)))
.SelectMany(grp => grp.TakeLast(1));
这为您提供了持续监控的基本功能,而无需查看事件监控时段。
为了获得监视器窗口功能,我会扭转局面并使用 WHERE 过滤上述输出,以检查发出的 ID 是否落在它的监视时间窗口内。这使得更容易处理不断变化的监控周期。
您可以通过将每个监控窗口变成一个流并将它们与警报流相结合来做一些更有趣的事情,但我不相信额外复杂性的好处。
alarmInterval 函数还将为您提供动态警报间隔的元素,因为它可以返回新值,但这些仅在该 ID 的警报关闭后才会生效,从而结束其当前组。
--- 开始这里的一些理论---
为了让这个完全充满活力,你必须以某种方式结束小组——你可以通过几种方式来做到这一点。
一种方法是使用 Select 将 idStream 投影到包含 ID 和全局计数器值的自定义类型的流中。给这个类型一个适当的相等实现,这样它就可以正确地与 GroupByUntil 一起工作。
现在,每次更改警报间隔时,都要更改计数器。这将导致为每个 ID 创建新组。然后,您可以在最终过滤器中添加额外的检查,以确保输出事件具有最新的计数器值。
关于c# - 如何在某些时间段内对流进行分区(GroupBy)并监视 Rx 中元素的缺失?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/19313360/