# Python source: 147 lines, 6 KiB
from _weakrefset import WeakSet


def get_cache_token():
    """Return the current ABC cache token.

    The token is an opaque object (supporting equality testing) that
    identifies the current version of the ABC cache for virtual
    subclasses.  Its value is read from the invalidation counter kept on
    ``ABCMeta``, which is bumped whenever ``register()`` adds a new
    virtual subclass to any ABC.
    """
    token = ABCMeta._abc_invalidation_counter
    return token


class ABCMeta(type):
    """Metaclass for defining Abstract Base Classes (ABCs).

    Use this metaclass to create an ABC.  An ABC can be subclassed
    directly, and then acts as a mix-in class.  You can also register
    unrelated concrete classes (even built-in classes) and unrelated
    ABCs as 'virtual subclasses' -- these and their descendants will
    be considered subclasses of the registering ABC by the built-in
    issubclass() function, but the registering ABC won't show up in
    their MRO (Method Resolution Order) nor will method
    implementations defined by the registering ABC be callable (not
    even via super()).
    """

    # A global counter that is incremented each time a class is
    # registered as a virtual subclass of anything.  It forces the
    # negative cache to be cleared before its next use.
    # Note: this counter is private. Use `abc.get_cache_token()` for
    #       external code.
    _abc_invalidation_counter = 0

    def __new__(mcls, name, bases, namespace, **kwargs):
        """Create the class, then attach its abstract-method set and caches.

        ``__abstractmethods__`` is the frozenset of names that are flagged
        ``__isabstractmethod__`` either in *namespace* or, for names listed
        in a base's ``__abstractmethods__``, on the finished class.
        """
        cls = super().__new__(mcls, name, bases, namespace, **kwargs)
        # Compute set of abstract method names declared directly in the body.
        abstracts = {attr_name
                     for attr_name, value in namespace.items()
                     if getattr(value, "__isabstractmethod__", False)}
        # Inherited abstract names stay abstract unless the new class
        # resolves them to something that is no longer flagged abstract.
        for base in bases:
            for attr_name in getattr(base, "__abstractmethods__", set()):
                value = getattr(cls, attr_name, None)
                if getattr(value, "__isabstractmethod__", False):
                    abstracts.add(attr_name)
        cls.__abstractmethods__ = frozenset(abstracts)
        # Set up inheritance registry
        cls._abc_registry = WeakSet()
        cls._abc_cache = WeakSet()
        cls._abc_negative_cache = WeakSet()
        cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter
        return cls

    def register(cls, subclass):
        """Register a virtual subclass of an ABC.

        Returns the subclass, to allow usage as a class decorator.

        Raises TypeError if *subclass* is not a class, and RuntimeError if
        registering it would create an inheritance cycle.
        """
        if not isinstance(subclass, type):
            raise TypeError("Can only register classes")
        if issubclass(subclass, cls):
            return subclass  # Already a subclass
        # Subtle: test for cycles *after* testing for "already a subclass";
        # this means we allow X.register(X) and interpret it as a no-op.
        if issubclass(cls, subclass):
            # This would create a cycle, which is bad for the algorithm below
            raise RuntimeError("Refusing to create an inheritance cycle")
        cls._abc_registry.add(subclass)
        ABCMeta._abc_invalidation_counter += 1  # Invalidate negative cache
        return subclass

    def _dump_registry(cls, file=None):
        """Debug helper to print the ABC registry."""
        print(f"Class: {cls.__module__}.{cls.__qualname__}", file=file)
        print(f"Inv. counter: {get_cache_token()}", file=file)
        for name in cls.__dict__:
            if name.startswith("_abc_"):
                value = getattr(cls, name)
                # Show WeakSets by content rather than by object repr.
                if isinstance(value, WeakSet):
                    value = set(value)
                print(f"{name}: {value!r}", file=file)

    def _abc_registry_clear(cls):
        """Clear the registry (for debugging or testing)."""
        cls._abc_registry.clear()

    def _abc_caches_clear(cls):
        """Clear the caches (for debugging or testing)."""
        cls._abc_cache.clear()
        cls._abc_negative_cache.clear()

    def __instancecheck__(cls, instance):
        """Override for isinstance(instance, cls)."""
        # Inline the cache checking
        subclass = instance.__class__
        if subclass in cls._abc_cache:
            return True
        subtype = type(instance)
        if subtype is subclass:
            # The negative cache is only trusted while its version matches
            # the global invalidation counter.
            if (cls._abc_negative_cache_version ==
                    ABCMeta._abc_invalidation_counter and
                    subclass in cls._abc_negative_cache):
                return False
            # Fall back to the subclass check.
            return cls.__subclasscheck__(subclass)
        # __class__ and type() disagree: accept if either one qualifies.
        return any(cls.__subclasscheck__(c) for c in (subclass, subtype))

    def __subclasscheck__(cls, subclass):
        """Override for issubclass(subclass, cls).

        Raises TypeError if *subclass* is not a class, and AssertionError
        if ``__subclasshook__`` returns anything other than True, False
        or NotImplemented.
        """
        if not isinstance(subclass, type):
            raise TypeError('issubclass() arg 1 must be a class')
        # Check cache
        if subclass in cls._abc_cache:
            return True
        # Check negative cache; may have to invalidate
        if cls._abc_negative_cache_version < ABCMeta._abc_invalidation_counter:
            # Invalidate the negative cache
            cls._abc_negative_cache = WeakSet()
            cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter
        elif subclass in cls._abc_negative_cache:
            return False
        # Check the subclass hook
        ok = cls.__subclasshook__(subclass)
        if ok is not NotImplemented:
            # Raise explicitly instead of using ``assert`` so the check
            # still runs when assertions are stripped (python -O); a
            # non-bool result must never reach the caches.
            if not isinstance(ok, bool):
                raise AssertionError(
                    "__subclasshook__ must return either False, True, "
                    "or NotImplemented")
            if ok:
                cls._abc_cache.add(subclass)
            else:
                cls._abc_negative_cache.add(subclass)
            return ok
        # Check if it's a direct subclass
        if cls in getattr(subclass, '__mro__', ()):
            cls._abc_cache.add(subclass)
            return True
        # Check if it's a subclass of a registered class (recursive)
        for rcls in cls._abc_registry:
            if issubclass(subclass, rcls):
                cls._abc_cache.add(subclass)
                return True
        # Check if it's a subclass of a subclass (recursive)
        for scls in cls.__subclasses__():
            if issubclass(subclass, scls):
                cls._abc_cache.add(subclass)
                return True
        # No dice; update negative cache
        cls._abc_negative_cache.add(subclass)
        return False