如何处理并发 ruby 线程池(http://ruby-concurrency.github.io/concurrent-ruby/file.thread_pools.html)中的异常?
例子:
pool = Concurrent::FixedThreadPool.new(5)
pool.post do
raise 'something goes wrong'
end
# how to rescue this exception here
更新:
这是我的代码的简化版本:
def process
pool = Concurrent::FixedThreadPool.new(5)
products.each do |product|
new_product = generate_new_product
pool.post do
store_in_db(new_product) # here exception is raised, e.g. connection to db failed
end
end
pool.shutdown
pool.wait_for_terminaton
end
因此,我想达到的目标是在发生任何异常时停止处理(中断循环)。
在更高级别的应用程序中也可以挽救该异常,并且执行了一些清理作业(例如,将模型状态设置为失败并发送一些通知)。
最佳答案
以下答案来自jdantonio从这里https://github.com/ruby-concurrency/concurrent-ruby/issues/616
”
大多数应用程序不应直接使用线程池。线程池是供内部使用的低层抽象。该库中的所有高级抽象(Promise,Actor等)都将作业发布到全局线程池中,并且都提供异常处理。只需选择最适合您的用例的抽象并使用它即可。
如果您需要配置自己的线程池而不是使用全局线程池,则仍可以使用高级抽象。它们都支持:executor选项,该选项允许您注入(inject)自定义线程池。然后,您可以使用高级抽象提供的异常处理。
如果您绝对坚持直接将作业发布到线程池中,而不是使用我们的高级抽象(我强烈建议不要这样做),则只需创建一个作业包装器即可。您可以在我们所有的高级抽象,Rails ActiveJob,Sucker Punch和其他使用我们的线程池的库中找到作业包装器的示例。”
那么,用Promises实现如何呢?
http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Promise.html
在您的情况下,它看起来像这样:
promises = []
products.each do |product|
new_product = generate_new_prodcut
promises << Concurrent::Promise.execute do
store_in_db(new_product)
end
end
# .value will wait for the Thread to finish.
# The ! means, that all exceptions will be propagated to the main thread
# .zip will make one Promise which contains all other promises.
Concurrent::Promise.zip(*promises).value!
关于ruby - 处理并发 ruby 线程池中的异常,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/40718759/