tornado.queues – Queues for coroutines

4.2版中的新功能.

协程的异步队列. 这些类与标准库的asyncio包中提供的类非常相似.

Warning

与标准库的queue模块不同,此处定义的类不是线程安全的. 要从另一个线程使用这些队列,请在调用任何队列方法之前,使用IOLoop.add_callback将控制权转移到IOLoop线程.

Classes

Queue

class tornado.queues.Queue(maxsize: int = 0)[source]

协调生产者和消费者协程.

如果maxsize为0(默认值),则队列大小不受限制.

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue(maxsize=2)

async def consumer():
    async for item in q:
        try:
            print('Doing work on %s' % item)
            await gen.sleep(0.01)
        finally:
            q.task_done()

async def producer():
    for item in range(5):
        await q.put(item)
        print('Put %s' % item)

async def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    await producer()     # Wait for producer to put all tasks.
    await q.join()       # Wait for consumer to finish all tasks.
    print('Done')

IOLoop.current().run_sync(main)
Put 0
Put 1
Doing work on 0
Put 2
Doing work on 1
Put 3
Doing work on 2
Put 4
Doing work on 3
Doing work on 4
Done

在没有本机协程的Python版本(3.5之前)中, consumer()可以写为:

@gen.coroutine
def consumer():
    while True:
        item = yield q.get()
        try:
            print('Doing work on %s' % item)
            yield gen.sleep(0.01)
        finally:
            q.task_done()

在版本4.3中进行了更改:添加了async for支持Python 3.5.

property maxsize

队列中允许的项目数.

qsize() → int[source]

队列中的项目数.

put(item: _T, timeout: Union[float, datetime.timedelta] = None) → Future[None][source]

将项目放入队列中,也许要等到有空间为止.

返回一个Future,它在超时后引发tornado.util.TimeoutError .

timeout可以是表示时间的数字(与tornado.ioloop.IOLoop.time相同,通常为time.time ),或者是相对于当前时间的截止日期的datetime.timedelta对象.

put_nowait(item: _T) → None[source]

将项目放入队列而不会阻塞.

如果没有可用的空闲插槽,请提高QueueFull .

get(timeout: Union[float, datetime.timedelta] = None) → Awaitable[_T][source]

从队列中删除并返回一个项目.

返回一个等待项,该等待项在某项可用时解决,或者在超时后引发tornado.util.TimeoutError .

timeout可以是表示时间的数字(与tornado.ioloop.IOLoop.time相同,通常为time.time ),或者是相对于当前时间的截止日期的datetime.timedelta对象.

Note

此方法的timeout参数不同于标准库的queue.Queue.getqueue.Queue.get . 该方法将数值解释为相对超时. 这一节将它们解释为绝对期限,并需要timedelta对象作为相对超时(与Tornado中的其他超时一致).

get_nowait() → _T[source]

从队列中删除并返回一个项目,而不会阻塞.

如果有一个立即可用,则返回一个项目,否则引发QueueEmpty .

task_done() → None[source]

表示先前排队的任务已完成.

由队列使用者使用. 对于用于获取任务的每个get ,随后对task_done调用将告诉队列该任务的处理已完成.

如果join被阻止,则在处理完所有项目后将恢复; 也就是说,当每个put都由task_done匹配时.

Raises ValueError if called more times than put.

join(timeout: Union[float, datetime.timedelta] = None) → Awaitable[None][source]

阻塞直到队列中的所有项目都被处理.

返回一个waitable,超时后引发tornado.util.TimeoutError .

PriorityQueue

class tornado.queues.PriorityQueue(maxsize: int = 0)[source]

一个按优先级顺序检索条目的Queue ,从最低到最低.

条目通常是元组,例如(priority number, data) .

from tornado.queues import PriorityQueue

q = PriorityQueue()
q.put((1, 'medium-priority item'))
q.put((0, 'high-priority item'))
q.put((10, 'low-priority item'))

print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
(0, 'high-priority item')
(1, 'medium-priority item')
(10, 'low-priority item')

LifoQueue

class tornado.queues.LifoQueue(maxsize: int = 0)[source]

一个Queue ,它首先检索最近放置的项目.

from tornado.queues import LifoQueue

q = LifoQueue()
q.put(3)
q.put(2)
q.put(1)

print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
1
2
3

Exceptions

QueueEmpty

exception tornado.queues.QueueEmpty[source]

当队列中没有项目时,由Queue.get_nowait .

QueueFull

exception tornado.queues.QueueFull[source]

当队列达到最大大小时,由Queue.put_nowait .