我最近将mongodb从2.4更新到2.6,aggregate()中的新内存限制导致聚合失败,并出现以下错误:

Moped::Errors::OperationFailure: The operation: #<Moped::Protocol::Command
  @length=251
  @request_id=6
  @response_to=0
  @op_code=2004
  @flags=[:slave_ok]
  @full_collection_name="items.$cmd"
  @skip=0
  @limit=-1
  @selector={:aggregate=>"items", :pipeline=>[{"$group"=>{"_id"=>"$serial_number", "total"=>{"$sum"=>1}}}, {"$match"=>{"total"=>{"$gte"=>2}}}, {"$sort"=>{"total"=>-1}}, {"$limit"=>750000}]}
  @fields=nil>
failed with error 16945: "exception: Exceeded memory limit for $group, but didn't allow external sort. Pass allowDiskUse:true to opt in."

所以,我试图在查询中传递allowDiskUse:true:
dupes = Item.collection.aggregate([{
                                     '$group' => {'_id' => "$serial_number", 'total' =>  { "$sum" =>  1 } } },
                                                 { '$match' =>  { 'total' =>  { '$gte' =>  2 } } },
                                                 { '$sort' =>  {'total' =>  -1}},
                                                 { '$limit' => 750000 }],
                                     { 'allowDiskUse' => true })

但这不起作用无论我怎样尝试,我都会得到这个错误:
Moped::Errors::OperationFailure: The operation: #<Moped::Protocol::Command
  @length=274
  @request_id=2
  @response_to=0
  @op_code=2004
  @flags=[:slave_ok]
  @full_collection_name="items.$cmd"
  @skip=0
  @limit=-1
  @selector={:aggregate=>"items", :pipeline=>[{"$group"=>{"_id"=>"$serial_number", "total"=>{"$sum"=>1}}}, {"$match"=>{"total"=>{"$gte"=>2}}}, {"$sort"=>{"total"=>-1}}, {"$limit"=>750000}, {"allowDiskUse"=>true}]}
  @fields=nil>
failed with error 16436: "exception: Unrecognized pipeline stage name: 'allowDiskUse'"

有人知道我如何适当地构造这个查询,以便将allowDiskUse传递到管道arg之外吗?

最佳答案

问题是Moped当前不允许Moped::Collection#aggregate的选项,只允许args的管道,
如图所示:https://github.com/mongoid/moped/blob/master/lib/moped/collection.rb#L146-
Mongo Ruby驱动程序支持Mongo::Collection#aggregate的选项,但是Mongoid 3使用Moped作为驱动程序。
不过,由于Ruby的动态特性,您可以解决这个问题。
以下测试包括Moped::Collection#aggregate的monkey补丁,前提是您提供了管道
作为第一个参数的数组,允许您附加像allowDiskUse这样的选项。
希望这有帮助。
测试/单元/项目-test.rb

require 'test_helper'

module Moped
  class Collection
    def aggregate(pipeline, opts = {})
      database.session.command({aggregate: name, pipeline: pipeline}.merge(opts))["result"]
    end
  end
end

class ItemTest < ActiveSupport::TestCase
  def setup
    Item.delete_all
  end

  test "moped aggregate with allowDiskUse" do
    puts "\nMongoid::VERSION:#{Mongoid::VERSION}\nMoped::VERSION:#{Moped::VERSION}"
    docs = [
        {serial_number: 1},
        {serial_number: 2},
        {serial_number: 2},
        {serial_number: 3},
        {serial_number: 3},
        {serial_number: 3}
    ]
    Item.create(docs)
    assert_equal(docs.count, Item.count)
    dups = Item.collection.aggregate(
        [{'$group' => {'_id' => "$serial_number", 'total' => {"$sum" => 1}}},
         {'$match' => {'total' => {'$gte' => 2}}},
         {'$sort' => {'total' => -1}},
         {'$limit' => 750000}],
        {'allowDiskUse' => true})
    p dups
  end
end

$rake测试
Run options:

# Running tests:

[1/1] ItemTest#test_moped_aggregate_with_allowDiskUse
Mongoid::VERSION:3.1.6
Moped::VERSION:1.5.2
[{"_id"=>3, "total"=>3}, {"_id"=>2, "total"=>2}]
Finished tests in 0.027865s, 35.8873 tests/s, 35.8873 assertions/s.
1 tests, 1 assertions, 0 failures, 0 errors, 0 skips

08-26 08:46