我试图与Tornado queue一起使用新的concurrent.futures对象,以允许我的网络服务器将CPU密集型任务传递给其他进程。我想访问从Future模块从ProcessPoolExecutor返回的concurrent.futures对象,以便我可以查询其状态以显示在前端(例如,显示当前正在运行的进程;显示它已完成)。

使用此方法,我似乎有两个障碍:


如何同时向返回的q.get()对象访问多个ProcessPoolExecutor对象?
如何让Future访问HomeHandler返回的Future对象,以便可以在前端显示状态信息?


谢谢你的帮助。

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

from concurrent.futures import ProcessPoolExecutor

define("port", default=8888, help="run on the given port", type=int)
q = Queue(maxsize=2)


def expensive_function(input_dict):
    gen.sleep(1)


@gen.coroutine
def consumer():
    while True:
        input_dict = yield q.get()
        try:
            with ProcessPoolExecutor(max_workers=4) as executor:
                future = executor.submit(expensive_function, input_dict)
        finally:
            q.task_done()


@gen.coroutine
def producer(input_dict):
    yield q.put(input_dict)


class Application(tornado.web.Application):
def __init__(self):
    handlers = [
        (r"/", HomeHandler),
    ]
    settings = dict(
        blog_title=u"test",
        template_path=os.path.join(os.path.dirname(__file__), "templates"),
        static_path=os.path.join(os.path.dirname(__file__), "static"),
        debug=True,
    )
    super(Application, self).__init__(handlers, **settings)


class HomeHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("home.html")

    def post(self, *args, **kwargs):
        input_dict = {'foo': 'bar'}

        producer(input_dict)

        self.redirect("/")


def main():
    tornado.options.parse_command_line()
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.current().start()


def start_consumer():
    tornado.ioloop.IOLoop.current().spawn_callback(consumer)


if __name__ == "__main__":
    tornado.ioloop.IOLoop.current().run_sync(start_consumer)
    main()

最佳答案

您想通过组合QueueProcessPoolExecutor完成什么?执行者已经拥有了自己的内部队列。您需要做的就是将ProcessPoolExecutor设置为全局变量(不必是全局变量,但是即使保留队列,您也想做一些类似于全局变量的事情;每次通过ProcessPoolExecutor的循环创建一个新的consumer),然后直接从处理程序中提交内容。

@gen.coroutine
def post(self):
    input_dict = ...
    result = yield executor.submit(expensive_function, input_dict)

关于python - Tornado 中的Queue和ProcessPoolExecutor,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/33553940/

10-12 22:24