openmedialibrary_platform/Shared/lib/python3.7/site-packages/tornado/locks.py

527 lines
16 KiB
Python
Raw Permalink Normal View History

2016-02-23 06:06:55 +00:00
# Copyright 2015 The Tornado Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
2019-01-13 08:01:53 +00:00
from __future__ import absolute_import, division, print_function
2016-02-23 06:06:55 +00:00
import collections
2019-01-13 08:01:53 +00:00
from concurrent.futures import CancelledError
2016-02-23 06:06:55 +00:00
from tornado import gen, ioloop
2019-01-13 08:01:53 +00:00
from tornado.concurrent import Future, future_set_result_unless_cancelled
__all__ = ['Condition', 'Event', 'Semaphore', 'BoundedSemaphore', 'Lock']
2016-02-23 06:06:55 +00:00
class _TimeoutGarbageCollector(object):
    """Shared base that occasionally prunes finished waiters.

    Subclasses keep their waiter Futures in ``self._waiters`` and call
    `_garbage_collect` from their timeout callbacks. This keeps the deque
    from growing without bound in a loop such as:

        while True:
            yield condition.wait(short_timeout)
            print('looping....')
    """

    def __init__(self):
        self._timeouts = 0  # Timeouts counted since the last sweep.
        self._waiters = collections.deque()  # Waiter Futures, oldest first.

    def _garbage_collect(self):
        """Record one timeout; after more than 100, drop finished waiters."""
        self._timeouts += 1
        if self._timeouts <= 100:
            return

        # Threshold passed: restart the count and keep only the waiters
        # that are still pending, preserving their order.
        self._timeouts = 0
        still_pending = collections.deque()
        for fut in self._waiters:
            if not fut.done():
                still_pending.append(fut)
        self._waiters = still_pending
class Condition(_TimeoutGarbageCollector):
    """A condition allows one or more coroutines to wait until notified.

    Like a standard `threading.Condition`, but does not need an underlying lock
    that is acquired and released.

    With a `Condition`, coroutines can wait to be notified by other coroutines:

    .. testcode::

        from tornado import gen
        from tornado.ioloop import IOLoop
        from tornado.locks import Condition

        condition = Condition()

        async def waiter():
            print("I'll wait right here")
            await condition.wait()
            print("I'm done waiting")

        async def notifier():
            print("About to notify")
            condition.notify()
            print("Done notifying")

        async def runner():
            # Wait for waiter() and notifier() in parallel
            await gen.multi([waiter(), notifier()])

        IOLoop.current().run_sync(runner)

    .. testoutput::

        I'll wait right here
        About to notify
        Done notifying
        I'm done waiting

    `wait` takes an optional ``timeout`` argument, which is either an absolute
    timestamp::

        io_loop = IOLoop.current()

        # Wait up to 1 second for a notification.
        await condition.wait(timeout=io_loop.time() + 1)

    ...or a `datetime.timedelta` for a timeout relative to the current time::

        # Wait up to 1 second.
        await condition.wait(timeout=datetime.timedelta(seconds=1))

    The method returns False if there's no notification before the deadline.

    .. versionchanged:: 5.0
       Previously, waiters could be notified synchronously from within
       `notify`. Now, the notification will always be received on the
       next iteration of the `.IOLoop`.
    """

    def __init__(self):
        super(Condition, self).__init__()
        self.io_loop = ioloop.IOLoop.current()

    def __repr__(self):
        result = '<%s' % (self.__class__.__name__, )
        if self._waiters:
            result += ' waiters[%s]' % len(self._waiters)
        return result + '>'

    def wait(self, timeout=None):
        """Wait for `.notify`.

        Returns a `.Future` that resolves ``True`` if the condition is notified,
        or ``False`` after a timeout.

        ``timeout`` is handed to ``IOLoop.add_timeout``; a falsy value
        (``None`` or ``0``) means no timeout is scheduled.
        """
        waiter = Future()
        self._waiters.append(waiter)
        if timeout:
            def on_timeout():
                if not waiter.done():
                    future_set_result_unless_cancelled(waiter, False)
                # The timed-out waiter stays in the deque; sweep it (and
                # others like it) out every so often.
                self._garbage_collect()
            io_loop = ioloop.IOLoop.current()
            timeout_handle = io_loop.add_timeout(timeout, on_timeout)
            # Once the waiter finishes for any reason the timeout is moot.
            waiter.add_done_callback(
                lambda _: io_loop.remove_timeout(timeout_handle))
        return waiter

    def notify(self, n=1):
        """Wake ``n`` waiters."""
        waiters = []  # Waiters we plan to run right now.
        while n and self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.done():  # Might have timed out.
                n -= 1
                waiters.append(waiter)

        for waiter in waiters:
            future_set_result_unless_cancelled(waiter, True)

    def notify_all(self):
        """Wake all waiters."""
        self.notify(len(self._waiters))
class Event(object):
    """An event blocks coroutines until its internal flag is set to True.

    Similar to `threading.Event`.

    A coroutine can wait for an event to be set. Once it is set, calls to
    ``yield event.wait()`` will not block unless the event has been cleared:

    .. testcode::

        from tornado import gen
        from tornado.ioloop import IOLoop
        from tornado.locks import Event

        event = Event()

        async def waiter():
            print("Waiting for event")
            await event.wait()
            print("Not waiting this time")
            await event.wait()
            print("Done")

        async def setter():
            print("About to set the event")
            event.set()

        async def runner():
            await gen.multi([waiter(), setter()])

        IOLoop.current().run_sync(runner)

    .. testoutput::

        Waiting for event
        About to set the event
        Not waiting this time
        Done
    """

    def __init__(self):
        self._value = False  # The event's flag.
        self._waiters = set()  # Futures handed out by wait(), still pending.

    def __repr__(self):
        return '<%s %s>' % (
            self.__class__.__name__, 'set' if self.is_set() else 'clear')

    def is_set(self):
        """Return ``True`` if the internal flag is true."""
        return self._value

    def set(self):
        """Set the internal flag to ``True``. All waiters are awakened.

        Calling `.wait` once the flag is set will not block.
        """
        if not self._value:
            self._value = True

            # Iterate over a snapshot: each waiter's done-callback removes it
            # from self._waiters, and a callback that runs synchronously
            # would otherwise mutate the set while it is being iterated.
            for fut in tuple(self._waiters):
                if not fut.done():
                    fut.set_result(None)

    def clear(self):
        """Reset the internal flag to ``False``.

        Calls to `.wait` will block until `.set` is called.
        """
        self._value = False

    def wait(self, timeout=None):
        """Block until the internal flag is true.

        Returns a Future, which raises `tornado.util.TimeoutError` after a
        timeout.
        """
        fut = Future()
        if self._value:
            # Already set: hand back a Future that is resolved immediately.
            fut.set_result(None)
            return fut
        self._waiters.add(fut)
        # Drop the waiter from the set as soon as it finishes or is cancelled.
        fut.add_done_callback(lambda fut: self._waiters.remove(fut))
        if timeout is None:
            return fut
        else:
            timeout_fut = gen.with_timeout(
                timeout, fut, quiet_exceptions=(CancelledError,))
            # This is a slightly clumsy workaround for the fact that
            # gen.with_timeout doesn't cancel its futures. Cancelling
            # fut will remove it from the waiters list.
            timeout_fut.add_done_callback(
                lambda tf: fut.cancel() if not fut.done() else None)
            return timeout_fut
2016-02-23 06:06:55 +00:00
class _ReleasingContextManager(object):
    """Context manager that calls ``release()`` on its target when exited.

    It is what a resolved ``acquire()`` Future yields, so that callers
    can write:

        with (yield semaphore.acquire()):
            pass

        # Now semaphore.release() has been called.
    """

    def __init__(self, obj):
        # The object whose release() is invoked on exit.
        self._obj = obj

    def __enter__(self):
        # Nothing to do on entry, and nothing is bound by "as".
        return None

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Release unconditionally; returning None lets exceptions propagate.
        target = self._obj
        target.release()
class Semaphore(_TimeoutGarbageCollector):
    """A lock that can be acquired a fixed number of times before blocking.

    A Semaphore manages a counter representing the number of `.release` calls
    minus the number of `.acquire` calls, plus an initial value. The `.acquire`
    method blocks if necessary until it can return without making the counter
    negative.

    Semaphores limit access to a shared resource. To allow access for two
    workers at a time:

    .. testsetup:: semaphore

       from collections import deque

       from tornado import gen
       from tornado.ioloop import IOLoop
       from tornado.concurrent import Future

       # Ensure reliable doctest output: resolve Futures one at a time.
       futures_q = deque([Future() for _ in range(3)])

       async def simulator(futures):
           for f in futures:
               # simulate the asynchronous passage of time
               await gen.sleep(0)
               await gen.sleep(0)
               f.set_result(None)

       IOLoop.current().add_callback(simulator, list(futures_q))

       def use_some_resource():
           return futures_q.popleft()

    .. testcode:: semaphore

        from tornado import gen
        from tornado.ioloop import IOLoop
        from tornado.locks import Semaphore

        sem = Semaphore(2)

        async def worker(worker_id):
            await sem.acquire()
            try:
                print("Worker %d is working" % worker_id)
                await use_some_resource()
            finally:
                print("Worker %d is done" % worker_id)
                sem.release()

        async def runner():
            # Join all workers.
            await gen.multi([worker(i) for i in range(3)])

        IOLoop.current().run_sync(runner)

    .. testoutput:: semaphore

        Worker 0 is working
        Worker 1 is working
        Worker 0 is done
        Worker 2 is working
        Worker 1 is done
        Worker 2 is done

    Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until
    the semaphore has been released once, by worker 0.

    The semaphore can be used as an async context manager::

        async def worker(worker_id):
            async with sem:
                print("Worker %d is working" % worker_id)
                await use_some_resource()

            # Now the semaphore has been released.
            print("Worker %d is done" % worker_id)

    For compatibility with older versions of Python, the Future returned by
    `.acquire` resolves to a context manager, so ``worker`` could also be
    written as::

        @gen.coroutine
        def worker(worker_id):
            with (yield sem.acquire()):
                print("Worker %d is working" % worker_id)
                yield use_some_resource()

            # Now the semaphore has been released.
            print("Worker %d is done" % worker_id)

    .. versionchanged:: 4.3
       Added ``async with`` support in Python 3.5.

    """

    def __init__(self, value=1):
        super(Semaphore, self).__init__()
        if value < 0:
            raise ValueError('semaphore initial value must be >= 0')

        self._value = value

    def __repr__(self):
        res = super(Semaphore, self).__repr__()
        extra = 'locked' if self._value == 0 else 'unlocked,value:{0}'.format(
            self._value)
        if self._waiters:
            extra = '{0},waiters:{1}'.format(extra, len(self._waiters))
        # res[1:-1] strips the angle brackets from the inherited repr.
        return '<{0} [{1}]>'.format(res[1:-1], extra)

    def release(self):
        """Increment the counter and wake one waiter."""
        self._value += 1
        while self._waiters:
            waiter = self._waiters.popleft()
            # Skip waiters that already finished (e.g. timed out).
            if not waiter.done():
                self._value -= 1

                # If the waiter is a coroutine paused at
                #
                #     with (yield semaphore.acquire()):
                #
                # then the context manager's __exit__ calls release() at the end
                # of the "with" block.
                waiter.set_result(_ReleasingContextManager(self))
                break

    def acquire(self, timeout=None):
        """Decrement the counter. Returns a Future.

        Block if the counter is zero and wait for a `.release`. The Future
        raises `.TimeoutError` after the deadline.

        ``timeout`` is handed to ``IOLoop.add_timeout``; a falsy value
        (``None`` or ``0``) means no timeout is scheduled.
        """
        waiter = Future()
        if self._value > 0:
            self._value -= 1
            waiter.set_result(_ReleasingContextManager(self))
        else:
            self._waiters.append(waiter)
            if timeout:
                def on_timeout():
                    if not waiter.done():
                        waiter.set_exception(gen.TimeoutError())
                    # The timed-out waiter stays in the deque; sweep it (and
                    # others like it) out every so often.
                    self._garbage_collect()
                io_loop = ioloop.IOLoop.current()
                timeout_handle = io_loop.add_timeout(timeout, on_timeout)
                # Once the waiter finishes for any reason the timeout is moot.
                waiter.add_done_callback(
                    lambda _: io_loop.remove_timeout(timeout_handle))
        return waiter

    def __enter__(self):
        raise RuntimeError(
            "Use Semaphore like 'with (yield semaphore.acquire())', not like"
            " 'with semaphore'")

    # A plain "with semaphore" is always an error, on exit as well as entry.
    __exit__ = __enter__

    @gen.coroutine
    def __aenter__(self):
        yield self.acquire()

    @gen.coroutine
    def __aexit__(self, typ, value, tb):
        self.release()
class BoundedSemaphore(Semaphore):
    """A semaphore that refuses to be released more often than acquired.

    `.release` raises `ValueError` rather than let the counter climb past
    the value the semaphore was created with. Semaphores usually guard a
    resource of limited capacity, so an extra release points to a bug in
    the caller.
    """

    def __init__(self, value=1):
        # Remember the starting value: it is the ceiling release() enforces.
        self._initial_value = value
        super(BoundedSemaphore, self).__init__(value=value)

    def release(self):
        """Increment the counter and wake one waiter."""
        already_full = self._value >= self._initial_value
        if already_full:
            raise ValueError("Semaphore released too many times")
        super(BoundedSemaphore, self).release()
class Lock(object):
    """A lock for coroutines.

    A Lock begins unlocked, and `acquire` locks it immediately. While it is
    locked, a coroutine that yields `acquire` waits until another coroutine
    calls `release`.

    Releasing an unlocked lock raises `RuntimeError`.

    A Lock can be used as an async context manager with the ``async
    with`` statement:

    >>> from tornado import gen, locks
    >>> lock = locks.Lock()
    >>>
    >>> async def f():
    ...    async with lock:
    ...        # Do something holding the lock.
    ...        pass
    ...
    ...    # Now the lock is released.

    For compatibility with older versions of Python, the `.acquire`
    method asynchronously returns a regular context manager:

    >>> @gen.coroutine
    ... def f2():
    ...    with (yield lock.acquire()):
    ...        # Do something holding the lock.
    ...        pass
    ...
    ...    # Now the lock is released.

    .. versionchanged:: 4.3
       Added ``async with`` support in Python 3.5.

    """

    def __init__(self):
        # A bounded semaphore of size one supplies the locking behaviour.
        self._block = BoundedSemaphore(value=1)

    def __repr__(self):
        return "<%s _block=%s>" % (
            self.__class__.__name__,
            self._block)

    def acquire(self, timeout=None):
        """Attempt to lock. Returns a Future.

        Returns a Future, which raises `tornado.util.TimeoutError` after a
        timeout.
        """
        return self._block.acquire(timeout)

    def release(self):
        """Unlock.

        The first coroutine in line waiting for `acquire` gets the lock.

        If not locked, raise a `RuntimeError`.
        """
        try:
            self._block.release()
        except ValueError:
            # The semaphore reports over-release as ValueError; a lock
            # reports it as RuntimeError.
            raise RuntimeError('release unlocked lock')

    def __enter__(self):
        # A Lock itself cannot be yielded; only the Future returned by
        # acquire() resolves to a context manager.
        raise RuntimeError(
            "Use Lock like 'with (yield lock.acquire())', not like"
            " 'with lock'")

    # A plain "with lock" is always an error, on exit as well as entry.
    __exit__ = __enter__

    @gen.coroutine
    def __aenter__(self):
        yield self.acquire()

    @gen.coroutine
    def __aexit__(self, typ, value, tb):
        self.release()