在Node应用程序中,我试图使用RxJS处理事件流。事件流是许多文档的更改列表。我正在使用groupBy通过documentId将流划分为新的流。但是我想知道,一旦在客户端上关闭了文档,并且没有为该documentId的流添加新事件,一旦文档流为空,groupBy会处置该流吗?如果没有,我将如何手动执行?我想避免由于创建但从未销毁新文档流而导致的内存泄漏。
最佳答案
我建议做的是:
不仅可以观察documentChanges,还可以观察documentEvents。
客户端在打开文档时将发送documentOpened事件,在更改文档时将发送documentChanged事件,在关闭文档时将发送documentClosed事件。
通过通过同一可观察对象发送所有3种类型的事件,您可以建立并保证顺序。如果客户端按该顺序发送documentOpened,documentChanged,documentClosed事件,则服务器将按该顺序查看它们。请注意,对于2个不同客户端发送的事件的顺序,我们将不做任何保证。这只是让您确保特定客户端发送的事件将是正确的。
然后,这就是您使用groupByUntil
的方式:
documentEvents
.groupByUntil(
function (e) { return e.documentId; }, // key
null, // element
function (group) { // duration selector
var documentId = group.key;
return group.filter(function (e) { return e.eventType === 'documentClosed'; });
})
.flatMap(function (eventsForDocument) {
var documentId = eventsForDocument.key;
return eventsForDocument.whatever(...);
})
.subscribe(...);
另一个简单得多的选择:您可以在空闲时间后使组过期。根据您对事件进行的处理,这可能绰绰有余。如果该文档在5分钟内没有被编辑,则该示例会使组失效。如果要进行更多编辑,则会旋转一个新组。
var idleTime = 5 * 60 * 1000;
events
.groupByUntil(
function(e) { return e.documentId; },
null,
function(g) { return g.debounce(idleTime); })
.flatMap...