我试图与Tornado queue一起使用新的concurrent.futures
对象,以允许我的网络服务器将CPU密集型任务传递给其他进程。我想访问从Future
模块从ProcessPoolExecutor
返回的concurrent.futures
对象,以便我可以查询其状态以显示在前端(例如,显示当前正在运行的进程;显示它已完成)。
使用此方法,我似乎有两个障碍:
如何同时向返回的q.get()
对象访问多个ProcessPoolExecutor
对象?
如何让Future
访问HomeHandler
返回的Future
对象,以便可以在前端显示状态信息?
谢谢你的帮助。
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue
from concurrent.futures import ProcessPoolExecutor
define("port", default=8888, help="run on the given port", type=int)
q = Queue(maxsize=2)
def expensive_function(input_dict):
gen.sleep(1)
@gen.coroutine
def consumer():
while True:
input_dict = yield q.get()
try:
with ProcessPoolExecutor(max_workers=4) as executor:
future = executor.submit(expensive_function, input_dict)
finally:
q.task_done()
@gen.coroutine
def producer(input_dict):
yield q.put(input_dict)
class Application(tornado.web.Application):
def __init__(self):
handlers = [
(r"/", HomeHandler),
]
settings = dict(
blog_title=u"test",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
debug=True,
)
super(Application, self).__init__(handlers, **settings)
class HomeHandler(tornado.web.RequestHandler):
def get(self):
self.render("home.html")
def post(self, *args, **kwargs):
input_dict = {'foo': 'bar'}
producer(input_dict)
self.redirect("/")
def main():
tornado.options.parse_command_line()
http_server = tornado.httpserver.HTTPServer(Application())
http_server.listen(options.port)
tornado.ioloop.IOLoop.current().start()
def start_consumer():
tornado.ioloop.IOLoop.current().spawn_callback(consumer)
if __name__ == "__main__":
tornado.ioloop.IOLoop.current().run_sync(start_consumer)
main()
最佳答案
您想通过组合Queue
和ProcessPoolExecutor
完成什么?执行者已经拥有了自己的内部队列。您需要做的就是将ProcessPoolExecutor
设置为全局变量(不必是全局变量,但是即使保留队列,您也想做一些类似于全局变量的事情;每次通过ProcessPoolExecutor
的循环创建一个新的consumer
),然后直接从处理程序中提交内容。
@gen.coroutine
def post(self):
input_dict = ...
result = yield executor.submit(expensive_function, input_dict)
关于python - Tornado 中的Queue和ProcessPoolExecutor,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/33553940/