tornado.locks – Synchronization primitives

4.2版中的新功能.

使用与标准库提供给线程的同步原语相似的同步原语来协调协程. 这些类与标准库的asyncio包中提供的类非常相似.

Warning

请注意,这些原语实际上不是线程安全的,因此不能代替标准库的threading模块中的原语-它们旨在协调单线程应用程序中的Tornado协程,而不是保护多线程应用程序中的共享对象.

Condition

class tornado.locks.Condition[source]

条件允许一个或多个协程等待通知.

类似于标准threading.Condition ,但不需要获取和释放基础锁.

使用Condition ,协程可以等待其他协程通知:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Condition

condition = Condition()

async def waiter():
    print("I'll wait right here")
    await condition.wait()
    print("I'm done waiting")

async def notifier():
    print("About to notify")
    condition.notify()
    print("Done notifying")

async def runner():
    # Wait for waiter() and notifier() in parallel
    await gen.multi([waiter(), notifier()])

IOLoop.current().run_sync(runner)
I'll wait right here
About to notify
Done notifying
I'm done waiting

wait需要一个可选的timeout参数,它可以是绝对时间戳:

io_loop = IOLoop.current()

# Wait up to 1 second for a notification.
await condition.wait(timeout=io_loop.time() + 1)

…或datetime.timedelta表示相对于当前时间的超时:

# Wait up to 1 second.
await condition.wait(timeout=datetime.timedelta(seconds=1))

如果在截止日期之前没有通知,则该方法返回False.

在版本5.0中更改:以前,可以从notify内同步notify服务员. 现在,将始终在IOLoop的下一次迭代中接收到通知.

wait(timeout: Union[float, datetime.timedelta] = None) → Awaitable[bool][source]

等待notify .

返回一个Future ,如果通知了该条件,则该解析为True如果超时,则返回False .

notify(n: int = 1) → None[source]

Wake n waiters.

notify_all() → None[source]

唤醒所有服务员.

Event

class tornado.locks.Event[source]

事件会阻止协程,直到其内部标志设置为True.

类似于threading.Event .

协程可以等待事件设置. 设置后,除非事件已被清除,否则对yield event.wait()调用将不会阻塞:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Event

event = Event()

async def waiter():
    print("Waiting for event")
    await event.wait()
    print("Not waiting this time")
    await event.wait()
    print("Done")

async def setter():
    print("About to set the event")
    event.set()

async def runner():
    await gen.multi([waiter(), setter()])

IOLoop.current().run_sync(runner)
Waiting for event
About to set the event
Not waiting this time
Done
is_set() → bool[source]

如果内部标志为true,则返回True .

set() → None[source]

将内部标志设置为True . 所有的服务员都被唤醒.

一旦设置了标志,调用wait将不会阻塞.

clear() → None[source]

将内部标志重置为False .

wait呼叫将阻塞,直到调用set .

wait(timeout: Union[float, datetime.timedelta] = None) → Awaitable[None][source]

阻塞直到内部标志为真.

Returns an awaitable, which raises tornado.util.TimeoutError after a timeout.

Semaphore

class tornado.locks.Semaphore(value: int = 1)[source]

锁定之前可以获取固定次数的锁.

信号量管理一个计数器,该计数器代表release调用的数量减去acquire调用的数量再加上一个初始值. 必要时, acquire方法将阻塞,直到可以返回而不使计数器为负数为止.

信号量限制对共享资源的访问. 要一次允许两个工作人员访问:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Semaphore

sem = Semaphore(2)

async def worker(worker_id):
    await sem.acquire()
    try:
        print("Worker %d is working" % worker_id)
        await use_some_resource()
    finally:
        print("Worker %d is done" % worker_id)
        sem.release()

async def runner():
    # Join all workers.
    await gen.multi([worker(i) for i in range(3)])

IOLoop.current().run_sync(runner)
Worker 0 is working
Worker 1 is working
Worker 0 is done
Worker 2 is working
Worker 1 is done
Worker 2 is done

允许工作程序0和1并发运行,但是工作程序2等待直到由工作程序0释放信号量一次.

信号量可以用作异步上下文管理器:

async def worker(worker_id):
    async with sem:
        print("Worker %d is working" % worker_id)
        await use_some_resource()

    # Now the semaphore has been released.
    print("Worker %d is done" % worker_id)

为了与旧版本的Python兼容, acquire是上下文管理器,因此worker也可以写成:

@gen.coroutine
def worker(worker_id):
    with (yield sem.acquire()):
        print("Worker %d is working" % worker_id)
        yield use_some_resource()

    # Now the semaphore has been released.
    print("Worker %d is done" % worker_id)

在版本4.3中更改:在Python 3.5中添加了对async with支持.

release() → None[source]

增加计数器并唤醒一名服务员.

acquire(timeout: Union[float, datetime.timedelta] = None) → Awaitable[tornado.locks._ReleasingContextManager][source]

减少计数器. 返回一个等待的.

如果计数器为零,则阻塞并等待release . 截止日期之后,awaitable引发TimeoutError .

BoundedSemaphore

class tornado.locks.BoundedSemaphore(value: int = 1)[source]

防止release()被多次调用的信号量.

如果release将信号量的值ValueError超过初始值,则会引发ValueError . 信号量通常用于保护容量有限的资源,因此,信号量释放过多会表明存在错误.

release() → None[source]

增加计数器并唤醒一名服务员.

acquire(timeout: Union[float, datetime.timedelta] = None) → Awaitable[tornado.locks._ReleasingContextManager]

减少计数器. 返回一个等待的.

如果计数器为零,则阻塞并等待release . 截止日期之后,awaitable引发TimeoutError .

Lock

class tornado.locks.Lock[source]

协程锁.

锁开始解锁,并立即acquire锁定. 锁定时,产生acquire的协程等待直到另一个协程调用release .

Releasing an unlocked lock raises RuntimeError.

可以使用async with语句将Lock用作异步上下文管理器:

>>> from tornado import locks
>>> lock = locks.Lock()
>>>
>>> async def f():
...    async with lock:
...        # Do something holding the lock.
...        pass
...
...    # Now the lock is released.

为了与旧版本的Python兼容, acquire方法异步返回一个常规的上下文管理器:

>>> async def f2():
...    with (yield lock.acquire()):
...        # Do something holding the lock.
...        pass
...
...    # Now the lock is released.

在版本4.3中更改:在Python 3.5中添加了对async with支持.

acquire(timeout: Union[float, datetime.timedelta] = None) → Awaitable[tornado.locks._ReleasingContextManager][source]

尝试锁定. 返回一个等待的.

返回一个等待的对象,它在超时后引发tornado.util.TimeoutError .

release() → None[source]

Unlock.

排队等待acquire的第一个协程得到锁.

如果未锁定,则引发RuntimeError .