Coroutines

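Coroutines are the recommended way to write asynchronous code in Tornado. Coroutines use the Python await or yield keyword to suspend and resume execution instead of a chain of callbacks. (Cooperative lightweight threads, as seen in frameworks like gevent, are sometimes called coroutines as well, but in Tornado all coroutines use explicit context switches and are called asynchronous functions.)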

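Coroutines are almost as simple as synchronous code, but without the expense of a thread. They also make concurrency easier to reason about by reducing the number of places where a context switch can happen.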

Example:

from tornado.httpclient import AsyncHTTPClient

async def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = await http_client.fetch(url)
    return response.body
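To see why fewer switch points make concurrency easier to reason about, here is a small sketch in plain asyncio (an assumption: Tornado 5 and later run on the asyncio event loop; `worker` and `log` are illustrative names, not Tornado APIs). Each coroutine appends two entries with no await between them, so another coroutine can never slip in between a pair:

```python
import asyncio

log = []

async def worker(name):
    for i in range(2):
        # No await between these two appends, so no other coroutine
        # can run in between: the pair is effectively atomic.
        log.append(f"{name}{i}-a")
        log.append(f"{name}{i}-b")
        # The only point where this coroutine can be suspended.
        await asyncio.sleep(0)

async def main():
    # Run both workers concurrently on a single event-loop thread.
    await asyncio.gather(worker("x"), worker("y"))

asyncio.run(main())
print(log)
```

With preemptive threads the same pairs could be split by a context switch at any bytecode boundary; here the interleaving can only change at the `await`.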

Native vs decorated coroutines

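Python 3.5 introduced the async and await keywords (functions using these keywords are also called "native coroutines"). For compatibility with older versions of Python, you can use "decorated" or "yield-based" coroutines using the tornado.gen.coroutine decorator.

Native coroutines are the recommended form whenever possible. Only use decorated coroutines when compatibility with older versions of Python is required. Examples in the Tornado documentation will generally use the native form.

Translation between the two forms is generally straightforward: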

# Decorated:                    # Native:

# Normal function declaration
# with decorator                # "async def" keywords
@gen.coroutine
def a():                        async def a():
    # "yield" all async funcs       # "await" all async funcs
    b = yield c()                   b = await c()
    # "return" and "yield"
    # cannot be mixed in
    # Python 2, so raise a
    # special exception.            # Return normally
    raise gen.Return(b)             return b

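Other differences between the two forms of coroutine are outlined below.

  • Native coroutines:

    • are generally faster.

    • can use async for and async with statements, which make some patterns much simpler.

    • do not run at all unless you await or yield them. Decorated coroutines can start running "in the background" as soon as they are called. Note that for both kinds of coroutine it is important to use await or yield so that any exceptions have somewhere to go.

  • Decorated coroutines:

    • have additional integration with the concurrent.futures package, allowing the result of executor.submit to be yielded directly. For native coroutines, use IOLoop.run_in_executor instead.

    • support a shorthand for waiting on multiple objects by yielding a list or dict. Use tornado.gen.multi to do this in native coroutines.

    • can support integration with other packages, including Twisted, via a registry of conversion functions. To access this functionality in native coroutines, use tornado.gen.convert_yielded.

    • always return a Future object. Native coroutines return an awaitable object that is not a Future. In Tornado the two are mostly interchangeable.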
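The points about native coroutines not running until awaited, and returning an awaitable that is not a Future, can be checked with a short asyncio sketch. It assumes Tornado 5 or later, where Tornado's Future is `asyncio.Future`; `native` is a hypothetical function. Calling a native coroutine function only creates a coroutine object, while `asyncio.ensure_future()` schedules it as a Task, which is a Future subclass:

```python
import asyncio

ran = []

async def native():
    ran.append("native body ran")
    return 42

async def main():
    coro = native()                      # only creates a coroutine object
    ran_before_await = list(ran)         # the body has not executed yet
    coro_is_future = isinstance(coro, asyncio.Future)
    task = asyncio.ensure_future(coro)   # schedule it as a Task (a Future)
    value = await task                   # now the body actually runs
    return ran_before_await, coro_is_future, isinstance(task, asyncio.Future), value

before, coro_is_future, task_is_future, value = asyncio.run(main())
print(before, coro_is_future, task_is_future, value)
```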

How it works

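This section explains the operation of decorated coroutines. Native coroutines are conceptually similar, but a little more complicated because of the extra integration with the Python runtime.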

A function containing yield is a generator. All generators are asynchronous; when called they return a generator object instead of running to completion. The @gen.coroutine decorator communicates with the generator via the yield expressions, and with the coroutine’s caller by returning a Future.

Here is a simplified version of the coroutine decorator's inner loop:

# Simplified inner loop of tornado.gen.Runner
def run(self):
    # send(x) makes the current yield return x.
    # It returns when the next yield is reached
    future = self.gen.send(self.next)
    def callback(f):
        self.next = f.result()
        self.run()
    future.add_done_callback(callback)

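The decorator receives a Future from the generator, waits (without blocking) for that Future to complete, then "unwraps" the Future and sends the result back into the generator as the result of the yield expression. Most asynchronous code never touches the Future class directly, except to immediately pass the Future returned by an asynchronous function to a yield expression.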
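The loop above can be turned into a self-contained toy using `concurrent.futures.Future` from the standard library. This is a hypothetical simplification, not Tornado's actual Runner: it has no error handling, and it adds the missing step of handing the generator's return value to the caller through a Future:

```python
from concurrent.futures import Future

class ToyRunner:
    """Drives a generator that yields Futures (simplified, no error handling)."""

    def __init__(self, gen):
        self.gen = gen
        self.next = None
        self.result = Future()  # what the decorator would return to the caller

    def run(self):
        try:
            # send(x) makes the current yield return x and runs to the next yield.
            future = self.gen.send(self.next)
        except StopIteration as stop:
            # The generator returned; publish its return value.
            self.result.set_result(stop.value)
            return

        def callback(f):
            # "Unwrap" the finished Future and resume the generator with its value.
            self.next = f.result()
            self.run()

        future.add_done_callback(callback)

def add_two(fa, fb):
    a = yield fa   # suspended until fa completes
    b = yield fb   # suspended until fb completes
    return a + b

fa, fb = Future(), Future()
runner = ToyRunner(add_two(fa, fb))
runner.run()       # runs to the first yield, then returns without blocking
fa.set_result(1)   # resumes the generator, which now waits on fb
fb.set_result(2)   # resumes again; the generator returns 3
print(runner.result.result())
```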

How to call a coroutine

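Coroutines do not raise exceptions in the normal way: any exception they raise will be trapped in the awaitable object until it is awaited or yielded. This means it is important to call coroutines in the right way, or you may have errors that go unnoticed: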

async def divide(x, y):
    return x / y

def bad_call():
    # This should raise a ZeroDivisionError, but it won't because
    # the coroutine is called incorrectly.
    divide(1, 0)

In nearly all cases, any function that calls a coroutine must be a coroutine itself, and use the await or yield keyword in the call. When you are overriding a method defined in a superclass, consult the documentation to see if coroutines are allowed (the documentation should say that the method “may be a coroutine” or “may return a Future”):

async def good_call():
    # await will unwrap the object returned by divide() and raise
    # the exception.
    await divide(1, 0)
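Here is the same pair run under plain asyncio as a sketch (asyncio is an assumption here; Tornado 5 and later use its event loop). `bad_call()` only builds a coroutine object, so the division never happens, while awaiting it surfaces the error. The sketch closes the unawaited coroutine explicitly only to avoid Python's "coroutine ... was never awaited" RuntimeWarning:

```python
import asyncio

async def divide(x, y):
    return x / y

def bad_call():
    coro = divide(1, 0)   # creates the coroutine object; its body never runs
    coro.close()          # avoid the "never awaited" RuntimeWarning
    return "no exception seen"

async def good_call():
    await divide(1, 0)    # runs the body and re-raises ZeroDivisionError here

def try_good_call():
    try:
        asyncio.run(good_call())
    except ZeroDivisionError:
        return "ZeroDivisionError raised"
    return "no exception seen"

print(bad_call())
print(try_good_call())
```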

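Sometimes you may want to "fire and forget" a coroutine without waiting for its result. In this case it is recommended to use IOLoop.spawn_callback, which makes the IOLoop responsible for the call. If it fails, the IOLoop will log a stack trace: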

from tornado.ioloop import IOLoop

# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)

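Using IOLoop.spawn_callback in this way is recommended for functions using @gen.coroutine, but it is required for functions using async def (otherwise the coroutine runner will not start).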

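Finally, at the top level of a program, if the IOLoop is not yet running, you can start the IOLoop, run the coroutine, and then stop the IOLoop with the IOLoop.run_sync method. This is often used to start the main function of a batch-oriented program: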

from tornado.ioloop import IOLoop

# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))
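In the standard library, `asyncio.run()` plays a similar top-level role: it creates an event loop, runs one coroutine to completion, closes the loop, and re-raises any exception in the caller, as run_sync does for `divide(1, 0)` above. A minimal batch-style sketch, with `main` and `run_main` as hypothetical names:

```python
import asyncio

async def divide(x, y):
    return x / y

async def main():
    # Hypothetical batch job: a few sequential steps, each awaited in turn.
    return [await divide(10, d) for d in (1, 2, 5)]

def run_main():
    # Top-level entry point: start a loop, run main(), close the loop.
    return asyncio.run(main())

def run_failing():
    try:
        asyncio.run(divide(1, 0))
    except ZeroDivisionError:
        return "re-raised"
    return "swallowed"

if __name__ == "__main__":
    print(run_main())
    print(run_failing())
```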

Coroutine patterns

Calling blocking functions

The simplest way to call a blocking function from a coroutine is to use IOLoop.run_in_executor, which returns Futures that are compatible with coroutines:

from tornado.ioloop import IOLoop

async def call_blocking():
    await IOLoop.current().run_in_executor(None, blocking_func, args)
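The stdlib event loop method `loop.run_in_executor(executor, func, *args)` has the same shape, so the pattern can be sketched offline with asyncio. `blocking_func` and `ticker` are hypothetical; the ticker keeps counting while the blocking call sleeps in a worker thread, showing that the event loop is not blocked:

```python
import asyncio
import time

def blocking_func(n):
    time.sleep(0.2)        # stands in for blocking I/O or a slow library call
    return n * 2

async def ticker(counter):
    # Keeps running on the event loop while the blocking call is in a thread.
    while True:
        counter["ticks"] += 1
        await asyncio.sleep(0.01)

async def call_blocking():
    counter = {"ticks": 0}
    tick_task = asyncio.ensure_future(ticker(counter))
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor.
    result = await loop.run_in_executor(None, blocking_func, 21)
    tick_task.cancel()
    return result, counter["ticks"]

result, ticks = asyncio.run(call_blocking())
print(result, ticks)
```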

Parallelism

multi accepts lists and dicts whose values are Futures, and waits for all of those Futures in parallel:

from tornado.gen import multi

async def parallel_fetch(url1, url2):
    resp1, resp2 = await multi([http_client.fetch(url1),
                                http_client.fetch(url2)])

async def parallel_fetch_many(urls):
    responses = await multi([http_client.fetch(url) for url in urls])
    # responses is a list of HTTPResponses in the same order

async def parallel_fetch_dict(urls):
    responses = await multi({url: http_client.fetch(url)
                             for url in urls})
    # responses is a dict {url: HTTPResponse}

In decorated coroutines, it is possible to yield the list or dict directly:

@gen.coroutine
def parallel_fetch_decorated(url1, url2):
    resp1, resp2 = yield [http_client.fetch(url1),
                          http_client.fetch(url2)]
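Since Tornado 5 and later run on the asyncio event loop, the standard library's `asyncio.gather()` can serve the same purpose inside native coroutines. A minimal sketch, where `fake_fetch` is a hypothetical stand-in for `http_client.fetch` so the example runs offline. `gather()` has no dict form, so the dict variant zips the keys back onto the ordered results:

```python
import asyncio
import time

async def fake_fetch(url, delay=0.2):
    """Hypothetical stand-in for http_client.fetch(url)."""
    await asyncio.sleep(delay)
    return f"body of {url}"

async def parallel_fetch_many(urls):
    # gather() returns results in the same order as its arguments.
    return await asyncio.gather(*(fake_fetch(u) for u in urls))

async def parallel_fetch_dict(urls):
    # No dict form: gather in order, then pair results with their keys.
    results = await asyncio.gather(*(fake_fetch(u) for u in urls))
    return dict(zip(urls, results))

urls = ["a", "b", "c"]
start = time.monotonic()
as_list = asyncio.run(parallel_fetch_many(urls))
elapsed = time.monotonic() - start   # about 0.2 s, not 3 * 0.2 s
as_dict = asyncio.run(parallel_fetch_dict(urls))
print(as_list, as_dict, round(elapsed, 2))
```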

Interleaving

Sometimes it is useful to save a Future instead of awaiting or yielding it immediately, so you can start another operation before waiting.

from tornado.gen import convert_yielded

async def get(self):
    # convert_yielded() starts the native coroutine in the background.
    # This is equivalent to asyncio.ensure_future() (both work in Tornado).
    fetch_future = convert_yielded(self.fetch_next_chunk())
    while True:
        # Native coroutines must use await; a bare yield here would
        # turn get() into an async generator.
        chunk = await fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = convert_yielded(self.fetch_next_chunk())
        await self.flush()

This is a little easier to do with decorated coroutines, because they start immediately when called:

@gen.coroutine
def get(self):
    fetch_future = self.fetch_next_chunk()
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = self.fetch_next_chunk()
        yield self.flush()
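The same interleaving can be sketched with plain asyncio, where `asyncio.ensure_future()` plays the role of `convert_yielded()`, as the comment in the native example notes. `ChunkStreamer` and its methods are hypothetical stand-ins for a request handler; the recorded events show that the next fetch starts before the previous flush finishes:

```python
import asyncio

class ChunkStreamer:
    """Hypothetical stand-in for a handler with fetch_next_chunk/write/flush."""

    def __init__(self, chunks):
        self._chunks = list(chunks)
        self.written = []
        self.events = []

    async def fetch_next_chunk(self):
        self.events.append("fetch start")
        await asyncio.sleep(0.01)          # simulated upstream latency
        chunk = self._chunks.pop(0) if self._chunks else None
        self.events.append("fetch done")
        return chunk

    def write(self, chunk):
        self.written.append(chunk)

    async def flush(self):
        self.events.append("flush start")
        await asyncio.sleep(0.01)          # simulated network write
        self.events.append("flush done")

    async def get(self):
        # ensure_future() starts the fetch now instead of at the first await.
        fetch_future = asyncio.ensure_future(self.fetch_next_chunk())
        while True:
            chunk = await fetch_future
            if chunk is None:
                break
            self.write(chunk)
            fetch_future = asyncio.ensure_future(self.fetch_next_chunk())
            await self.flush()             # the next fetch runs during the flush

streamer = ChunkStreamer(["a", "b"])
asyncio.run(streamer.get())
print(streamer.written, streamer.events)
```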

Looping

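In native coroutines, async for can be used. In older versions of Python, looping is tricky with coroutines, since there is no way to yield on every iteration of a for or while loop and capture the result of the yield. Instead, you need to separate the loop condition from accessing the results, as in this example from Motor: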

import motor
from tornado import gen

db = motor.MotorClient().test

@gen.coroutine
def loop_example(collection):
    # Called as, e.g., loop_example(db.collection)
    cursor = collection.find()
    while (yield cursor.fetch_next):
        doc = cursor.next_object()
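With native coroutines the loop condition and the result access collapse into a single async for statement. A sketch with a hypothetical `FakeCursor` async iterator standing in for a database cursor (it is not Motor's API):

```python
import asyncio

class FakeCursor:
    """Hypothetical async iterator standing in for a database cursor."""

    def __init__(self, docs):
        self._docs = list(docs)

    def __aiter__(self):
        return self

    async def __anext__(self):
        await asyncio.sleep(0)        # simulated round trip per document
        if not self._docs:
            raise StopAsyncIteration
        return self._docs.pop(0)

async def loop_example(cursor):
    seen = []
    async for doc in cursor:          # fetch and access in one statement
        seen.append(doc)
    return seen

docs = asyncio.run(loop_example(FakeCursor([{"_id": 1}, {"_id": 2}])))
print(docs)
```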

Running in the background

Coroutines do not normally use PeriodicCallback. Instead, a coroutine can contain a while True: loop and use tornado.gen.sleep:

from tornado import gen
from tornado.ioloop import IOLoop

async def minute_loop():
    while True:
        await do_something()
        await gen.sleep(60)

# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)
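A bounded version of the same pattern in plain asyncio, as a sketch: `asyncio.ensure_future()` stands in for spawn_callback(), the interval is shortened so the example finishes quickly, and the loop is stopped by cancelling its task. `do_something`, `bounded_loop`, and the `results` list are hypothetical:

```python
import asyncio

async def do_something(results):
    """Hypothetical periodic job."""
    results.append("tick")

async def bounded_loop(results, interval):
    while True:
        await do_something(results)
        await asyncio.sleep(interval)

async def main():
    results = []
    # Start in the background; keep the task so it can be cancelled later.
    task = asyncio.ensure_future(bounded_loop(results, 0.05))
    await asyncio.sleep(0.22)   # let the loop run a few times
    task.cancel()               # stop the otherwise infinite loop
    try:
        await task
    except asyncio.CancelledError:
        pass
    return results

results = asyncio.run(main())
print(len(results))
```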

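Sometimes a more complicated loop may be desirable. For example, the minute_loop() coroutine above runs every 60+N seconds, where N is the running time of do_something(). To start a run every 60 seconds regardless of N (provided N stays under 60 seconds), use the interleaving pattern from above: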

async def minute_loop2():
    while True:
        nxt = gen.sleep(60)   # Start the clock.
        await do_something()  # Run while the clock is ticking.
        await nxt             # Wait for the timer to run out.
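The drift can be measured with a small asyncio sketch (interval and job duration are shrunk to 50 ms and 30 ms; `do_something` and both loop names are hypothetical). One difference from Tornado matters here: gen.sleep() returns a Future whose timer is already running, whereas `asyncio.sleep()` returns a coroutine that does nothing until awaited, so the fixed-rate version wraps it in `asyncio.ensure_future()` to start the clock:

```python
import asyncio
import time

async def do_something():
    await asyncio.sleep(0.03)   # hypothetical job that takes about 30 ms

async def drifting_loop(starts, interval, n):
    for _ in range(n):
        starts.append(time.monotonic())
        await do_something()
        await asyncio.sleep(interval)   # period is interval + job time

async def fixed_rate_loop(starts, interval, n):
    for _ in range(n):
        starts.append(time.monotonic())
        # Start the clock now; a bare asyncio.sleep() would not start until awaited.
        nxt = asyncio.ensure_future(asyncio.sleep(interval))
        await do_something()            # runs while the clock is ticking
        await nxt                       # wait for the rest of the interval

def mean_gap(starts):
    gaps = [b - a for a, b in zip(starts, starts[1:])]
    return sum(gaps) / len(gaps)

drifting, fixed = [], []
asyncio.run(drifting_loop(drifting, 0.05, 4))
asyncio.run(fixed_rate_loop(fixed, 0.05, 4))
print(round(mean_gap(drifting), 3), round(mean_gap(fixed), 3))
```

The drifting loop's gaps come out near 80 ms (50 + 30), the fixed-rate loop's near 50 ms.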