Problem description
I am using Tornado, the non-blocking I/O Python server. I have a class of GET requests that may take a significant amount of time to complete (think in the range of 5-10 seconds). The problem is that Tornado blocks on these requests, so subsequent fast requests are held up until the slow request completes.
I looked at https://github.com/facebook/tornado/wiki/Threading-and-concurrency and came to the conclusion that I wanted some combination of #3 (other processes) and #4 (other threads). #4 on its own had issues: I was unable to get reliable control back to the ioloop when there was another thread doing the "heavy_lifting". (I assume that this was due to the GIL and the fact that the heavy_lifting task has high CPU load and keeps pulling control away from the main ioloop, but that's a guess.)
So I have been prototyping how to solve this by doing the "heavy lifting" tasks within these slow GET requests in a separate process, and then placing a callback back into the Tornado ioloop when the process is done in order to finish the request. This frees up the ioloop to handle other requests.
I have created a simple example demonstrating a possible solution, but am curious to get feedback from the community on it.
My question is two-fold: How can this current approach be simplified? What pitfalls potentially exist with it?
The Approach
- Utilize Tornado's builtin asynchronous decorator, which allows a request to stay open and the ioloop to continue.
- Spawn a separate process for "heavy lifting" tasks using Python's multiprocessing module. I first attempted to use the threading module but was unable to get any reliable relinquishing of control back to the ioloop. It also appears that multiprocessing would take advantage of multiple cores.
- Start a "watcher" thread in the main ioloop process using the threading module, whose job is to watch a multiprocessing.Queue for the result of the "heavy lifting" task when it completes. This was needed because I needed a way to know that the heavy_lifting task had completed while still being able to notify the ioloop that this request was now finished.
- Be sure that the "watcher" thread relinquishes control to the main ioloop often with time.sleep(0) calls so that other requests continue to get readily processed.
- When there is a result in the queue, add a callback from the "watcher" thread using tornado.ioloop.IOLoop.instance().add_callback(), which is documented to be the only safe way to call ioloop instances from other threads.
- Be sure to then call finish() in the callback to complete the request and hand over a reply.
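One thing that may be worth checking in the watcher step: polling q.empty() with time.sleep(0) keeps the watcher thread spinning, and competing for the GIL, for as long as the worker runs. A blocking q.get() should let the thread sleep until the result arrives. The sketch below is stdlib-only and rests on assumptions: watch_for_result, start_watcher and on_done are hypothetical names, and on_done stands in for whatever hands the result back to the ioloop (in the server it would wrap IOLoop.instance().add_callback).

```python
import multiprocessing
import threading


def watch_for_result(q, on_done):
    """Block until one result is available on q, then pass it to on_done."""
    # get() with no timeout sleeps until an item arrives, so the thread
    # does not spin while the worker process is still computing.
    result = q.get()
    # Hypothetical hand-off: in a Tornado handler, on_done would schedule
    # the real callback on the ioloop thread via add_callback.
    on_done(result)


def start_watcher(on_done):
    """Create a result queue and a daemon thread that waits on it."""
    q = multiprocessing.Queue()
    watcher = threading.Thread(target=watch_for_result, args=(q, on_done))
    watcher.daemon = True  # a stuck watcher will not block interpreter exit
    watcher.start()
    return q, watcher
```

The returned queue is what would be passed to the worker process. Note that a blocking get() waits forever if the worker dies without posting a result, so a real server would probably want a timeout or a liveness check on the process.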
Below is some sample code showing this approach. multi_tornado.py is the server implementing the above outline, and call_multi.py is a sample script that calls the server in two different ways to test it. Both tests call the server with 3 slow GET requests followed by 20 fast GET requests. The results are shown for running both with and without threading turned on.
In the case of running it with "no threading", the 3 slow requests block (each taking a little over a second to complete). A few of the 20 fast requests squeeze through in between some of the slow requests within the ioloop (not totally sure how that occurs, but it could be an artifact of running both the server and the client test script on the same machine). The point here is that all of the fast requests are held up to varying degrees.
In the case of running it with threading enabled, the 20 fast requests all complete first, immediately, and the three slow requests complete at about the same time afterwards, as they have each been running in parallel. This is the desired behavior. The three slow requests take 2.5 seconds to complete in parallel, whereas in the non-threaded case the three slow requests take about 3.5 seconds in total. So there is about a 35% speed-up overall (I assume due to multicore sharing). But more importantly, the fast requests were handled immediately instead of waiting behind the slow ones.
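For reference, the speed-up can be worked out directly from the timings logged in the two test runs. This is only a rough calculation, and it assumes that the three parallel workers started at about the same moment, so that the slowest one approximates the wall time of the whole batch (each logged value is the compute time measured inside heavy_lifting, not the request latency).

```python
# Timings copied from the two test runs shown in this post (seconds).
blocking = [1.338, 1.169, 1.130]   # /slow: requests run one after another
parallel = [2.485, 2.491, 2.517]   # /slow_threaded: requests overlap

serial_total = sum(blocking)       # each request waits for the previous one
parallel_total = max(parallel)     # assumed: batch ends with the slowest worker

speedup = serial_total / parallel_total
time_saved = 1 - parallel_total / serial_total

print("serial %.3f s, parallel %.3f s" % (serial_total, parallel_total))
print("speed-up %.2fx, wall time reduced by %.0f%%" % (speedup, time_saved * 100))
```

With these numbers the batch goes from 3.637 s to 2.517 s, a speed-up of about 1.44x or a 31% reduction in wall time. Each individual task also took roughly twice as long in the parallel run (about 2.5 s versus 1.2 s), which may mean the three workers were contending for fewer than three free cores.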
I do not have a lot of experience with multithreaded programming, so while this seemingly works here I am curious to learn:
Is there a simpler way to accomplish this? What monsters may lurk within this approach?
(Note: A future tradeoff may be to just run more instances of Tornado with a reverse proxy like nginx doing load balancing. No matter what, I will be running multiple instances with a load balancer, but I am concerned about just throwing hardware at this problem, since it seems that the hardware is so directly coupled to the problem in terms of the blocking.)
Sample Code
multi_tornado.py (sample server):

import time
import threading
import multiprocessing
import math

from tornado.web import RequestHandler, Application, asynchronous
from tornado.ioloop import IOLoop

# run in some other process - put result in q
def heavy_lifting(q):
    t0 = time.time()
    for k in range(2000):
        math.factorial(k)
    t = time.time()
    q.put(t - t0)  # report time to compute in queue

class FastHandler(RequestHandler):
    def get(self):
        res = 'fast result ' + self.get_argument('id')
        print res
        self.write(res)
        self.flush()

class MultiThreadedHandler(RequestHandler):
    # Note: This handler can be called with threaded = True or False
    def initialize(self, threaded=True):
        self._threaded = threaded
        self._q = multiprocessing.Queue()

    def start_process(self, worker, callback):
        # method to start process and watcher thread
        self._callback = callback
        if self._threaded:
            # launch process
            multiprocessing.Process(target=worker, args=(self._q,)).start()
            # start watching for process to finish
            threading.Thread(target=self._watcher).start()
        else:
            # threaded = False just call directly and block
            worker(self._q)
            self._watcher()

    def _watcher(self):
        # watches the queue for process result
        while self._q.empty():
            time.sleep(0)  # relinquish control if not ready
        # put callback back into the ioloop so we can finish request
        response = self._q.get(False)
        IOLoop.instance().add_callback(lambda: self._callback(response))

class SlowHandler(MultiThreadedHandler):
    @asynchronous
    def get(self):
        # start a thread to watch for
        self.start_process(heavy_lifting, self._on_response)

    def _on_response(self, delta):
        _id = self.get_argument('id')
        res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
        print res
        self.write(res)
        self.flush()
        self.finish()  # be sure to finish request

application = Application([
    (r"/fast", FastHandler),
    (r"/slow", SlowHandler, dict(threaded=False)),
    (r"/slow_threaded", SlowHandler, dict(threaded=True)),
])

if __name__ == "__main__":
    application.listen(8888)
    IOLoop.instance().start()
call_multi.py (client tester):

import sys
from tornado.ioloop import IOLoop
from tornado import httpclient

def run(slow):
    def show_response(res):
        print res.body

    # make 3 "slow" requests on server
    requests = []
    for k in xrange(3):
        uri = 'http://localhost:8888/{}?id={}'
        requests.append(uri.format(slow, str(k + 1)))

    # followed by 20 "fast" requests
    for k in xrange(20):
        uri = 'http://localhost:8888/fast?id={}'
        requests.append(uri.format(k + 1))

    # show results as they return
    http_client = httpclient.AsyncHTTPClient()

    print 'Scheduling Get Requests:'
    print '------------------------'
    for req in requests:
        print req
        http_client.fetch(req, show_response)

    # execute requests on server
    print '\nStart sending requests....'
    IOLoop.instance().start()

if __name__ == '__main__':
    scenario = sys.argv[1]
    if scenario == 'slow' or scenario == 'slow_threaded':
        run(scenario)
Test Results
By running python call_multi.py slow (the blocking behavior):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow?id=1
http://localhost:8888/slow?id=2
http://localhost:8888/slow?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
slow result 1 <--- 1.338 s
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
slow result 2 <--- 1.169 s
slow result 3 <--- 1.130 s
fast result 8
fast result 9
fast result 10
fast result 11
fast result 13
fast result 12
fast result 14
fast result 15
fast result 16
fast result 18
fast result 17
fast result 19
fast result 20
By running python call_multi.py slow_threaded (the desired behavior):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow_threaded?id=1
http://localhost:8888/slow_threaded?id=2
http://localhost:8888/slow_threaded?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
fast result 8
fast result 9
fast result 10
fast result 11
fast result 12
fast result 13
fast result 14
fast result 15
fast result 19
fast result 20
fast result 17
fast result 16
fast result 18
slow result 2 <--- 2.485 s
slow result 3 <--- 2.491 s
slow result 1 <--- 2.517 s
If you're willing to use concurrent.futures.ProcessPoolExecutor instead of multiprocessing, this is actually very simple. Tornado's ioloop already supports concurrent.futures.Future, so they'll play nicely together out of the box. concurrent.futures is included in Python 3.2+, and has been backported to Python 2.x.
Here's an example:
import time
from concurrent.futures import ProcessPoolExecutor
from tornado.ioloop import IOLoop
from tornado import gen

def f(a, b, c, blah=None):
    print("got %s %s %s and %s" % (a, b, c, blah))
    time.sleep(5)
    return "hey there"

@gen.coroutine
def test_it():
    pool = ProcessPoolExecutor(max_workers=1)
    fut = pool.submit(f, 1, 2, 3, blah="ok")  # This returns a concurrent.futures.Future
    print("running it asynchronously")
    ret = yield fut
    print("it returned %s" % ret)
    pool.shutdown()

IOLoop.instance().run_sync(test_it)
Output:
running it asynchronously
got 1 2 3 and ok
it returned hey there
ProcessPoolExecutor has a more limited API than multiprocessing.Pool, but if you don't need the more advanced features of multiprocessing.Pool, it's worth using because the integration is so much simpler.
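To connect this back to the slow/fast request scenario in the question, here is a minimal stdlib-only sketch of the same executor idea. It uses asyncio rather than Tornado itself, on the basis that Tornado 5.0 and later run on top of the asyncio event loop. The names slow_request, fast_request and serve_batch are made up for illustration, and heavy_lifting is adapted to return a value instead of writing to a queue. Three slow jobs are offloaded to an executor while five fast ones run on the loop.

```python
import asyncio
import math


def heavy_lifting(n):
    """CPU-bound stand-in for the body of a slow request."""
    total = 0
    for k in range(n):
        total += len(str(math.factorial(k)))
    return total


async def slow_request(executor, req_id, n, log):
    loop = asyncio.get_running_loop()
    # The coroutine is suspended here, so the loop is free to serve others.
    result = await loop.run_in_executor(executor, heavy_lifting, n)
    log.append("slow %d" % req_id)
    return result


async def fast_request(req_id, log):
    log.append("fast %d" % req_id)
    return req_id


async def serve_batch(executor, n=300):
    """Schedule 3 slow requests first, then 5 fast ones, as in the test script."""
    log = []
    jobs = [slow_request(executor, i, n, log) for i in range(1, 4)]
    jobs += [fast_request(i, log) for i in range(1, 6)]
    await asyncio.gather(*jobs)
    return log  # completion order
```

Passing None as the executor uses the loop's default thread pool, which is enough to see the fast requests complete first. For CPU-bound work like the question's, one would instead pass a single shared ProcessPoolExecutor (created once under an if __name__ == "__main__": guard rather than per request), which also caps the number of worker processes, unlike starting a new multiprocessing.Process for every request. Inside a Tornado 5+ handler the equivalent call should be IOLoop.current().run_in_executor(...), though that is not exercised here.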