问题描述
我正在尝试找出使用代理使用消息队列 (Amazon SQS) 中的项目的最佳方法.现在我有一个函数 (process-queue-item) 从队列中抓取一个项目并处理它.
I'm trying to figure out the best way to use agents to consume items from a Message Queue (Amazon SQS). Right now I have a function (process-queue-item) that grabs an items from the queue, and processes it.
我想同时处理这些项目,但我不知道如何控制代理.基本上,我想让所有的代理尽可能地忙碌,而不是从队列中提取许多项目并开发积压(我将在几台机器上运行它,所以项目需要留在队列中,直到它们真的需要).
I want to process these items concurrently, but I can't wrap my head around how to control the agents. Basically I want to keep all of the agents busy as much as possible without pulling to many items from the Queue and developing a backlog (I'll have this running on a couple of machines, so items need to be left in the queue until they are really needed).
谁能给我一些关于改进我的实施的建议?
Can anyone give me some pointers on improving my implementation?
(def active-agents (ref 0))
(defn process-queue-item [_]
(dosync (alter active-agents inc))
;retrieve item from Message Queue (Amazon SQS) and process
(dosync (alter active-agents dec)))
(defn -main []
(def agents (for [x (range 20)] (agent x)))
(loop [loop-count 0]
(if (< @active-agents 20)
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent))
;should skip this agent until later if it is still busy processing (not sure how)
(send-off agent process-queue-item)))
;(apply await-for (* 10 1000) agents)
(Thread/sleep 10000)
(logging/info (str "ACTIVE AGENTS " @active-agents))
(if (> 10 loop-count)
(do (logging/info (str "done, let's cleanup " count))
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent)))
(apply await agents)
(shutdown-agents))
(recur (inc count)))))
推荐答案
(let [switch (atom true) ; a switch to stop workers
workers (doall
(repeatedly 20 ; 20 workers pulling and processing items from SQS
#(future (while @switch
(retrieve item from Amazon SQS and process)))))]
(Thread/sleep 100000) ; arbitrary rule to decide when to stop ;-)
(reset! switch false) ; stop !
(doseq [worker workers] @worker)) ; waiting for all workers to be done
这篇关于从队列中消费的 Clojure 代理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!