Tornado 使用

2 minute read

Tornado 是一个 Python web 框架和异步网络处理库。通过使用非阻塞的网络 I/O ,Tornado 可以处理数以万计的开放连接,非常适合长轮询、WebSocket 或其它需要保持长连接的应用场景。

Tornado web 应用结构

  • 一个或多个 RequestHander 类,处理请求事务
  • 一个 Application 对象路由请求到对应的处理类
  • 一个 main 函数启动 server

主要依赖

  • concurrent.futures 线程池 Python 3 包含在标准库中
  • pycurl tornado.curl_httpclient 使用
  • Twisted tornado.platform.twisted
  • pycares 非阻塞 DNS 解析器的替代方案
  • monotonic 或 Monotime 系统时钟支持 Python 3.3 不需要

开始

Tornado 大致分为 4 主要的部件:

  1. web framework 包含 RequestHander 用于创建 web 应用,以及其它功能支持类
  2. 客户端与服务器端的 HTTP 实现 - HTTPServer 和 AsyncHTTPClient
  3. 异步网络库 - IOLoop 和 IOStream ,HTTP 模块的实现并且可用于实现其它协议
  4. 协程库 tornado.gen 替代回调函数链,更直观的书写异步代码,同 ES6 点 async await

既能当作 web 框架使用,又能当作 http server 使用。

异步和非阻塞I/O

Tornado 同 Node.js 采用单线程事件循环(single-threaded event loop),异步,非阻塞。

实现异步的方法有

  • Callback 参数
  • 返回一个占位符 placeholder (Future,Promise,Deferred)
  • 投入队列
  • Callback 注册(如 POSIX signals)
# callback 实例
from tornado.httpclient import AsyncHTTPClient
def asynchronous_fetch(url, callback):
    http_client = AsyncHTTPClient()
    def handle_response(response):
        callback(response.body)
    http_client.fetch(url, callback=handle_response)

# Future 实现
from tornado.concurrent import Future
def async_fetch_future(url):
    http_client = AsyncHTTPClient()
    my_future = Future()
    fetch_future = http_client.fetch(url)
    fetch_future.add_done_callback(
        lambda f: my_future.set_result(f.result()))
    return my_future

Future 两点优势

  • 一致的错误处理
  • 使用协程(coroutine)的方式编程
from tornado import gen
@gen.coroutine
def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = yield http_client.fetch(url)
    raise gen.Return(response.body)

python 2.x generator 中不能使用 return, python 3.3 以上可以使用

协程

Tornado 推荐的异步编程方式

使用 @gen.coroutine 修饰函数,使用 yeild 关键字实现函数悬挂和恢复的切换,函数运行到 yeild 会断点,交出 CPU 使用权,等待结果返回后,函数继续在断点处执行。

from tornado import gen
@gen.coroutine
def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = yield http_client.fetch(url)
    # In Python versions prior to 3.3, returning a value from
    # a generator is not allowed and you must use
    #   raise gen.Return(response.body)
    # instead.
    return response.body

python 3.5 以上,使用 async await 替代@gen.coroutine 和 yeild

async def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = await http_client.fetch(url)
    return response.body

工作原理

# Simplified inner loop of tornado.gen.Runner
def run(self):
    # send(x) makes the current yield return x.
    # It returns when the next yield is reached
    future = self.gen.send(self.next)
    def callback(f):
        self.next = f.result()
        self.run()
    future.add_done_callback(callback)

coroutine 的调用: coroutine 函数只能由 coroutine 函数调用,并调用时使用 yeild 关键字

coroutine 使用模式

调用回调函数

使用 gen.Task 包裹回调形式的函数,可将函数改造为返回 Future

@gen.coroutine
def call_task():
    # Note that there are no parens on some_function.
    # This will be translated by Task into
    #   some_function(other_args, callback=callback)
    yield gen.Task(some_function, other_args)

调用阻塞的函数

使用 ThreadPoolExecutor 可使阻塞的函数调用返回 Future 以与 coroutine 兼容 thread_pool = ThreadPoolExecutor(4)

@gen.coroutine
def call_blocking():
    yield thread_pool.submit(blocking_func, args)

并行调用

同 ES 的 Promise.all([])

@gen.coroutine
def parallel_fetch(url1, url2):
    resp1, resp2 = yield [http_client.fetch(url1),
                          http_client.fetch(url2)]

@gen.coroutine
def parallel_fetch_many(urls):
    responses = yield [http_client.fetch(url) for url in urls]
    # responses is a list of HTTPResponses in the same order

@gen.coroutine
def parallel_fetch_dict(urls):
    responses = yield {url: http_client.fetch(url)
                        for url in urls}
    # responses is a dict {url: HTTPResponse}

后台运行

while True 配合 gen.sleep

@gen.coroutine
def minute_loop():
    while True:
        yield do_something()
        yield gen.sleep(60)

# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)

在一定时间内运行程序

@gen.coroutine
def minute_loop2():
    while True:
        nxt = gen.sleep(60)   # Start the clock.
        yield do_something()  # Run while the clock is ticking.
        yield nxt             # Wait for the timer to run out.

临时保存 Future
@gen.coroutine
def get(self):
    fetch_future = self.fetch_next_chunk()
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = self.fetch_next_chunk()
        yield self.flush()

迭代器模式

import motor
db = motor.MotorClient().test

@gen.coroutine
def loop_example(collection):
    cursor = db.collection.find()
    while (yield cursor.fetch_next):
        doc = cursor.next_object()

Tags:

Categories:

Updated: