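My goal for this app is to create logic that monitors the database and triggers actions (such as sending an email) when a document is added to it. However, since this application may not be running when the database is first populated, how can I manually create a ResumeToken that points to the very first document added to a collection, so that on the first run I can start at the beginning and iterate through all the changes until I reach the end? I know I will need to store the ResumeToken from the lastChangeStreamDocument for future restarts, but I am interested in the "first run" scenario. I thought enumerator.Reset(); was the correct option, but it threw an exception indicating that it is not supported.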

I have followed the tests provided at https://github.com/mongodb/mongo-csharp-driver/blob/master/tests/MongoDB.Driver.Examples/ChangeStreamExamples.cs and have successfully configured a change stream with the following code:

mongoClient = mongoClient ?? new MongoClient(ConnectionString);  //Create client object if it is null
IMongoDatabase sandboxDB = mongoClient.GetDatabase("SandboxDB");

var collection = sandboxDB.GetCollection<BsonDocument>("CollectionToMonitor");

try
{
    var cursor = collection.Watch();
    var enumerator = cursor.ToEnumerable().GetEnumerator();

    enumerator.MoveNext();  //Blocks until a record is UPDATED in the database
    var lastChangeStreamDocument = enumerator.Current;
    enumerator.Dispose();
    //lastChangeStreamDocument.FullDocument.Should().Be(document);

}
catch( Exception ex)
{
    Logger.WriteException(ex);
}

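However, with this code the enumerator.MoveNext() line blocks until a document is updated, so I can only get a reference to documents that are updated after I set up the change stream.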

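My idea was to search the local.oplog database and get the UUID of the first document inserted into the collection. That part worked, but I don't see a way to convert this reference into a ResumeToken object that I can pass to the Watch method.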

Update:

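The ResumeToken appears to be stored as Base64 and contains a timestamp, the o._id ObjectID, and the ui UUID from the oplog entry. I need to go through the code a little more, but judging from this source code (https://github.com/mongodb/mongo/blob/c906f6357d22f66d58e3334868025069c62bd97b/src/mongo/db/pipeline/resume_token_test.cpp), ResumeTokens come in different formats. With this information, hopefully I can build my own ResumeToken to match the format the database expects.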

Update #2:

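After more research, I stumbled across the key_string parsing code in mongo at github.com/mongodb/mongo/src/mongo/db/storage/key_string.cpp. This file contains the definition of CType. I decoded the Base64 into a byte array, and with the CType enum definitions I was able to understand a bit more about how to build my own ResumeToken.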

Consider the following example:
I captured this ResumeToken on a ChangeStream after I updated a document.
glp9zsgAAAABRmRfaWQAZFp9zH40PyabFRwB/ABaEAQESw1YexhL967nKLXsT5Z+BA==

That decodes to the byte array:
82 5a 7d ce c8 00 00 00 01 46 64 5f 69 64 00 64 5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc 00 5a 10 04 04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e 04
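
For anyone who wants to reproduce the decoding step, here is a minimal sketch that uses only the .NET base class library. The class and method names (ResumeTokenDump, ToHex) are made up for illustration and are not part of the MongoDB driver.

```csharp
using System;

// Hypothetical helper, not part of the MongoDB driver.
public static class ResumeTokenDump
{
    // Decodes a Base64 resume token payload and renders it as
    // space-separated lowercase hex pairs, like the listing above.
    public static string ToHex(string base64Token)
    {
        byte[] raw = Convert.FromBase64String(base64Token);
        return BitConverter.ToString(raw).Replace("-", " ").ToLowerInvariant();
    }
}
```

Calling ResumeTokenDump.ToHex with the Base64 string captured above should give the same 49 bytes listed here.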

Which I have decoded as:
//Timestamp (of oplog entry??)
82    //CType::TimeStamp
5a 7d ce c8 00 00 00 01   //It appears to be expecting a 64b number
//I'm not sure why the last byte is 0x01, unless it has something to do with little/big endian
//Matching oplog doc has { ts: TimeStamp(1518194376, 1) }
//  that integer converts to 0x5A7DCEC8

//Unknown Object
46    //CType::Object
64 5f 69 64     //Either expecting a 32b value or null terminated
00    //Null terminator or divider

//Document ID
64    //CType::OID
5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc  //o._id value from oplog entry
00    //OID expecting null terminated

//UUID
5a    //CType::BinData
10    //Length (16 bytes)
04    //BinDataType of newUUID (from bsontypes.h)
04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e  //UUID value from oplog entry
04    //Unknown byte. Perhaps end of ResumeToken, or end of UUID mark?
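
The layout above can be checked mechanically. The following is a minimal sketch, assuming every token has exactly the shape of the single token observed here, which may not hold for other document key types or server versions. The names DecodedToken and ResumeTokenLayout are hypothetical. Reading the eight bytes after the 0x82 tag as two big-endian 32-bit integers gives 1518194376 and 1, which lines up with the two parts of Timestamp(1518194376, 1) from the matching oplog entry, so the trailing 0x01 looks like the timestamp's increment rather than an endianness artifact.

```csharp
using System;

// Hypothetical container for the fields read out of the token.
public sealed class DecodedToken
{
    public uint Seconds;      // first part of the oplog "ts"
    public uint Increment;    // second part of the oplog "ts"
    public byte[] ObjectId;   // 12 bytes, matches o._id
    public byte[] Uuid;       // 16 bytes, matches ui
}

// Hypothetical parser for the one token layout observed in this post.
public static class ResumeTokenLayout
{
    public static DecodedToken Parse(byte[] raw)
    {
        int pos = 0;
        var token = new DecodedToken();

        Expect(raw, ref pos, 0x82);                               // timestamp tag
        token.Seconds = ReadUInt32BigEndian(raw, ref pos);
        token.Increment = ReadUInt32BigEndian(raw, ref pos);

        Expect(raw, ref pos, 0x46, 0x64, 0x5f, 0x69, 0x64, 0x00); // object tag + "d_id" + NUL
        Expect(raw, ref pos, 0x64);                               // OID tag
        token.ObjectId = Take(raw, ref pos, 12);
        Expect(raw, ref pos, 0x00);                               // terminator after the OID

        Expect(raw, ref pos, 0x5a, 0x10, 0x04);                   // BinData tag, length 16, subtype 4
        token.Uuid = Take(raw, ref pos, 16);
        Expect(raw, ref pos, 0x04);                               // final marker byte

        if (pos != raw.Length) throw new FormatException("Trailing bytes after the final marker.");
        return token;
    }

    private static void Expect(byte[] raw, ref int pos, params byte[] expected)
    {
        foreach (byte b in Take(raw, ref pos, expected.Length).AsSpanCompare(expected)) { }
    }

    private static uint ReadUInt32BigEndian(byte[] raw, ref int pos)
    {
        byte[] b = Take(raw, ref pos, 4);
        return ((uint)b[0] << 24) | ((uint)b[1] << 16) | ((uint)b[2] << 8) | b[3];
    }

    private static byte[] Take(byte[] raw, ref int pos, int count)
    {
        if (pos + count > raw.Length) throw new FormatException("Token is shorter than expected.");
        var slice = new byte[count];
        Array.Copy(raw, pos, slice, 0, count);
        pos += count;
        return slice;
    }

    // Compares the bytes just read against the expected bytes and returns them unchanged.
    private static byte[] AsSpanCompare(this byte[] actual, byte[] expected)
    {
        for (int i = 0; i < expected.Length; i++)
        {
            if (actual[i] != expected[i])
                throw new FormatException("Unexpected byte 0x" + actual[i].ToString("x2") + " in token.");
        }
        return actual;
    }
}
```

Passing the 49 decoded bytes to ResumeTokenLayout.Parse returns the seconds, increment, ObjectId and UUID as separate values, or throws a FormatException if the token does not follow this layout.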

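The problem I have now is this: if I have many oplog entries for a collection, and I use the ts, ui and o._id values from the first entry in the oplog to build my own ResumeToken (hard-coding the unknown 0x4664 5f69 6400 block and the ending 0x04 byte), the server accepts it as a valid ResumeToken when setting up collection.Watch. However, the document returned by the enumerator.MoveNext() call is always the third oplog entry and not the second!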

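I am nervous about relying on this in production without knowing the purpose of that 6-byte block (12 hex digits), and without knowing why I am pointed at the third entry rather than the second.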

Update #3:

The block of bytes in question:
46 64 5f 69 64 00

0x46 = CType::Object
0x64 = d
0x5F = _
0x69 = i
0x64 = d
0x00 = NULL

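The block of bytes that follows describes the ObjectId of the affected document, or its "_id" key. So what is the significance of the "d" character?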

Best Answer

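I have been updating the question with additional information as I worked through this, and I have now managed to piece it all together so that it works.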

Below is the code that I created to:

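  • Find the first entry for a namespace in the local.oplog collection
  • Generate a ResumeToken from that oplog document (so that we resume on the second entry)
  • Provide an example for testing these functions.

Hopefully this code will also be helpful to others who are attempting to do the same thing.
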
    /// <summary>
    /// Locates the first document for the given namespace in the local.oplog collection
    /// </summary>
    /// <param name="docNamespace">Namespace to search for</param>
    /// <returns>First Document found in the local.oplog collection for the specified namespace</returns>
    internal static BsonDocument GetFirstDocumentFromOpLog(string docNamespace)
    {
        mongoClient = mongoClient ?? new MongoClient(ConnectionString);  //Create client object if it is null
        IMongoDatabase localDB = mongoClient.GetDatabase("local");
        var collection = localDB.GetCollection<BsonDocument>("oplog.rs");
    
        //Find the documents from the specified namespace (DatabaseName.CollectionName), that have an operation type of "insert" (The first entry to a collection must always be an insert)
        var filter = MongoDB.Bson.Serialization.BsonSerializer.Deserialize<BsonDocument>("{ $and: [ { 'ns': '" + docNamespace + "'}, { 'op': 'i'}] }");
    
        BsonDocument retDoc = null;
        try //to get the first document from the oplog entries
        {
            retDoc = collection.Find<BsonDocument>(filter).First();
        }
        catch(Exception ex) { /*Logger.WriteException(ex);*/ }
        return retDoc;
    }
    
    /// <summary>
    /// Takes a document from the OpLog and generates a ResumeToken
    /// </summary>
    /// <param name="firstDoc">BsonDocument from the local.oplog collection to base the ResumeToken on</param>
    /// <returns>A ResumeToken that can be provided to a collection watch (ChangeStream) that points to the firstDoc provided</returns>
    private static BsonDocument GetResumeTokenFromOpLogDoc(BsonDocument firstDoc)
    {
        List<byte> hexVal = new List<byte>(34);
    
        //Insert Timestamp of document
        hexVal.Add(0x82);   //TimeStamp Tag
        //A BSON Timestamp is 64 bits: the seconds value followed by the increment, e.g. Timestamp(1518194376, 1)
        byte[] docTimeStampByteArr = BitConverter.GetBytes(firstDoc["ts"].AsBsonTimestamp.Value); //Value packs seconds (high 32 bits) and increment (low 32 bits) into a long
        if (BitConverter.IsLittleEndian) { Array.Reverse(docTimeStampByteArr); } //Observed ResumeTokens store the value big-endian
        hexVal.AddRange(docTimeStampByteArr);   //Adds all 8 bytes; the trailing 0x01 in the observed ResumeToken matches an increment of 1
    
        //Unknown Object observed in a ResumeToken
        //0x46 = CType::Object, followed by the string "d_id" NULL
        //This may be something that identifies that the following value is for the "_id" field of the ObjectID given next
        hexVal.AddRange(new byte[] { 0x46, 0x64, 0x5F, 0x69, 0x64, 0x00 }); //Unknown Object, expected to be 32 bits, with a 0x00 terminator
    
        //Insert OID (from o._id of the oplog entry)
        hexVal.Add(0x64);   //OID Tag
        byte[] docByteArr = firstDoc["o"]["_id"].AsObjectId.ToByteArray();
        hexVal.AddRange(docByteArr);
        hexVal.Add(0x00);   //End of OID
    
        //Insert UUID (from ui) as BinData
        hexVal.AddRange(new byte[] { 0x5a, 0x10, 0x04 });   //0x5A = BinData, 0x10 is Length (16 bytes), 0x04 is BinDataType (newUUID)
        hexVal.AddRange(firstDoc["ui"].AsByteArray);
    
        hexVal.Add(0x04);   //Unknown marker (maybe end of resumeToken since 0x04 == ASCII 'EOT')
    
        //Package the binary data into a BsonDocument with the key "_data" and the value as a Base64 encoded string
        BsonDocument retDoc = new BsonDocument("_data", new BsonBinaryData(hexVal.ToArray()));
        return retDoc;
    }
    
    
    /// <summary>
    /// Example Code for setting up and resuming to the second doc
    /// </summary>
    internal static void MonitorChangeStream()
    {
        mongoClient = mongoClient ?? new MongoClient(ConnectionString);  //Create client object if it is null
        IMongoDatabase sandboxDB = mongoClient.GetDatabase("SandboxDB");
        var collection = sandboxDB.GetCollection<BsonDocument>("CollectionToMonitor");
    
        var options = new ChangeStreamOptions();
        options.FullDocument = ChangeStreamFullDocumentOption.UpdateLookup;
    
        try
        {
            var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match("{ operationType: { $in: [ 'replace', 'insert', 'update' ] } }");  //Works
    
            //Build ResumeToken from the first document in the oplog collection
            BsonDocument resumeTokenRefDoc = GetFirstDocumentFromOpLog(collection.CollectionNamespace.ToString());
            if (resumeTokenRefDoc != null)
            {
                BsonDocument docResumeToken = GetResumeTokenFromOpLogDoc(resumeTokenRefDoc);
                options.ResumeAfter = docResumeToken;
            }
    
            //Setup the ChangeStream/Watch Cursor
            var cursor = collection.Watch(pipeline, options);
            var enumerator = cursor.ToEnumerable().GetEnumerator();
    
            enumerator.MoveNext();  //Blocks until a record is UPDATEd, REPLACEd or INSERTed in the database (thanks to the pipeline arg), or returns the second entry (thanks to the ResumeToken that points to the first entry)
    
            ChangeStreamDocument<BsonDocument> lastChangeStreamDocument = enumerator.Current;
            //lastChangeStreamDocument is now pointing to the second entry in the oplog, or the just received entry
            //A loop can be setup to call enumerator.MoveNext() to step through each entry in the oplog history and to also receive new events
    
            enumerator.Dispose();   //Be sure to dispose of the enumerator when finished.
        }
        catch( Exception ex)
        {
            //Logger.WriteException(ex);
        }
    }
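
As a sanity check on the byte layout used above, here is a minimal sketch that assembles the same token from plain values using only the .NET base class library, so it can be exercised without a database. The class name ResumeTokenBuilder and its Build method are hypothetical, and the layout is an assumption taken from the single token captured in the question. Other document key types or other server versions may well use a different format.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the MongoDB driver.
public static class ResumeTokenBuilder
{
    // Assembles the token bytes in the order observed in the captured token:
    // timestamp, document key (_id as ObjectId), collection UUID, final marker.
    public static byte[] Build(uint seconds, uint increment, byte[] objectId, byte[] uuid)
    {
        if (objectId == null || objectId.Length != 12)
            throw new ArgumentException("An ObjectId is 12 bytes.", nameof(objectId));
        if (uuid == null || uuid.Length != 16)
            throw new ArgumentException("A UUID is 16 bytes.", nameof(uuid));

        var bytes = new List<byte>(49);

        bytes.Add(0x82);                          // timestamp tag
        AppendBigEndian(bytes, seconds);          // first part of "ts"
        AppendBigEndian(bytes, increment);        // second part of "ts"

        bytes.AddRange(new byte[] { 0x46, 0x64, 0x5F, 0x69, 0x64, 0x00 }); // object tag + "d_id" + NUL
        bytes.Add(0x64);                          // OID tag
        bytes.AddRange(objectId);
        bytes.Add(0x00);                          // terminator after the OID

        bytes.AddRange(new byte[] { 0x5A, 0x10, 0x04 }); // BinData tag, length 16, subtype 4
        bytes.AddRange(uuid);

        bytes.Add(0x04);                          // final marker byte
        return bytes.ToArray();
    }

    private static void AppendBigEndian(List<byte> target, uint value)
    {
        target.Add((byte)(value >> 24));
        target.Add((byte)(value >> 16));
        target.Add((byte)(value >> 8));
        target.Add((byte)value);
    }
}
```

Feeding it the values from the captured token (seconds 1518194376, increment 1, and the ObjectId and UUID bytes from the breakdown in the question) and passing the result to Convert.ToBase64String reproduces the original Base64 string, which is a cheap way to confirm the layout before trusting it against a live server.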
    

If anyone has suggestions for improving the code, please offer them. I'm still learning.

Regarding "c# - How to resume a MongoDB ChangeStream at the first document and not just changes after I start listening", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/48665409/
