Problem description
I have a number of MongoDB collections which take a number of JSON documents from various streaming sources. In other words, there are a number of processes which are continually inserting data into a set of MongoDB collections.
I need a way to stream the data out of MongoDB into downstream applications. So I want a system that conceptually looks like this:
App Stream1 -->
App Stream2 --> MONGODB ---> Aggregated Stream
App Stream3 -->
Or this:
App Stream1 -->             ---> MongoD Stream1
App Stream2 -->   MONGODB   ---> MongoD Stream2
App Stream3 -->             ---> MongoD Stream3
The question is how do I stream data out of Mongo without having to continually poll/query the database?
The obvious answer would be: "Why don't you change those app streaming processes to send messages to a queue like RabbitMQ, ZeroMQ or ActiveMQ, which then sends them to your Mongo streaming processes and to Mongo at once, like this":
                  MONGODB
                    /|\
                     |
App Stream1 -->      |        ---> MongoD Stream1
App Stream2 -->  SomeMQqueue  ---> MongoD Stream2
App Stream3 -->               ---> MongoD Stream3
In an ideal world, yes, that would be good, but we need Mongo to ensure that messages are saved first, to avoid duplicates and to ensure that all IDs are generated, etc. Mongo has to sit in the middle as the persistence layer.
So how do I stream messages out of a Mongo collection (not using GridFS etc.) into these downstream apps? The basic school of thought has been to just poll for new documents and, for each document collected, update it by adding another field to the stored JSON document, much like a process flag in a SQL table that stores a processed timestamp. I.e. every 1 second, poll for documents where processed == null, set processed = now(), and update the document.
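As a rough sketch of the polling approach above, the Java below models the collection in memory so it runs without a database; the class `PollAndMark` and its methods are hypothetical. Each poll claims every document whose `processed` field is null by stamping it with the current time. Against a real deployment, the claim step would more likely be done one document at a time with the Java driver's `MongoCollection.findOneAndUpdate`, so that the find and the update are atomic on the server; that mapping is an assumption, not part of the question.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for a MongoDB collection (hypothetical; not the driver API),
// used to sketch the "poll for processed == null, then set processed = now()" idea.
class PollAndMark {
    // Each "document" is a field-name -> value map, kept in insertion order.
    private final List<Map<String, Object>> collection = new ArrayList<>();

    synchronized void insert(String id, String payload) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("_id", id);
        doc.put("payload", payload);
        doc.put("processed", null); // not yet streamed downstream
        collection.add(doc);
    }

    // One poll cycle: claim every document whose "processed" field is null by
    // stamping it with the current time, and return the claimed documents.
    // Holding the lock across find + update mimics an atomic find-and-modify,
    // so two concurrent pollers never claim the same document.
    synchronized List<Map<String, Object>> pollOnce() {
        List<Map<String, Object>> claimed = new ArrayList<>();
        for (Map<String, Object> doc : collection) {
            if (doc.get("processed") == null) {
                doc.put("processed", Instant.now());
                claimed.add(doc);
            }
        }
        return claimed;
    }

    public static void main(String[] args) {
        PollAndMark store = new PollAndMark();
        store.insert("a", "first");
        store.insert("b", "second");
        // A real streamer would call pollOnce() in a loop, e.g. once per second.
        System.out.println(store.pollOnce().size()); // prints 2
        store.insert("c", "third");
        System.out.println(store.pollOnce().size()); // prints 1
        System.out.println(store.pollOnce().size()); // prints 0
    }
}
```

This still polls: each cycle scans for unprocessed documents even when nothing has arrived, and every claimed document costs an extra write, which is exactly the overhead the question wants to avoid.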
Is there a neater/more computationally efficient method?
FYI - These are all Java processes.
Cheers!
Recommended answer
If you are writing to a capped collection (or collections), you can use a tailable cursor to push new data onto the stream, or onto a message queue from which it can be streamed out. However, this will not work for a non-capped collection.
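The toy model below illustrates what a tailable cursor buys over polling; `CappedTail`, `Cursor` and `tail()` are hypothetical names, and nothing here talks to MongoDB. The collection is fixed-size and keeps insertion order, evicting the oldest document when full, and the cursor remembers the last document it returned and blocks until a newer one is inserted instead of re-querying. With the real Java driver, the rough equivalent would be creating the collection with `new CreateCollectionOptions().capped(true).sizeInBytes(...)` and iterating `collection.find().cursorType(CursorType.TailableAwait)`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of a capped collection plus a tailable, awaiting cursor.
// All names are hypothetical; nothing here talks to MongoDB.
class CappedTail {
    // A stored document: its insertion sequence number and payload.
    record Doc(long seq, String payload) {}

    private final int capacity;
    private final Deque<Doc> docs = new ArrayDeque<>();
    private long nextSeq = 0;

    CappedTail(int capacity) {
        this.capacity = capacity;
    }

    // Append in insertion order; when full, evict the oldest document
    // (a capped collection behaves like a fixed-size ring buffer).
    synchronized void insert(String payload) {
        if (docs.size() == capacity) {
            docs.removeFirst();
        }
        docs.addLast(new Doc(nextSeq++, payload));
        notifyAll(); // wake any cursor waiting for new data
    }

    Cursor tail() {
        return new Cursor();
    }

    // Remembers the last document it returned, so each call resumes after it.
    class Cursor {
        private long lastSeen = -1;

        // Returns the next document newer than lastSeen, blocking up to
        // timeoutMs for one to arrive instead of re-polling; null on timeout.
        // A real tailable cursor can be invalidated if the collection wraps
        // past its position; this toy simply resumes at the oldest survivor.
        Doc next(long timeoutMs) {
            synchronized (CappedTail.this) {
                long deadline = System.currentTimeMillis() + timeoutMs;
                while (true) {
                    for (Doc d : docs) {
                        if (d.seq() > lastSeen) {
                            lastSeen = d.seq();
                            return d;
                        }
                    }
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        return null;
                    }
                    try {
                        CappedTail.this.wait(remaining);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return null;
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CappedTail events = new CappedTail(100);
        Cursor cursor = events.tail();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                events.insert("event-" + i);
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        producer.start();
        // The consumer sleeps inside next() until the producer inserts.
        for (Doc d = cursor.next(500); d != null; d = cursor.next(500)) {
            System.out.println("stream out: " + d.payload());
        }
        producer.join();
    }
}
```

The consumer does no work between inserts; it is woken by the producer, which is the push-style behaviour the answer describes.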