这是问题所在。
我有本地mongos实例,该实例连接到远程mongod
远程数据库使用基本密码身份验证。
我正在尝试使用简单的Scala应用程序为特定集合设置ChangeStream watcher。
实际的代码如下所示:

  private val mongo = new MongoClient(
    new ServerAddress("localhost", 27017),
    MongoCredential.createCredential("username", "myDB", "password".toCharArray),
    MongoClientOptions.builder().addServerListener(ServerStateListener).build()
  )
  private val collection = mongo
    .getDatabase(DB)
    .getCollection("someObjectsCollection")

  private val ch = collection
    .watch()
    .fullDocument(FullDocument.UPDATE_LOOKUP)
    .iterator()

它在.fullDocument(FullDocument.UPDATE_LOOKUP)行告诉时中断:
Exception in thread "main" com.mongodb.MongoCommandException: Command failed with error 13: 'not authorized on myDB to execute command { aggregate: "someObjectsCollection", pipeline: [ { $changeStream: { fullDocument: "updateLookup" } } ], cursor: {}, $db: "myDB", $clusterTime: { clusterTime: Timestamp(1524064297, 2), ....

这是令人困惑的,因为给定的用户凭据通过远程数据库和本地mongo shell上的mongos工作。而且,我尝试对该应用程序内部的collection执行其他一些操作(例如collection.count()),并且可以正常工作!当我尝试设置观察程序时出现问题。

最佳答案

最终我发现我的设置出了什么问题...

我用来消耗变更流的原始用户“用户名”具有严格的权限集:

"inheritedPrivileges" : [
    {
        "resource" : {
            "db" : "abuCoreDev",
            "collection" : ""
        },
        "actions" : [
            "convertToCapped",
            "createCollection",
            "createIndex",
            "dropIndex",
            "find",
            "insert",
            "listCollections",
            "listIndexes",
            "planCacheIndexFilter",
            "remove",
            "update"
        ]
    }
],

我没有意识到我需要特殊的changeStream权限才能使用更改流!当我以具有被诅咒的权限的mongos连接到root时,一切正常。

在这里,您可以看到我的root用户的权限:
{
            "resource" : {
                "db" : "",
                "collection" : ""
            },
            "actions" : [
                "bypassDocumentValidation",
                "changeCustomData",
                "changePassword",
                "changeStream",
                "collMod",
                "collStats",
                "compact",
                "convertToCapped",
                "createCollection",
                "createIndex",
                "createRole",
                "createUser",
                "dbHash",
                "dbStats",
                "dropCollection",
                "dropDatabase",
                "dropIndex",
                "dropRole",
                "dropUser",
                "emptycapped",
                "enableProfiler",
                "enableSharding",
                "find",
                "getShardVersion",
                "grantRole",
                "indexStats",
                "insert",
                "killCursors",
                "listCollections",
                "listIndexes",
                "moveChunk",
                "planCacheIndexFilter",
                "planCacheRead",
                "planCacheWrite",
                "reIndex",
                "remove",
                "renameCollectionSameDB",
                "repairDatabase",
                "revokeRole",
                "setAuthenticationRestriction",
                "splitChunk",
                "splitVector",
                "storageDetails",
                "update",
                "validate",
                "viewRole",
                "viewUser"
            ]
        }

10-05 18:29