在Node应用程序中,我试图使用RxJS处理事件流。事件流是许多文档的更改列表。我正在使用groupBy通过documentId将流划分为新的流。但是我想知道,一旦在客户端上关闭了文档,并且没有为该documentId的流添加新事件,一旦文档流为空,groupBy会处置该流吗?如果没有,我将如何手动执行?我想避免由于创建但从未销毁新文档流而导致的内存泄漏。

最佳答案

我建议做的是:

不仅可以观察documentChanges,还可以观察documentEvents。

客户端在打开文档时将发送documentOpened事件,在更改文档时将发送documentChanged事件,在关闭文档时将发送documentClosed事件。

通过通过同一可观察对象发送所有3种类型的事件,您可以建立并保证顺序。如果客户端按该顺序发送documentOpened,documentChanged,documentClosed事件,则服务器将按该顺序查看它们。请注意,对于2个不同客户端发送的事件的顺序,我们将不做任何保证。这只是让您确保特定客户端发送的事件将是正确的。

然后,这就是您使用groupByUntil的方式:

documentEvents
    .groupByUntil(
        function (e) { return e.documentId; }, // key
        null, // element
        function (group) { // duration selector
            var documentId = group.key;
            return group.filter(function (e) { return e.eventType === 'documentClosed'; });
      })
    .flatMap(function (eventsForDocument) {
        var documentId = eventsForDocument.key;
        return eventsForDocument.whatever(...);
    })
    .subscribe(...);


另一个简单得多的选择:您可以在空闲时间后使组过期。根据您对事件进行的处理,这可能绰绰有余。如果该文档在5分钟内没有被编辑,则该示例会使组失效。如果要进行更多编辑,则会旋转一个新组。

var idleTime = 5 * 60 * 1000;
events
    .groupByUntil(
        function(e) { return e.documentId; },
        null,
        function(g) { return g.debounce(idleTime); })
    .flatMap...

10-06 03:00