openmedialibrary_platform_d.../lib/python3.7/_py_abc.py

148 lines
6 KiB
Python
Raw Permalink Normal View History

2016-02-06 09:36:57 +00:00
from _weakrefset import WeakSet
2018-12-31 23:25:26 +00:00
def get_cache_token():
    """Return the current ABC cache token.

    The token is an opaque object (supporting equality testing) identifying
    the current version of the ABC cache for virtual subclasses.  It is the
    value of ``ABCMeta._abc_invalidation_counter``, which ``register()``
    increments each time it adds a new virtual subclass to any ABC.
    """
    return ABCMeta._abc_invalidation_counter
2016-02-06 09:36:57 +00:00
class ABCMeta(type):
    """Metaclass for defining Abstract Base Classes (ABCs).

    Use this metaclass to create an ABC.  An ABC can be subclassed
    directly, and then acts as a mix-in class.  You can also register
    unrelated concrete classes (even built-in classes) and unrelated
    ABCs as 'virtual subclasses' -- these and their descendants will
    be considered subclasses of the registering ABC by the built-in
    issubclass() function, but the registering ABC won't show up in
    their MRO (Method Resolution Order) nor will method
    implementations defined by the registering ABC be callable (not
    even via super()).
    """

    # A global counter that is incremented each time a class is
    # registered as a virtual subclass of anything.  It forces the
    # negative cache to be cleared before its next use.
    # Note: this counter is private.  Use `abc.get_cache_token()` for
    # external code.
    _abc_invalidation_counter = 0

    def __new__(mcls, name, bases, namespace, **kwargs):
        """Create the class, then attach its abstract-method set and caches.

        A name is abstract when the object bound to it has a true
        ``__isabstractmethod__`` attribute.  Names declared abstract by a
        base stay abstract unless the attribute that the new class resolves
        for that name is no longer flagged.
        """
        cls = super().__new__(mcls, name, bases, namespace, **kwargs)

        # Abstract names introduced directly in this class body.
        abstracts = {
            attr_name
            for attr_name, value in namespace.items()
            if getattr(value, "__isabstractmethod__", False)
        }
        # Abstract names inherited from the bases that are still abstract
        # when looked up on the new class.
        for base in bases:
            for attr_name in getattr(base, "__abstractmethods__", set()):
                value = getattr(cls, attr_name, None)
                if getattr(value, "__isabstractmethod__", False):
                    abstracts.add(attr_name)
        cls.__abstractmethods__ = frozenset(abstracts)

        # Set up inheritance registry and the per-class result caches.
        cls._abc_registry = WeakSet()
        cls._abc_cache = WeakSet()
        cls._abc_negative_cache = WeakSet()
        cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter
        return cls

    def register(cls, subclass):
        """Register a virtual subclass of an ABC.

        Returns the subclass, to allow usage as a class decorator.

        Raises TypeError if ``subclass`` is not a class, and RuntimeError
        if the registration would create an inheritance cycle.
        """
        if not isinstance(subclass, type):
            raise TypeError("Can only register classes")
        if issubclass(subclass, cls):
            return subclass  # Already a subclass
        # Subtle: test for cycles *after* testing for "already a subclass";
        # this means we allow X.register(X) and interpret it as a no-op.
        if issubclass(cls, subclass):
            # This would create a cycle, which is bad for the algorithm
            # used by __subclasscheck__.
            raise RuntimeError("Refusing to create an inheritance cycle")
        cls._abc_registry.add(subclass)
        ABCMeta._abc_invalidation_counter += 1  # Invalidate negative cache
        return subclass

    def _dump_registry(cls, file=None):
        """Debug helper to print the ABC registry."""
        print(f"Class: {cls.__module__}.{cls.__qualname__}", file=file)
        print(f"Inv. counter: {get_cache_token()}", file=file)
        for name in cls.__dict__:
            if name.startswith("_abc_"):
                value = getattr(cls, name)
                # Show weak sets as plain sets so their members are printed.
                if isinstance(value, WeakSet):
                    value = set(value)
                print(f"{name}: {value!r}", file=file)

    def _abc_registry_clear(cls):
        """Clear the registry (for debugging or testing)."""
        cls._abc_registry.clear()

    def _abc_caches_clear(cls):
        """Clear the caches (for debugging or testing)."""
        cls._abc_cache.clear()
        cls._abc_negative_cache.clear()

    def __instancecheck__(cls, instance):
        """Override for isinstance(instance, cls)."""
        # Inline the cache checking
        subclass = instance.__class__
        if subclass in cls._abc_cache:
            return True
        subtype = type(instance)
        if subtype is subclass:
            # The negative cache is only trusted while its version matches
            # the global invalidation counter.
            if (cls._abc_negative_cache_version ==
                    ABCMeta._abc_invalidation_counter and
                    subclass in cls._abc_negative_cache):
                return False
            # Fall back to the subclass check.
            return cls.__subclasscheck__(subclass)
        # __class__ and type() disagree: accept a match on either one.
        return any(cls.__subclasscheck__(c) for c in (subclass, subtype))

    def __subclasscheck__(cls, subclass):
        """Override for issubclass(subclass, cls).

        Raises TypeError if ``subclass`` is not a class.
        """
        if not isinstance(subclass, type):
            raise TypeError('issubclass() arg 1 must be a class')
        # Check cache
        if subclass in cls._abc_cache:
            return True
        # Check negative cache; may have to invalidate
        if cls._abc_negative_cache_version < ABCMeta._abc_invalidation_counter:
            # Invalidate the negative cache
            cls._abc_negative_cache = WeakSet()
            cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter
        elif subclass in cls._abc_negative_cache:
            return False
        # Check the subclass hook
        ok = cls.__subclasshook__(subclass)
        if ok is not NotImplemented:
            assert isinstance(ok, bool)
            if ok:
                cls._abc_cache.add(subclass)
            else:
                cls._abc_negative_cache.add(subclass)
            return ok
        # Check if it's a direct subclass
        if cls in getattr(subclass, '__mro__', ()):
            cls._abc_cache.add(subclass)
            return True
        # Check if it's a subclass of a registered class (recursive)
        for rcls in cls._abc_registry:
            if issubclass(subclass, rcls):
                cls._abc_cache.add(subclass)
                return True
        # Check if it's a subclass of a subclass (recursive)
        for scls in cls.__subclasses__():
            if issubclass(subclass, scls):
                cls._abc_cache.add(subclass)
                return True
        # No dice; update negative cache
        cls._abc_negative_cache.add(subclass)
        return False