我一直在使用Tornado web server玩一些,到了要停止Web服务器的地步(例如,在单元测试期间)。以下简单示例exists on the Tornado web page:
import tornado.ioloop
import tornado.web
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world")
application = tornado.web.Application([
(r"/", MainHandler),
])
if __name__ == "__main__":
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
调用
tornado.ioloop.IOLoop.instance().start()
后,它将阻止程序(或当前线程)。在IOLoop
函数的文档中,读取stop
对象的source code给出了以下示例:To use asynchronous methods from otherwise-synchronous code (such as
unit tests), you can start and stop the event loop like this:
ioloop = IOLoop()
async_method(ioloop=ioloop, callback=ioloop.stop)
ioloop.start()
ioloop.start() will return after async_method has run its callback,
whether that callback was invoked before or after ioloop.start.
但是,我不知道如何将其集成到我的程序中。实际上,我有一个封装Web服务器的类(具有自己的
start
和stop
函数),但是一旦我调用start,该程序(或测试)当然就会阻塞。我试图在另一个过程中启动Web服务器(使用
multiprocessing
包)。这是包装Web服务器的类:class Server:
def __init__(self, port=8888):
self.application = tornado.web.Application([ (r"/", Handler) ])
def server_thread(application, port):
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(port)
tornado.ioloop.IOLoop.instance().start()
self.process = Process(target=server_thread,
args=(self.application, port,))
def start(self):
self.process.start()
def stop(self):
ioloop = tornado.ioloop.IOLoop.instance()
ioloop.add_callback(ioloop.stop)
但是,即使使用以下测试设置,停止似乎也无法完全停止Web服务器,因为它仍在下一个测试中运行:
def setup_method(self, _function):
self.server = Server()
self.server.start()
time.sleep(0.5) # Wait for web server to start
def teardown_method(self, _function):
self.kstore.stop()
time.sleep(0.5)
如何从Python程序中启动和停止Tornado Web服务器?
最佳答案
我只是碰到了这个问题,自己发现了这个问题,并使用了来自该线程的信息。我只是简单地使用了独立的Tornado代码(从所有示例中复制)并将实际的起始代码移到了函数中。然后,我将该函数称为线程线程。我的情况与线程调用不同,它与我刚刚导入startTornado和stopTornado例程的现有代码完成了。
上面的建议似乎效果很好,所以我认为我将提供缺少的示例代码。我在FC16系统上的Linux下测试了此代码(并修复了我的初始type-o)。
import tornado.ioloop, tornado.web
class Handler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world")
application = tornado.web.Application([ (r"/", Handler) ])
def startTornado():
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
def stopTornado():
tornado.ioloop.IOLoop.instance().stop()
if __name__ == "__main__":
import time, threading
threading.Thread(target=startTornado).start()
print "Your web server will self destruct in 2 minutes"
time.sleep(120)
stopTornado()
希望这对下一个人有帮助。