Tornado 使用
Tornado 是一个 Python web 框架和异步网络处理库。通过使用非阻塞的网络 I/O ,Tornado 可以处理数以万计的开放连接,非常适合长轮询、WebSocket 或其它需要保持长连接的应用场景。
Tornado web 应用结构
- 一个或多个 RequestHander 类,处理请求事务
- 一个 Application 对象路由请求到对应的处理类
- 一个 main 函数启动 server
主要依赖
- concurrent.futures 线程池 Python 3 包含在标准库中
- pycurl tornado.curl_httpclient 使用
- Twisted tornado.platform.twisted
- pycares 非阻塞 DNS 解析器的替代方案
- monotonic 或 Monotime 系统时钟支持 Python 3.3 不需要
开始
Tornado 大致分为 4 主要的部件:
- web framework 包含 RequestHander 用于创建 web 应用,以及其它功能支持类
- 客户端与服务器端的 HTTP 实现 - HTTPServer 和 AsyncHTTPClient
- 异步网络库 - IOLoop 和 IOStream ,HTTP 模块的实现并且可用于实现其它协议
- 协程库 tornado.gen 替代回调函数链,更直观的书写异步代码,同 ES6 点 async await
既能当作 web 框架使用,又能当作 http server 使用。
异步和非阻塞I/O
Tornado 同 Node.js 采用单线程事件循环(single-threaded event loop),异步,非阻塞。
实现异步的方法有
- Callback 参数
- 返回一个占位符 placeholder (Future,Promise,Deferred)
- 投入队列
- Callback 注册(如 POSIX signals)
# callback 实例
from tornado.httpclient import AsyncHTTPClient
def asynchronous_fetch(url, callback):
http_client = AsyncHTTPClient()
def handle_response(response):
callback(response.body)
http_client.fetch(url, callback=handle_response)
# Future 实现
from tornado.concurrent import Future
def async_fetch_future(url):
http_client = AsyncHTTPClient()
my_future = Future()
fetch_future = http_client.fetch(url)
fetch_future.add_done_callback(
lambda f: my_future.set_result(f.result()))
return my_future
Future 两点优势
- 一致的错误处理
- 使用协程(coroutine)的方式编程
from tornado import gen
@gen.coroutine
def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = yield http_client.fetch(url)
raise gen.Return(response.body)
python 2.x generator 中不能使用 return, python 3.3 以上可以使用
协程
Tornado 推荐的异步编程方式
使用 @gen.coroutine 修饰函数,使用 yeild 关键字实现函数悬挂和恢复的切换,函数运行到 yeild 会断点,交出 CPU 使用权,等待结果返回后,函数继续在断点处执行。
from tornado import gen
@gen.coroutine
def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = yield http_client.fetch(url)
# In Python versions prior to 3.3, returning a value from
# a generator is not allowed and you must use
# raise gen.Return(response.body)
# instead.
return response.body
python 3.5 以上,使用 async await 替代@gen.coroutine 和 yeild
async def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = await http_client.fetch(url)
return response.body
工作原理
# Simplified inner loop of tornado.gen.Runner
def run(self):
# send(x) makes the current yield return x.
# It returns when the next yield is reached
future = self.gen.send(self.next)
def callback(f):
self.next = f.result()
self.run()
future.add_done_callback(callback)
coroutine 的调用: coroutine 函数只能由 coroutine 函数调用,并调用时使用 yeild 关键字
coroutine 使用模式
调用回调函数
使用 gen.Task 包裹回调形式的函数,可将函数改造为返回 Future
@gen.coroutine
def call_task():
# Note that there are no parens on some_function.
# This will be translated by Task into
# some_function(other_args, callback=callback)
yield gen.Task(some_function, other_args)
调用阻塞的函数
使用 ThreadPoolExecutor 可使阻塞的函数调用返回 Future 以与 coroutine 兼容 thread_pool = ThreadPoolExecutor(4)
@gen.coroutine
def call_blocking():
yield thread_pool.submit(blocking_func, args)
并行调用
同 ES 的 Promise.all([])
@gen.coroutine
def parallel_fetch(url1, url2):
resp1, resp2 = yield [http_client.fetch(url1),
http_client.fetch(url2)]
@gen.coroutine
def parallel_fetch_many(urls):
responses = yield [http_client.fetch(url) for url in urls]
# responses is a list of HTTPResponses in the same order
@gen.coroutine
def parallel_fetch_dict(urls):
responses = yield {url: http_client.fetch(url)
for url in urls}
# responses is a dict {url: HTTPResponse}
后台运行
while True 配合 gen.sleep
@gen.coroutine
def minute_loop():
while True:
yield do_something()
yield gen.sleep(60)
# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)
在一定时间内运行程序
@gen.coroutine
def minute_loop2():
while True:
nxt = gen.sleep(60) # Start the clock.
yield do_something() # Run while the clock is ticking.
yield nxt # Wait for the timer to run out.
临时保存 Future
@gen.coroutine
def get(self):
fetch_future = self.fetch_next_chunk()
while True:
chunk = yield fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = self.fetch_next_chunk()
yield self.flush()
迭代器模式
import motor
db = motor.MotorClient().test
@gen.coroutine
def loop_example(collection):
cursor = db.collection.find()
while (yield cursor.fetch_next):
doc = cursor.next_object()