由于我的大部分任务都依赖于网络,因此我希望并行处理我的队列,而不仅仅是一次处理一条消息。
所以,我使用以下代码:
#!/usr/bin/env ruby
# encoding: utf-8
require "rubygems"
require 'amqp'
EventMachine.run do
connection = AMQP.connect(:host => '127.0.0.1')
channel = AMQP::Channel.new(connection)
channel.prefetch 5
queue = channel.queue("pending_checks", :durable => true)
exchange = channel.direct('', :durable => true)
queue.subscribe(:ack => true) do |metadata, payload|
time = rand(3..9)
puts 'waiting ' + time.to_s + ' for message ' + payload
sleep(time)
puts 'done with '+ payoad
metadata.ack
end
end
为什么它不使用我的预取设置?我想它应该得到 5 条消息并并行处理它们,不是吗?
最佳答案
预取是在您确认之前可能提前发送给您的最大消息数。
换句话说,预取大小不限制单个消息到客户端的传输,只是在客户端仍有一个或多个未确认的消息时提前发送更多消息。 (来自 AMPQ 文档)
QoS Prefetching Messages
RabbitMQ AMQP Reference
EventMachine 是单线程和基于事件的。对于不同线程或进程上的并行作业,请参阅 EM::Deferrable,然后是 Thread 或 spawn。
另请参阅 Hot Bunnies,这是一种基于 RabbitMQ Java 客户端的快速 DSL:
https://github.com/ruby-amqp/hot_bunnies
(感谢 Michael Klishin 在 Google Groups 上的信息,以及博主上的 stoyan)
关于Ruby + AMQP : processing queue in parallel,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/10768850/