Switch to python3

This commit is contained in:
j 2014-09-30 18:15:32 +02:00
commit 9ba4b6a91a
5286 changed files with 677347 additions and 576888 deletions

View file

@ -0,0 +1,8 @@
__all__ = ['FFI', 'VerificationError', 'VerificationMissing', 'CDefError',
'FFIError']
from .api import FFI, CDefError, FFIError
from .ffiplatform import VerificationError, VerificationMissing
__version__ = "0.8.6"
__version_info__ = (0, 8, 6)

View file

@ -0,0 +1,497 @@
import sys, types
from .lock import allocate_lock
try:
callable
except NameError:
# Python 3.1
from collections import Callable
callable = lambda x: isinstance(x, Callable)
try:
basestring
except NameError:
# Python 3.x
basestring = str
class FFIError(Exception):
pass
class CDefError(Exception):
def __str__(self):
try:
line = 'line %d: ' % (self.args[1].coord.line,)
except (AttributeError, TypeError, IndexError):
line = ''
return '%s%s' % (line, self.args[0])
class FFI(object):
r'''
The main top-level class that you instantiate once, or once per module.
Example usage:
ffi = FFI()
ffi.cdef("""
int printf(const char *, ...);
""")
C = ffi.dlopen(None) # standard library
-or-
C = ffi.verify() # use a C compiler: verify the decl above is right
C.printf("hello, %s!\n", ffi.new("char[]", "world"))
'''
def __init__(self, backend=None):
"""Create an FFI instance. The 'backend' argument is used to
select a non-default backend, mostly for tests.
"""
from . import cparser, model
if backend is None:
# You need PyPy (>= 2.0 beta), or a CPython (>= 2.6) with
# _cffi_backend.so compiled.
import _cffi_backend as backend
from . import __version__
assert backend.__version__ == __version__
# (If you insist you can also try to pass the option
# 'backend=backend_ctypes.CTypesBackend()', but don't
# rely on it! It's probably not going to work well.)
self._backend = backend
self._lock = allocate_lock()
self._parser = cparser.Parser()
self._cached_btypes = {}
self._parsed_types = types.ModuleType('parsed_types').__dict__
self._new_types = types.ModuleType('new_types').__dict__
self._function_caches = []
self._libraries = []
self._cdefsources = []
if hasattr(backend, 'set_ffi'):
backend.set_ffi(self)
for name in backend.__dict__:
if name.startswith('RTLD_'):
setattr(self, name, getattr(backend, name))
#
with self._lock:
self.BVoidP = self._get_cached_btype(model.voidp_type)
if isinstance(backend, types.ModuleType):
# _cffi_backend: attach these constants to the class
if not hasattr(FFI, 'NULL'):
FFI.NULL = self.cast(self.BVoidP, 0)
FFI.CData, FFI.CType = backend._get_types()
else:
# ctypes backend: attach these constants to the instance
self.NULL = self.cast(self.BVoidP, 0)
self.CData, self.CType = backend._get_types()
def cdef(self, csource, override=False, packed=False):
"""Parse the given C source. This registers all declared functions,
types, and global variables. The functions and global variables can
then be accessed via either 'ffi.dlopen()' or 'ffi.verify()'.
The types can be used in 'ffi.new()' and other functions.
If 'packed' is specified as True, all structs declared inside this
cdef are packed, i.e. laid out without any field alignment at all.
"""
if not isinstance(csource, str): # unicode, on Python 2
if not isinstance(csource, basestring):
raise TypeError("cdef() argument must be a string")
csource = csource.encode('ascii')
with self._lock:
self._parser.parse(csource, override=override, packed=packed)
self._cdefsources.append(csource)
if override:
for cache in self._function_caches:
cache.clear()
def dlopen(self, name, flags=0):
"""Load and return a dynamic library identified by 'name'.
The standard C library can be loaded by passing None.
Note that functions and types declared by 'ffi.cdef()' are not
linked to a particular library, just like C headers; in the
library we only look for the actual (untyped) symbols.
"""
assert isinstance(name, basestring) or name is None
with self._lock:
lib, function_cache = _make_ffi_library(self, name, flags)
self._function_caches.append(function_cache)
self._libraries.append(lib)
return lib
def _typeof_locked(self, cdecl):
# call me with the lock!
key = cdecl
if key in self._parsed_types:
return self._parsed_types[key]
#
if not isinstance(cdecl, str): # unicode, on Python 2
cdecl = cdecl.encode('ascii')
#
type = self._parser.parse_type(cdecl)
really_a_function_type = type.is_raw_function
if really_a_function_type:
type = type.as_function_pointer()
btype = self._get_cached_btype(type)
result = btype, really_a_function_type
self._parsed_types[key] = result
return result
def _typeof(self, cdecl, consider_function_as_funcptr=False):
# string -> ctype object
try:
result = self._parsed_types[cdecl]
except KeyError:
with self._lock:
result = self._typeof_locked(cdecl)
#
btype, really_a_function_type = result
if really_a_function_type and not consider_function_as_funcptr:
raise CDefError("the type %r is a function type, not a "
"pointer-to-function type" % (cdecl,))
return btype
def typeof(self, cdecl):
"""Parse the C type given as a string and return the
corresponding <ctype> object.
It can also be used on 'cdata' instance to get its C type.
"""
if isinstance(cdecl, basestring):
return self._typeof(cdecl)
if isinstance(cdecl, self.CData):
return self._backend.typeof(cdecl)
if isinstance(cdecl, types.BuiltinFunctionType):
res = _builtin_function_type(cdecl)
if res is not None:
return res
if (isinstance(cdecl, types.FunctionType)
and hasattr(cdecl, '_cffi_base_type')):
with self._lock:
return self._get_cached_btype(cdecl._cffi_base_type)
raise TypeError(type(cdecl))
def sizeof(self, cdecl):
"""Return the size in bytes of the argument. It can be a
string naming a C type, or a 'cdata' instance.
"""
if isinstance(cdecl, basestring):
BType = self._typeof(cdecl)
return self._backend.sizeof(BType)
else:
return self._backend.sizeof(cdecl)
def alignof(self, cdecl):
"""Return the natural alignment size in bytes of the C type
given as a string.
"""
if isinstance(cdecl, basestring):
cdecl = self._typeof(cdecl)
return self._backend.alignof(cdecl)
def offsetof(self, cdecl, fieldname):
"""Return the offset of the named field inside the given
structure, which must be given as a C type name.
"""
if isinstance(cdecl, basestring):
cdecl = self._typeof(cdecl)
return self._backend.typeoffsetof(cdecl, fieldname)[1]
def new(self, cdecl, init=None):
"""Allocate an instance according to the specified C type and
return a pointer to it. The specified C type must be either a
pointer or an array: ``new('X *')`` allocates an X and returns
a pointer to it, whereas ``new('X[n]')`` allocates an array of
n X'es and returns an array referencing it (which works
mostly like a pointer, like in C). You can also use
``new('X[]', n)`` to allocate an array of a non-constant
length n.
The memory is initialized following the rules of declaring a
global variable in C: by default it is zero-initialized, but
an explicit initializer can be given which can be used to
fill all or part of the memory.
When the returned <cdata> object goes out of scope, the memory
is freed. In other words the returned <cdata> object has
ownership of the value of type 'cdecl' that it points to. This
means that the raw data can be used as long as this object is
kept alive, but must not be used for a longer time. Be careful
about that when copying the pointer to the memory somewhere
else, e.g. into another structure.
"""
if isinstance(cdecl, basestring):
cdecl = self._typeof(cdecl)
return self._backend.newp(cdecl, init)
def cast(self, cdecl, source):
"""Similar to a C cast: returns an instance of the named C
type initialized with the given 'source'. The source is
casted between integers or pointers of any type.
"""
if isinstance(cdecl, basestring):
cdecl = self._typeof(cdecl)
return self._backend.cast(cdecl, source)
def string(self, cdata, maxlen=-1):
"""Return a Python string (or unicode string) from the 'cdata'.
If 'cdata' is a pointer or array of characters or bytes, returns
the null-terminated string. The returned string extends until
the first null character, or at most 'maxlen' characters. If
'cdata' is an array then 'maxlen' defaults to its length.
If 'cdata' is a pointer or array of wchar_t, returns a unicode
string following the same rules.
If 'cdata' is a single character or byte or a wchar_t, returns
it as a string or unicode string.
If 'cdata' is an enum, returns the value of the enumerator as a
string, or 'NUMBER' if the value is out of range.
"""
return self._backend.string(cdata, maxlen)
def buffer(self, cdata, size=-1):
"""Return a read-write buffer object that references the raw C data
pointed to by the given 'cdata'. The 'cdata' must be a pointer or
an array. Can be passed to functions expecting a buffer, or directly
manipulated with:
buf[:] get a copy of it in a regular string, or
buf[idx] as a single character
buf[:] = ...
buf[idx] = ... change the content
"""
return self._backend.buffer(cdata, size)
def callback(self, cdecl, python_callable=None, error=None):
"""Return a callback object or a decorator making such a
callback object. 'cdecl' must name a C function pointer type.
The callback invokes the specified 'python_callable' (which may
be provided either directly or via a decorator). Important: the
callback object must be manually kept alive for as long as the
callback may be invoked from the C level.
"""
def callback_decorator_wrap(python_callable):
if not callable(python_callable):
raise TypeError("the 'python_callable' argument "
"is not callable")
return self._backend.callback(cdecl, python_callable, error)
if isinstance(cdecl, basestring):
cdecl = self._typeof(cdecl, consider_function_as_funcptr=True)
if python_callable is None:
return callback_decorator_wrap # decorator mode
else:
return callback_decorator_wrap(python_callable) # direct mode
def getctype(self, cdecl, replace_with=''):
"""Return a string giving the C type 'cdecl', which may be itself
a string or a <ctype> object. If 'replace_with' is given, it gives
extra text to append (or insert for more complicated C types), like
a variable name, or '*' to get actually the C type 'pointer-to-cdecl'.
"""
if isinstance(cdecl, basestring):
cdecl = self._typeof(cdecl)
replace_with = replace_with.strip()
if (replace_with.startswith('*')
and '&[' in self._backend.getcname(cdecl, '&')):
replace_with = '(%s)' % replace_with
elif replace_with and not replace_with[0] in '[(':
replace_with = ' ' + replace_with
return self._backend.getcname(cdecl, replace_with)
def gc(self, cdata, destructor):
"""Return a new cdata object that points to the same
data. Later, when this new cdata object is garbage-collected,
'destructor(old_cdata_object)' will be called.
"""
with self._lock:
try:
gc_weakrefs = self.gc_weakrefs
except AttributeError:
from .gc_weakref import GcWeakrefs
gc_weakrefs = self.gc_weakrefs = GcWeakrefs(self)
return gc_weakrefs.build(cdata, destructor)
def _get_cached_btype(self, type):
assert self._lock.acquire(False) is False
# call me with the lock!
try:
BType = self._cached_btypes[type]
except KeyError:
finishlist = []
BType = type.get_cached_btype(self, finishlist)
for type in finishlist:
type.finish_backend_type(self, finishlist)
return BType
def verify(self, source='', tmpdir=None, **kwargs):
"""Verify that the current ffi signatures compile on this
machine, and return a dynamic library object. The dynamic
library can be used to call functions and access global
variables declared in this 'ffi'. The library is compiled
by the C compiler: it gives you C-level API compatibility
(including calling macros). This is unlike 'ffi.dlopen()',
which requires binary compatibility in the signatures.
"""
from .verifier import Verifier, _caller_dir_pycache
tmpdir = tmpdir or _caller_dir_pycache()
self.verifier = Verifier(self, source, tmpdir, **kwargs)
lib = self.verifier.load_library()
self._libraries.append(lib)
return lib
def _get_errno(self):
return self._backend.get_errno()
def _set_errno(self, errno):
self._backend.set_errno(errno)
errno = property(_get_errno, _set_errno, None,
"the value of 'errno' from/to the C calls")
def getwinerror(self, code=-1):
return self._backend.getwinerror(code)
def _pointer_to(self, ctype):
from . import model
with self._lock:
return model.pointer_cache(self, ctype)
def addressof(self, cdata, field=None):
"""Return the address of a <cdata 'struct-or-union'>.
If 'field' is specified, return the address of this field.
"""
ctype = self._backend.typeof(cdata)
ctype, offset = self._backend.typeoffsetof(ctype, field)
ctypeptr = self._pointer_to(ctype)
return self._backend.rawaddressof(ctypeptr, cdata, offset)
def include(self, ffi_to_include):
"""Includes the typedefs, structs, unions and enums defined
in another FFI instance. Usage is similar to a #include in C,
where a part of the program might include types defined in
another part for its own usage. Note that the include()
method has no effect on functions, constants and global
variables, which must anyway be accessed directly from the
lib object returned by the original FFI instance.
"""
with ffi_to_include._lock:
with self._lock:
self._parser.include(ffi_to_include._parser)
self._cdefsources.append('[')
self._cdefsources.extend(ffi_to_include._cdefsources)
self._cdefsources.append(']')
def new_handle(self, x):
return self._backend.newp_handle(self.BVoidP, x)
def from_handle(self, x):
return self._backend.from_handle(x)
def _load_backend_lib(backend, name, flags):
if name is None:
if sys.platform != "win32":
return backend.load_library(None, flags)
name = "c" # Windows: load_library(None) fails, but this works
# (backward compatibility hack only)
try:
if '.' not in name and '/' not in name:
raise OSError("library not found: %r" % (name,))
return backend.load_library(name, flags)
except OSError:
import ctypes.util
path = ctypes.util.find_library(name)
if path is None:
raise # propagate the original OSError
return backend.load_library(path, flags)
def _make_ffi_library(ffi, libname, flags):
import os
backend = ffi._backend
backendlib = _load_backend_lib(backend, libname, flags)
copied_enums = []
#
def make_accessor_locked(name):
key = 'function ' + name
if key in ffi._parser._declarations:
tp = ffi._parser._declarations[key]
BType = ffi._get_cached_btype(tp)
try:
value = backendlib.load_function(BType, name)
except KeyError as e:
raise AttributeError('%s: %s' % (name, e))
library.__dict__[name] = value
return
#
key = 'variable ' + name
if key in ffi._parser._declarations:
tp = ffi._parser._declarations[key]
BType = ffi._get_cached_btype(tp)
read_variable = backendlib.read_variable
write_variable = backendlib.write_variable
setattr(FFILibrary, name, property(
lambda self: read_variable(BType, name),
lambda self, value: write_variable(BType, name, value)))
return
#
if not copied_enums:
from . import model
for key, tp in ffi._parser._declarations.items():
if not isinstance(tp, model.EnumType):
continue
for enumname, enumval in zip(tp.enumerators, tp.enumvalues):
if enumname not in library.__dict__:
library.__dict__[enumname] = enumval
for key, val in ffi._parser._int_constants.items():
if key not in library.__dict__:
library.__dict__[key] = val
copied_enums.append(True)
if name in library.__dict__:
return
#
raise AttributeError(name)
#
def make_accessor(name):
with ffi._lock:
if name in library.__dict__ or name in FFILibrary.__dict__:
return # added by another thread while waiting for the lock
make_accessor_locked(name)
#
class FFILibrary(object):
def __getattr__(self, name):
make_accessor(name)
return getattr(self, name)
def __setattr__(self, name, value):
try:
property = getattr(self.__class__, name)
except AttributeError:
make_accessor(name)
setattr(self, name, value)
else:
property.__set__(self, value)
#
if libname is not None:
try:
if not isinstance(libname, str): # unicode, on Python 2
libname = libname.encode('utf-8')
FFILibrary.__name__ = 'FFILibrary_%s' % libname
except UnicodeError:
pass
library = FFILibrary()
return library, library.__dict__
def _builtin_function_type(func):
# a hack to make at least ffi.typeof(builtin_function) work,
# if the builtin function was obtained by 'vengine_cpy'.
import sys
try:
module = sys.modules[func.__module__]
ffi = module._cffi_original_ffi
types_of_builtin_funcs = module._cffi_types_of_builtin_funcs
tp = types_of_builtin_funcs[func]
except (KeyError, AttributeError, TypeError):
return None
else:
with ffi._lock:
return ffi._get_cached_btype(tp)

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,248 @@
import sys
from . import api, model
COMMON_TYPES = {
'FILE': model.unknown_type('FILE', '_IO_FILE'),
'bool': '_Bool',
}
for _type in model.PrimitiveType.ALL_PRIMITIVE_TYPES:
if _type.endswith('_t'):
COMMON_TYPES[_type] = _type
del _type
_CACHE = {}
def resolve_common_type(commontype):
try:
return _CACHE[commontype]
except KeyError:
result = COMMON_TYPES.get(commontype, commontype)
if not isinstance(result, str):
pass # result is already a BaseType
elif result.endswith(' *'):
if result.startswith('const '):
result = model.ConstPointerType(
resolve_common_type(result[6:-2]))
else:
result = model.PointerType(resolve_common_type(result[:-2]))
elif result in model.PrimitiveType.ALL_PRIMITIVE_TYPES:
result = model.PrimitiveType(result)
else:
if commontype == result:
raise api.FFIError("Unsupported type: %r. Please file a bug "
"if you think it should be." % (commontype,))
result = resolve_common_type(result) # recursively
assert isinstance(result, model.BaseTypeByIdentity)
_CACHE[commontype] = result
return result
# ____________________________________________________________
# Windows common types
def win_common_types(maxsize):
result = {}
if maxsize < (1<<32):
result.update({ # Windows 32-bits
'HALF_PTR': 'short',
'INT_PTR': 'int',
'LONG_PTR': 'long',
'UHALF_PTR': 'unsigned short',
'UINT_PTR': 'unsigned int',
'ULONG_PTR': 'unsigned long',
})
else:
result.update({ # Windows 64-bits
'HALF_PTR': 'int',
'INT_PTR': 'long long',
'LONG_PTR': 'long long',
'UHALF_PTR': 'unsigned int',
'UINT_PTR': 'unsigned long long',
'ULONG_PTR': 'unsigned long long',
})
result.update({
"BYTE": "unsigned char",
"BOOL": "int",
"CCHAR": "char",
"CHAR": "char",
"DWORD": "unsigned long",
"DWORD32": "unsigned int",
"DWORD64": "unsigned long long",
"FLOAT": "float",
"INT": "int",
"INT8": "signed char",
"INT16": "short",
"INT32": "int",
"INT64": "long long",
"LONG": "long",
"LONGLONG": "long long",
"LONG32": "int",
"LONG64": "long long",
"WORD": "unsigned short",
"PVOID": model.voidp_type,
"ULONGLONG": "unsigned long long",
"WCHAR": "wchar_t",
"SHORT": "short",
"TBYTE": "WCHAR",
"TCHAR": "WCHAR",
"UCHAR": "unsigned char",
"UINT": "unsigned int",
"UINT8": "unsigned char",
"UINT16": "unsigned short",
"UINT32": "unsigned int",
"UINT64": "unsigned long long",
"ULONG": "unsigned long",
"ULONG32": "unsigned int",
"ULONG64": "unsigned long long",
"USHORT": "unsigned short",
"SIZE_T": "ULONG_PTR",
"SSIZE_T": "LONG_PTR",
"ATOM": "WORD",
"BOOLEAN": "BYTE",
"COLORREF": "DWORD",
"HANDLE": "PVOID",
"DWORDLONG": "ULONGLONG",
"DWORD_PTR": "ULONG_PTR",
"HACCEL": "HANDLE",
"HBITMAP": "HANDLE",
"HBRUSH": "HANDLE",
"HCOLORSPACE": "HANDLE",
"HCONV": "HANDLE",
"HCONVLIST": "HANDLE",
"HDC": "HANDLE",
"HDDEDATA": "HANDLE",
"HDESK": "HANDLE",
"HDROP": "HANDLE",
"HDWP": "HANDLE",
"HENHMETAFILE": "HANDLE",
"HFILE": "int",
"HFONT": "HANDLE",
"HGDIOBJ": "HANDLE",
"HGLOBAL": "HANDLE",
"HHOOK": "HANDLE",
"HICON": "HANDLE",
"HCURSOR": "HICON",
"HINSTANCE": "HANDLE",
"HKEY": "HANDLE",
"HKL": "HANDLE",
"HLOCAL": "HANDLE",
"HMENU": "HANDLE",
"HMETAFILE": "HANDLE",
"HMODULE": "HINSTANCE",
"HMONITOR": "HANDLE",
"HPALETTE": "HANDLE",
"HPEN": "HANDLE",
"HRESULT": "LONG",
"HRGN": "HANDLE",
"HRSRC": "HANDLE",
"HSZ": "HANDLE",
"WINSTA": "HANDLE",
"HWND": "HANDLE",
"LANGID": "WORD",
"LCID": "DWORD",
"LCTYPE": "DWORD",
"LGRPID": "DWORD",
"LPARAM": "LONG_PTR",
"LPBOOL": "BOOL *",
"LPBYTE": "BYTE *",
"LPCOLORREF": "DWORD *",
"LPCSTR": "const char *",
"LPCVOID": model.const_voidp_type,
"LPCWSTR": "const WCHAR *",
"LPCTSTR": "LPCWSTR",
"LPDWORD": "DWORD *",
"LPHANDLE": "HANDLE *",
"LPINT": "int *",
"LPLONG": "long *",
"LPSTR": "CHAR *",
"LPWSTR": "WCHAR *",
"LPTSTR": "LPWSTR",
"LPVOID": model.voidp_type,
"LPWORD": "WORD *",
"LRESULT": "LONG_PTR",
"PBOOL": "BOOL *",
"PBOOLEAN": "BOOLEAN *",
"PBYTE": "BYTE *",
"PCHAR": "CHAR *",
"PCSTR": "const CHAR *",
"PCTSTR": "LPCWSTR",
"PCWSTR": "const WCHAR *",
"PDWORD": "DWORD *",
"PDWORDLONG": "DWORDLONG *",
"PDWORD_PTR": "DWORD_PTR *",
"PDWORD32": "DWORD32 *",
"PDWORD64": "DWORD64 *",
"PFLOAT": "FLOAT *",
"PHALF_PTR": "HALF_PTR *",
"PHANDLE": "HANDLE *",
"PHKEY": "HKEY *",
"PINT": "int *",
"PINT_PTR": "INT_PTR *",
"PINT8": "INT8 *",
"PINT16": "INT16 *",
"PINT32": "INT32 *",
"PINT64": "INT64 *",
"PLCID": "PDWORD",
"PLONG": "LONG *",
"PLONGLONG": "LONGLONG *",
"PLONG_PTR": "LONG_PTR *",
"PLONG32": "LONG32 *",
"PLONG64": "LONG64 *",
"PSHORT": "SHORT *",
"PSIZE_T": "SIZE_T *",
"PSSIZE_T": "SSIZE_T *",
"PSTR": "CHAR *",
"PTBYTE": "TBYTE *",
"PTCHAR": "TCHAR *",
"PTSTR": "LPWSTR",
"PUCHAR": "UCHAR *",
"PUHALF_PTR": "UHALF_PTR *",
"PUINT": "UINT *",
"PUINT_PTR": "UINT_PTR *",
"PUINT8": "UINT8 *",
"PUINT16": "UINT16 *",
"PUINT32": "UINT32 *",
"PUINT64": "UINT64 *",
"PULONG": "ULONG *",
"PULONGLONG": "ULONGLONG *",
"PULONG_PTR": "ULONG_PTR *",
"PULONG32": "ULONG32 *",
"PULONG64": "ULONG64 *",
"PUSHORT": "USHORT *",
"PWCHAR": "WCHAR *",
"PWORD": "WORD *",
"PWSTR": "WCHAR *",
"QWORD": "unsigned long long",
"SC_HANDLE": "HANDLE",
"SC_LOCK": "LPVOID",
"SERVICE_STATUS_HANDLE": "HANDLE",
"UNICODE_STRING": model.StructType(
"_UNICODE_STRING",
["Length",
"MaximumLength",
"Buffer"],
[model.PrimitiveType("unsigned short"),
model.PrimitiveType("unsigned short"),
model.PointerType(model.PrimitiveType("wchar_t"))],
[-1, -1, -1]),
"PUNICODE_STRING": "UNICODE_STRING *",
"PCUNICODE_STRING": "const UNICODE_STRING *",
"USN": "LONGLONG",
"VOID": model.void_type,
"WPARAM": "UINT_PTR",
})
return result
if sys.platform == 'win32':
COMMON_TYPES.update(win_common_types(sys.maxsize))

View file

@ -0,0 +1,584 @@
from . import api, model
from .commontypes import COMMON_TYPES, resolve_common_type
try:
from . import _pycparser as pycparser
except ImportError:
import pycparser
import weakref, re, sys
try:
if sys.version_info < (3,):
import thread as _thread
else:
import _thread
lock = _thread.allocate_lock()
except ImportError:
lock = None
_r_comment = re.compile(r"/\*.*?\*/|//.*?$", re.DOTALL | re.MULTILINE)
_r_define = re.compile(r"^\s*#\s*define\s+([A-Za-z_][A-Za-z_0-9]*)\s+(.*?)$",
re.MULTILINE)
_r_partial_enum = re.compile(r"=\s*\.\.\.\s*[,}]|\.\.\.\s*\}")
_r_enum_dotdotdot = re.compile(r"__dotdotdot\d+__$")
_r_partial_array = re.compile(r"\[\s*\.\.\.\s*\]")
_r_words = re.compile(r"\w+|\S")
_parser_cache = None
_r_int_literal = re.compile(r"^0?x?[0-9a-f]+u?l?$", re.IGNORECASE)
def _get_parser():
global _parser_cache
if _parser_cache is None:
_parser_cache = pycparser.CParser()
return _parser_cache
def _preprocess(csource):
# Remove comments. NOTE: this only work because the cdef() section
# should not contain any string literal!
csource = _r_comment.sub(' ', csource)
# Remove the "#define FOO x" lines
macros = {}
for match in _r_define.finditer(csource):
macroname, macrovalue = match.groups()
macros[macroname] = macrovalue
csource = _r_define.sub('', csource)
# Replace "[...]" with "[__dotdotdotarray__]"
csource = _r_partial_array.sub('[__dotdotdotarray__]', csource)
# Replace "...}" with "__dotdotdotNUM__}". This construction should
# occur only at the end of enums; at the end of structs we have "...;}"
# and at the end of vararg functions "...);". Also replace "=...[,}]"
# with ",__dotdotdotNUM__[,}]": this occurs in the enums too, when
# giving an unknown value.
matches = list(_r_partial_enum.finditer(csource))
for number, match in enumerate(reversed(matches)):
p = match.start()
if csource[p] == '=':
p2 = csource.find('...', p, match.end())
assert p2 > p
csource = '%s,__dotdotdot%d__ %s' % (csource[:p], number,
csource[p2+3:])
else:
assert csource[p:p+3] == '...'
csource = '%s __dotdotdot%d__ %s' % (csource[:p], number,
csource[p+3:])
# Replace all remaining "..." with the same name, "__dotdotdot__",
# which is declared with a typedef for the purpose of C parsing.
return csource.replace('...', ' __dotdotdot__ '), macros
def _common_type_names(csource):
# Look in the source for what looks like usages of types from the
# list of common types. A "usage" is approximated here as the
# appearance of the word, minus a "definition" of the type, which
# is the last word in a "typedef" statement. Approximative only
# but should be fine for all the common types.
look_for_words = set(COMMON_TYPES)
look_for_words.add(';')
look_for_words.add('typedef')
words_used = set()
is_typedef = False
previous_word = ''
for word in _r_words.findall(csource):
if word in look_for_words:
if word == ';':
if is_typedef:
words_used.discard(previous_word)
look_for_words.discard(previous_word)
is_typedef = False
elif word == 'typedef':
is_typedef = True
else: # word in COMMON_TYPES
words_used.add(word)
previous_word = word
return words_used
class Parser(object):
def __init__(self):
self._declarations = {}
self._anonymous_counter = 0
self._structnode2type = weakref.WeakKeyDictionary()
self._override = False
self._packed = False
self._int_constants = {}
def _parse(self, csource):
csource, macros = _preprocess(csource)
# XXX: for more efficiency we would need to poke into the
# internals of CParser... the following registers the
# typedefs, because their presence or absence influences the
# parsing itself (but what they are typedef'ed to plays no role)
ctn = _common_type_names(csource)
typenames = []
for name in sorted(self._declarations):
if name.startswith('typedef '):
name = name[8:]
typenames.append(name)
ctn.discard(name)
typenames += sorted(ctn)
#
csourcelines = ['typedef int %s;' % typename for typename in typenames]
csourcelines.append('typedef int __dotdotdot__;')
csourcelines.append(csource)
csource = '\n'.join(csourcelines)
if lock is not None:
lock.acquire() # pycparser is not thread-safe...
try:
ast = _get_parser().parse(csource)
except pycparser.c_parser.ParseError as e:
self.convert_pycparser_error(e, csource)
finally:
if lock is not None:
lock.release()
# csource will be used to find buggy source text
return ast, macros, csource
def _convert_pycparser_error(self, e, csource):
# xxx look for ":NUM:" at the start of str(e) and try to interpret
# it as a line number
line = None
msg = str(e)
if msg.startswith(':') and ':' in msg[1:]:
linenum = msg[1:msg.find(':',1)]
if linenum.isdigit():
linenum = int(linenum, 10)
csourcelines = csource.splitlines()
if 1 <= linenum <= len(csourcelines):
line = csourcelines[linenum-1]
return line
def convert_pycparser_error(self, e, csource):
line = self._convert_pycparser_error(e, csource)
msg = str(e)
if line:
msg = 'cannot parse "%s"\n%s' % (line.strip(), msg)
else:
msg = 'parse error\n%s' % (msg,)
raise api.CDefError(msg)
def parse(self, csource, override=False, packed=False):
prev_override = self._override
prev_packed = self._packed
try:
self._override = override
self._packed = packed
self._internal_parse(csource)
finally:
self._override = prev_override
self._packed = prev_packed
def _internal_parse(self, csource):
ast, macros, csource = self._parse(csource)
# add the macros
self._process_macros(macros)
# find the first "__dotdotdot__" and use that as a separator
# between the repeated typedefs and the real csource
iterator = iter(ast.ext)
for decl in iterator:
if decl.name == '__dotdotdot__':
break
#
try:
for decl in iterator:
if isinstance(decl, pycparser.c_ast.Decl):
self._parse_decl(decl)
elif isinstance(decl, pycparser.c_ast.Typedef):
if not decl.name:
raise api.CDefError("typedef does not declare any name",
decl)
if (isinstance(decl.type.type, pycparser.c_ast.IdentifierType)
and decl.type.type.names == ['__dotdotdot__']):
realtype = model.unknown_type(decl.name)
elif (isinstance(decl.type, pycparser.c_ast.PtrDecl) and
isinstance(decl.type.type, pycparser.c_ast.TypeDecl) and
isinstance(decl.type.type.type,
pycparser.c_ast.IdentifierType) and
decl.type.type.type.names == ['__dotdotdot__']):
realtype = model.unknown_ptr_type(decl.name)
else:
realtype = self._get_type(decl.type, name=decl.name)
self._declare('typedef ' + decl.name, realtype)
else:
raise api.CDefError("unrecognized construct", decl)
except api.FFIError as e:
msg = self._convert_pycparser_error(e, csource)
if msg:
e.args = (e.args[0] + "\n *** Err: %s" % msg,)
raise
def _add_constants(self, key, val):
if key in self._int_constants:
raise api.FFIError(
"multiple declarations of constant: %s" % (key,))
self._int_constants[key] = val
def _process_macros(self, macros):
for key, value in macros.items():
value = value.strip()
match = _r_int_literal.search(value)
if match is not None:
int_str = match.group(0).lower().rstrip("ul")
# "010" is not valid oct in py3
if (int_str.startswith("0") and
int_str != "0" and
not int_str.startswith("0x")):
int_str = "0o" + int_str[1:]
pyvalue = int(int_str, 0)
self._add_constants(key, pyvalue)
elif value == '...':
self._declare('macro ' + key, value)
else:
raise api.CDefError('only supports the syntax "#define '
'%s ..." (literally) or "#define '
'%s 0x1FF" for now' % (key, key))
def _parse_decl(self, decl):
node = decl.type
if isinstance(node, pycparser.c_ast.FuncDecl):
tp = self._get_type(node, name=decl.name)
assert isinstance(tp, model.RawFunctionType)
tp = self._get_type_pointer(tp)
self._declare('function ' + decl.name, tp)
else:
if isinstance(node, pycparser.c_ast.Struct):
# XXX do we need self._declare in any of those?
if node.decls is not None:
self._get_struct_union_enum_type('struct', node)
elif isinstance(node, pycparser.c_ast.Union):
if node.decls is not None:
self._get_struct_union_enum_type('union', node)
elif isinstance(node, pycparser.c_ast.Enum):
if node.values is not None:
self._get_struct_union_enum_type('enum', node)
elif not decl.name:
raise api.CDefError("construct does not declare any variable",
decl)
#
if decl.name:
tp = self._get_type(node, partial_length_ok=True)
if self._is_constant_globalvar(node):
self._declare('constant ' + decl.name, tp)
else:
self._declare('variable ' + decl.name, tp)
def parse_type(self, cdecl):
ast, macros = self._parse('void __dummy(\n%s\n);' % cdecl)[:2]
assert not macros
exprnode = ast.ext[-1].type.args.params[0]
if isinstance(exprnode, pycparser.c_ast.ID):
raise api.CDefError("unknown identifier '%s'" % (exprnode.name,))
return self._get_type(exprnode.type)
def _declare(self, name, obj):
if name in self._declarations:
if self._declarations[name] is obj:
return
if not self._override:
raise api.FFIError(
"multiple declarations of %s (for interactive usage, "
"try cdef(xx, override=True))" % (name,))
assert '__dotdotdot__' not in name.split()
self._declarations[name] = obj
def _get_type_pointer(self, type, const=False):
if isinstance(type, model.RawFunctionType):
return type.as_function_pointer()
if const:
return model.ConstPointerType(type)
return model.PointerType(type)
def _get_type(self, typenode, name=None, partial_length_ok=False):
# first, dereference typedefs, if we have it already parsed, we're good
if (isinstance(typenode, pycparser.c_ast.TypeDecl) and
isinstance(typenode.type, pycparser.c_ast.IdentifierType) and
len(typenode.type.names) == 1 and
('typedef ' + typenode.type.names[0]) in self._declarations):
type = self._declarations['typedef ' + typenode.type.names[0]]
return type
#
if isinstance(typenode, pycparser.c_ast.ArrayDecl):
# array type
if typenode.dim is None:
length = None
else:
length = self._parse_constant(
typenode.dim, partial_length_ok=partial_length_ok)
return model.ArrayType(self._get_type(typenode.type), length)
#
if isinstance(typenode, pycparser.c_ast.PtrDecl):
# pointer type
const = (isinstance(typenode.type, pycparser.c_ast.TypeDecl)
and 'const' in typenode.type.quals)
return self._get_type_pointer(self._get_type(typenode.type), const)
#
if isinstance(typenode, pycparser.c_ast.TypeDecl):
type = typenode.type
if isinstance(type, pycparser.c_ast.IdentifierType):
# assume a primitive type. get it from .names, but reduce
# synonyms to a single chosen combination
names = list(type.names)
if names != ['signed', 'char']: # keep this unmodified
prefixes = {}
while names:
name = names[0]
if name in ('short', 'long', 'signed', 'unsigned'):
prefixes[name] = prefixes.get(name, 0) + 1
del names[0]
else:
break
# ignore the 'signed' prefix below, and reorder the others
newnames = []
for prefix in ('unsigned', 'short', 'long'):
for i in range(prefixes.get(prefix, 0)):
newnames.append(prefix)
if not names:
names = ['int'] # implicitly
if names == ['int']: # but kill it if 'short' or 'long'
if 'short' in prefixes or 'long' in prefixes:
names = []
names = newnames + names
ident = ' '.join(names)
if ident == 'void':
return model.void_type
if ident == '__dotdotdot__':
raise api.FFIError(':%d: bad usage of "..."' %
typenode.coord.line)
return resolve_common_type(ident)
#
if isinstance(type, pycparser.c_ast.Struct):
# 'struct foobar'
return self._get_struct_union_enum_type('struct', type, name)
#
if isinstance(type, pycparser.c_ast.Union):
# 'union foobar'
return self._get_struct_union_enum_type('union', type, name)
#
if isinstance(type, pycparser.c_ast.Enum):
# 'enum foobar'
return self._get_struct_union_enum_type('enum', type, name)
#
if isinstance(typenode, pycparser.c_ast.FuncDecl):
# a function type
return self._parse_function_type(typenode, name)
#
# nested anonymous structs or unions end up here
if isinstance(typenode, pycparser.c_ast.Struct):
return self._get_struct_union_enum_type('struct', typenode, name,
nested=True)
if isinstance(typenode, pycparser.c_ast.Union):
return self._get_struct_union_enum_type('union', typenode, name,
nested=True)
#
raise api.FFIError(":%d: bad or unsupported type declaration" %
typenode.coord.line)
def _parse_function_type(self, typenode, funcname=None):
params = list(getattr(typenode.args, 'params', []))
ellipsis = (
len(params) > 0 and
isinstance(params[-1].type, pycparser.c_ast.TypeDecl) and
isinstance(params[-1].type.type,
pycparser.c_ast.IdentifierType) and
params[-1].type.type.names == ['__dotdotdot__'])
if ellipsis:
params.pop()
if not params:
raise api.CDefError(
"%s: a function with only '(...)' as argument"
" is not correct C" % (funcname or 'in expression'))
elif (len(params) == 1 and
isinstance(params[0].type, pycparser.c_ast.TypeDecl) and
isinstance(params[0].type.type, pycparser.c_ast.IdentifierType)
and list(params[0].type.type.names) == ['void']):
del params[0]
args = [self._as_func_arg(self._get_type(argdeclnode.type))
for argdeclnode in params]
result = self._get_type(typenode.type)
return model.RawFunctionType(tuple(args), result, ellipsis)
def _as_func_arg(self, type):
if isinstance(type, model.ArrayType):
return model.PointerType(type.item)
elif isinstance(type, model.RawFunctionType):
return type.as_function_pointer()
else:
return type
def _is_constant_globalvar(self, typenode):
if isinstance(typenode, pycparser.c_ast.PtrDecl):
return 'const' in typenode.quals
if isinstance(typenode, pycparser.c_ast.TypeDecl):
return 'const' in typenode.quals
return False
def _get_struct_union_enum_type(self, kind, type, name=None, nested=False):
# First, a level of caching on the exact 'type' node of the AST.
# This is obscure, but needed because pycparser "unrolls" declarations
# such as "typedef struct { } foo_t, *foo_p" and we end up with
# an AST that is not a tree, but a DAG, with the "type" node of the
# two branches foo_t and foo_p of the trees being the same node.
# It's a bit silly but detecting "DAG-ness" in the AST tree seems
# to be the only way to distinguish this case from two independent
# structs. See test_struct_with_two_usages.
try:
return self._structnode2type[type]
except KeyError:
pass
#
# Note that this must handle parsing "struct foo" any number of
# times and always return the same StructType object. Additionally,
# one of these times (not necessarily the first), the fields of
# the struct can be specified with "struct foo { ...fields... }".
# If no name is given, then we have to create a new anonymous struct
# with no caching; in this case, the fields are either specified
# right now or never.
#
force_name = name
name = type.name
#
# get the type or create it if needed
if name is None:
# 'force_name' is used to guess a more readable name for
# anonymous structs, for the common case "typedef struct { } foo".
if force_name is not None:
explicit_name = '$%s' % force_name
else:
self._anonymous_counter += 1
explicit_name = '$%d' % self._anonymous_counter
tp = None
else:
explicit_name = name
key = '%s %s' % (kind, name)
tp = self._declarations.get(key, None)
#
if tp is None:
if kind == 'struct':
tp = model.StructType(explicit_name, None, None, None)
elif kind == 'union':
tp = model.UnionType(explicit_name, None, None, None)
elif kind == 'enum':
tp = self._build_enum_type(explicit_name, type.values)
else:
raise AssertionError("kind = %r" % (kind,))
if name is not None:
self._declare(key, tp)
else:
if kind == 'enum' and type.values is not None:
raise NotImplementedError(
"enum %s: the '{}' declaration should appear on the first "
"time the enum is mentioned, not later" % explicit_name)
if not tp.forcename:
tp.force_the_name(force_name)
if tp.forcename and '$' in tp.name:
self._declare('anonymous %s' % tp.forcename, tp)
#
self._structnode2type[type] = tp
#
# enums: done here
if kind == 'enum':
return tp
#
# is there a 'type.decls'? If yes, then this is the place in the
# C sources that declare the fields. If no, then just return the
# existing type, possibly still incomplete.
if type.decls is None:
return tp
#
if tp.fldnames is not None:
raise api.CDefError("duplicate declaration of struct %s" % name)
fldnames = []
fldtypes = []
fldbitsize = []
for decl in type.decls:
if (isinstance(decl.type, pycparser.c_ast.IdentifierType) and
''.join(decl.type.names) == '__dotdotdot__'):
# XXX pycparser is inconsistent: 'names' should be a list
# of strings, but is sometimes just one string. Use
# str.join() as a way to cope with both.
self._make_partial(tp, nested)
continue
if decl.bitsize is None:
bitsize = -1
else:
bitsize = self._parse_constant(decl.bitsize)
self._partial_length = False
type = self._get_type(decl.type, partial_length_ok=True)
if self._partial_length:
self._make_partial(tp, nested)
if isinstance(type, model.StructType) and type.partial:
self._make_partial(tp, nested)
fldnames.append(decl.name or '')
fldtypes.append(type)
fldbitsize.append(bitsize)
tp.fldnames = tuple(fldnames)
tp.fldtypes = tuple(fldtypes)
tp.fldbitsize = tuple(fldbitsize)
if fldbitsize != [-1] * len(fldbitsize):
if isinstance(tp, model.StructType) and tp.partial:
raise NotImplementedError("%s: using both bitfields and '...;'"
% (tp,))
tp.packed = self._packed
return tp
def _make_partial(self, tp, nested):
if not isinstance(tp, model.StructOrUnion):
raise api.CDefError("%s cannot be partial" % (tp,))
if not tp.has_c_name() and not nested:
raise NotImplementedError("%s is partial but has no C name" %(tp,))
tp.partial = True
def _parse_constant(self, exprnode, partial_length_ok=False):
# for now, limited to expressions that are an immediate number
# or negative number
if isinstance(exprnode, pycparser.c_ast.Constant):
return int(exprnode.value, 0)
#
if (isinstance(exprnode, pycparser.c_ast.UnaryOp) and
exprnode.op == '-'):
return -self._parse_constant(exprnode.expr)
# load previously defined int constant
if (isinstance(exprnode, pycparser.c_ast.ID) and
exprnode.name in self._int_constants):
return self._int_constants[exprnode.name]
#
if partial_length_ok:
if (isinstance(exprnode, pycparser.c_ast.ID) and
exprnode.name == '__dotdotdotarray__'):
self._partial_length = True
return '...'
#
raise api.FFIError(":%d: unsupported expression: expected a "
"simple numeric constant" % exprnode.coord.line)
def _build_enum_type(self, explicit_name, decls):
if decls is not None:
enumerators1 = [enum.name for enum in decls.enumerators]
enumerators = [s for s in enumerators1
if not _r_enum_dotdotdot.match(s)]
partial = len(enumerators) < len(enumerators1)
enumerators = tuple(enumerators)
enumvalues = []
nextenumvalue = 0
for enum in decls.enumerators[:len(enumerators)]:
if enum.value is not None:
nextenumvalue = self._parse_constant(enum.value)
enumvalues.append(nextenumvalue)
self._add_constants(enum.name, nextenumvalue)
nextenumvalue += 1
enumvalues = tuple(enumvalues)
tp = model.EnumType(explicit_name, enumerators, enumvalues)
tp.partial = partial
else: # opaque enum
tp = model.EnumType(explicit_name, (), ())
return tp
def include(self, other):
for name, tp in other._declarations.items():
kind = name.split(' ', 1)[0]
if kind in ('typedef', 'struct', 'union', 'enum'):
self._declare(name, tp)
for k, v in other._int_constants.items():
self._add_constants(k, v)

View file

@ -0,0 +1,112 @@
import os
class VerificationError(Exception):
""" An error raised when verification fails
"""
class VerificationMissing(Exception):
""" An error raised when incomplete structures are passed into
cdef, but no verification has been done
"""
def get_extension(srcfilename, modname, sources=(), **kwds):
from distutils.core import Extension
allsources = [srcfilename]
allsources.extend(sources)
return Extension(name=modname, sources=allsources, **kwds)
def compile(tmpdir, ext):
"""Compile a C extension module using distutils."""
saved_environ = os.environ.copy()
try:
outputfilename = _build(tmpdir, ext)
outputfilename = os.path.abspath(outputfilename)
finally:
# workaround for a distutils bugs where some env vars can
# become longer and longer every time it is used
for key, value in saved_environ.items():
if os.environ.get(key) != value:
os.environ[key] = value
return outputfilename
def _build(tmpdir, ext):
# XXX compact but horrible :-(
from distutils.core import Distribution
import distutils.errors
#
dist = Distribution({'ext_modules': [ext]})
dist.parse_config_files()
options = dist.get_option_dict('build_ext')
options['force'] = ('ffiplatform', True)
options['build_lib'] = ('ffiplatform', tmpdir)
options['build_temp'] = ('ffiplatform', tmpdir)
#
try:
dist.run_command('build_ext')
except (distutils.errors.CompileError,
distutils.errors.LinkError) as e:
raise VerificationError('%s: %s' % (e.__class__.__name__, e))
#
cmd_obj = dist.get_command_obj('build_ext')
[soname] = cmd_obj.get_outputs()
return soname
try:
from os.path import samefile
except ImportError:
def samefile(f1, f2):
return os.path.abspath(f1) == os.path.abspath(f2)
def maybe_relative_path(path):
if not os.path.isabs(path):
return path # already relative
dir = path
names = []
while True:
prevdir = dir
dir, name = os.path.split(prevdir)
if dir == prevdir or not dir:
return path # failed to make it relative
names.append(name)
try:
if samefile(dir, os.curdir):
names.reverse()
return os.path.join(*names)
except OSError:
pass
# ____________________________________________________________
try:
int_or_long = (int, long)
import cStringIO
except NameError:
int_or_long = int # Python 3
import io as cStringIO
def _flatten(x, f):
if isinstance(x, str):
f.write('%ds%s' % (len(x), x))
elif isinstance(x, dict):
keys = sorted(x.keys())
f.write('%dd' % len(keys))
for key in keys:
_flatten(key, f)
_flatten(x[key], f)
elif isinstance(x, (list, tuple)):
f.write('%dl' % len(x))
for value in x:
_flatten(value, f)
elif isinstance(x, int_or_long):
f.write('%di' % (x,))
else:
raise TypeError(
"the keywords to verify() contains unsupported object %r" % (x,))
def flatten(x):
f = cStringIO.StringIO()
_flatten(x, f)
return f.getvalue()

View file

@ -0,0 +1,19 @@
from weakref import ref
class GcWeakrefs(object):
# code copied and adapted from WeakKeyDictionary.
def __init__(self, ffi):
self.ffi = ffi
self.data = data = {}
def remove(k):
destructor, cdata = data.pop(k)
destructor(cdata)
self.remove = remove
def build(self, cdata, destructor):
# make a new cdata of the same type as the original one
new_cdata = self.ffi.cast(self.ffi._backend.typeof(cdata), cdata)
self.data[ref(new_cdata, self.remove)] = destructor, cdata
return new_cdata

View file

@ -0,0 +1,30 @@
import sys
if sys.version_info < (3,):
try:
from thread import allocate_lock
except ImportError:
from dummy_thread import allocate_lock
else:
try:
from _thread import allocate_lock
except ImportError:
from _dummy_thread import allocate_lock
##import sys
##l1 = allocate_lock
##class allocate_lock(object):
## def __init__(self):
## self._real = l1()
## def __enter__(self):
## for i in range(4, 0, -1):
## print sys._getframe(i).f_code
## print
## return self._real.__enter__()
## def __exit__(self, *args):
## return self._real.__exit__(*args)
## def acquire(self, f):
## assert f is False
## return self._real.acquire(f)

View file

@ -0,0 +1,499 @@
import types
import weakref
from .lock import allocate_lock
class BaseTypeByIdentity(object):
is_array_type = False
is_raw_function = False
def get_c_name(self, replace_with='', context='a C file'):
result = self.c_name_with_marker
assert result.count('&') == 1
# some logic duplication with ffi.getctype()... :-(
replace_with = replace_with.strip()
if replace_with:
if replace_with.startswith('*') and '&[' in result:
replace_with = '(%s)' % replace_with
elif not replace_with[0] in '[(':
replace_with = ' ' + replace_with
result = result.replace('&', replace_with)
if '$' in result:
from .ffiplatform import VerificationError
raise VerificationError(
"cannot generate '%s' in %s: unknown type name"
% (self._get_c_name(), context))
return result
def _get_c_name(self):
return self.c_name_with_marker.replace('&', '')
def has_c_name(self):
return '$' not in self._get_c_name()
def get_cached_btype(self, ffi, finishlist, can_delay=False):
try:
BType = ffi._cached_btypes[self]
except KeyError:
BType = self.build_backend_type(ffi, finishlist)
BType2 = ffi._cached_btypes.setdefault(self, BType)
assert BType2 is BType
return BType
def __repr__(self):
return '<%s>' % (self._get_c_name(),)
def _get_items(self):
return [(name, getattr(self, name)) for name in self._attrs_]
class BaseType(BaseTypeByIdentity):
def __eq__(self, other):
return (self.__class__ == other.__class__ and
self._get_items() == other._get_items())
def __ne__(self, other):
return not self == other
def __hash__(self):
return hash((self.__class__, tuple(self._get_items())))
class VoidType(BaseType):
_attrs_ = ()
def __init__(self):
self.c_name_with_marker = 'void&'
def build_backend_type(self, ffi, finishlist):
return global_cache(self, ffi, 'new_void_type')
void_type = VoidType()
class PrimitiveType(BaseType):
_attrs_ = ('name',)
ALL_PRIMITIVE_TYPES = {
'char': 'c',
'short': 'i',
'int': 'i',
'long': 'i',
'long long': 'i',
'signed char': 'i',
'unsigned char': 'i',
'unsigned short': 'i',
'unsigned int': 'i',
'unsigned long': 'i',
'unsigned long long': 'i',
'float': 'f',
'double': 'f',
'long double': 'f',
'_Bool': 'i',
# the following types are not primitive in the C sense
'wchar_t': 'c',
'int8_t': 'i',
'uint8_t': 'i',
'int16_t': 'i',
'uint16_t': 'i',
'int32_t': 'i',
'uint32_t': 'i',
'int64_t': 'i',
'uint64_t': 'i',
'intptr_t': 'i',
'uintptr_t': 'i',
'ptrdiff_t': 'i',
'size_t': 'i',
'ssize_t': 'i',
}
def __init__(self, name):
assert name in self.ALL_PRIMITIVE_TYPES
self.name = name
self.c_name_with_marker = name + '&'
def is_char_type(self):
return self.ALL_PRIMITIVE_TYPES[self.name] == 'c'
def is_integer_type(self):
return self.ALL_PRIMITIVE_TYPES[self.name] == 'i'
def is_float_type(self):
return self.ALL_PRIMITIVE_TYPES[self.name] == 'f'
def build_backend_type(self, ffi, finishlist):
return global_cache(self, ffi, 'new_primitive_type', self.name)
class BaseFunctionType(BaseType):
_attrs_ = ('args', 'result', 'ellipsis')
def __init__(self, args, result, ellipsis):
self.args = args
self.result = result
self.ellipsis = ellipsis
#
reprargs = [arg._get_c_name() for arg in self.args]
if self.ellipsis:
reprargs.append('...')
reprargs = reprargs or ['void']
replace_with = self._base_pattern % (', '.join(reprargs),)
self.c_name_with_marker = (
self.result.c_name_with_marker.replace('&', replace_with))
class RawFunctionType(BaseFunctionType):
# Corresponds to a C type like 'int(int)', which is the C type of
# a function, but not a pointer-to-function. The backend has no
# notion of such a type; it's used temporarily by parsing.
_base_pattern = '(&)(%s)'
is_raw_function = True
def build_backend_type(self, ffi, finishlist):
from . import api
raise api.CDefError("cannot render the type %r: it is a function "
"type, not a pointer-to-function type" % (self,))
def as_function_pointer(self):
return FunctionPtrType(self.args, self.result, self.ellipsis)
class FunctionPtrType(BaseFunctionType):
_base_pattern = '(*&)(%s)'
def build_backend_type(self, ffi, finishlist):
result = self.result.get_cached_btype(ffi, finishlist)
args = []
for tp in self.args:
args.append(tp.get_cached_btype(ffi, finishlist))
return global_cache(self, ffi, 'new_function_type',
tuple(args), result, self.ellipsis)
class PointerType(BaseType):
_attrs_ = ('totype',)
_base_pattern = " *&"
_base_pattern_array = "(*&)"
def __init__(self, totype):
self.totype = totype
if totype.is_array_type:
extra = self._base_pattern_array
else:
extra = self._base_pattern
self.c_name_with_marker = totype.c_name_with_marker.replace('&', extra)
def build_backend_type(self, ffi, finishlist):
BItem = self.totype.get_cached_btype(ffi, finishlist, can_delay=True)
return global_cache(self, ffi, 'new_pointer_type', BItem)
voidp_type = PointerType(void_type)
class ConstPointerType(PointerType):
_base_pattern = " const *&"
_base_pattern_array = "(const *&)"
const_voidp_type = ConstPointerType(void_type)
class NamedPointerType(PointerType):
_attrs_ = ('totype', 'name')
def __init__(self, totype, name):
PointerType.__init__(self, totype)
self.name = name
self.c_name_with_marker = name + '&'
class ArrayType(BaseType):
_attrs_ = ('item', 'length')
is_array_type = True
def __init__(self, item, length):
self.item = item
self.length = length
#
if length is None:
brackets = '&[]'
elif length == '...':
brackets = '&[/*...*/]'
else:
brackets = '&[%d]' % length
self.c_name_with_marker = (
self.item.c_name_with_marker.replace('&', brackets))
def resolve_length(self, newlength):
return ArrayType(self.item, newlength)
def build_backend_type(self, ffi, finishlist):
if self.length == '...':
from . import api
raise api.CDefError("cannot render the type %r: unknown length" %
(self,))
self.item.get_cached_btype(ffi, finishlist) # force the item BType
BPtrItem = PointerType(self.item).get_cached_btype(ffi, finishlist)
return global_cache(self, ffi, 'new_array_type', BPtrItem, self.length)
class StructOrUnionOrEnum(BaseTypeByIdentity):
_attrs_ = ('name',)
forcename = None
def build_c_name_with_marker(self):
name = self.forcename or '%s %s' % (self.kind, self.name)
self.c_name_with_marker = name + '&'
def force_the_name(self, forcename):
self.forcename = forcename
self.build_c_name_with_marker()
def get_official_name(self):
assert self.c_name_with_marker.endswith('&')
return self.c_name_with_marker[:-1]
class StructOrUnion(StructOrUnionOrEnum):
fixedlayout = None
completed = False
partial = False
packed = False
def __init__(self, name, fldnames, fldtypes, fldbitsize):
self.name = name
self.fldnames = fldnames
self.fldtypes = fldtypes
self.fldbitsize = fldbitsize
self.build_c_name_with_marker()
def enumfields(self):
for name, type, bitsize in zip(self.fldnames, self.fldtypes,
self.fldbitsize):
if name == '' and isinstance(type, StructOrUnion):
# nested anonymous struct/union
for result in type.enumfields():
yield result
else:
yield (name, type, bitsize)
def force_flatten(self):
# force the struct or union to have a declaration that lists
# directly all fields returned by enumfields(), flattening
# nested anonymous structs/unions.
names = []
types = []
bitsizes = []
for name, type, bitsize in self.enumfields():
names.append(name)
types.append(type)
bitsizes.append(bitsize)
self.fldnames = tuple(names)
self.fldtypes = tuple(types)
self.fldbitsize = tuple(bitsizes)
def get_cached_btype(self, ffi, finishlist, can_delay=False):
BType = StructOrUnionOrEnum.get_cached_btype(self, ffi, finishlist,
can_delay)
if not can_delay:
self.finish_backend_type(ffi, finishlist)
return BType
def finish_backend_type(self, ffi, finishlist):
if self.completed:
if self.completed != 2:
raise NotImplementedError("recursive structure declaration "
"for '%s'" % (self.name,))
return
BType = ffi._cached_btypes[self]
if self.fldtypes is None:
return # not completing it: it's an opaque struct
#
self.completed = 1
#
if self.fixedlayout is None:
fldtypes = [tp.get_cached_btype(ffi, finishlist)
for tp in self.fldtypes]
lst = list(zip(self.fldnames, fldtypes, self.fldbitsize))
sflags = 0
if self.packed:
sflags = 8 # SF_PACKED
ffi._backend.complete_struct_or_union(BType, lst, self,
-1, -1, sflags)
#
else:
fldtypes = []
fieldofs, fieldsize, totalsize, totalalignment = self.fixedlayout
for i in range(len(self.fldnames)):
fsize = fieldsize[i]
ftype = self.fldtypes[i]
#
if isinstance(ftype, ArrayType) and ftype.length == '...':
# fix the length to match the total size
BItemType = ftype.item.get_cached_btype(ffi, finishlist)
nlen, nrest = divmod(fsize, ffi.sizeof(BItemType))
if nrest != 0:
self._verification_error(
"field '%s.%s' has a bogus size?" % (
self.name, self.fldnames[i] or '{}'))
ftype = ftype.resolve_length(nlen)
self.fldtypes = (self.fldtypes[:i] + (ftype,) +
self.fldtypes[i+1:])
#
BFieldType = ftype.get_cached_btype(ffi, finishlist)
if isinstance(ftype, ArrayType) and ftype.length is None:
assert fsize == 0
else:
bitemsize = ffi.sizeof(BFieldType)
if bitemsize != fsize:
self._verification_error(
"field '%s.%s' is declared as %d bytes, but is "
"really %d bytes" % (self.name,
self.fldnames[i] or '{}',
bitemsize, fsize))
fldtypes.append(BFieldType)
#
lst = list(zip(self.fldnames, fldtypes, self.fldbitsize, fieldofs))
ffi._backend.complete_struct_or_union(BType, lst, self,
totalsize, totalalignment)
self.completed = 2
def _verification_error(self, msg):
from .ffiplatform import VerificationError
raise VerificationError(msg)
def check_not_partial(self):
if self.partial and self.fixedlayout is None:
from . import ffiplatform
raise ffiplatform.VerificationMissing(self._get_c_name())
def build_backend_type(self, ffi, finishlist):
self.check_not_partial()
finishlist.append(self)
#
return global_cache(self, ffi, 'new_%s_type' % self.kind,
self.get_official_name(), key=self)
class StructType(StructOrUnion):
kind = 'struct'
class UnionType(StructOrUnion):
kind = 'union'
class EnumType(StructOrUnionOrEnum):
kind = 'enum'
partial = False
partial_resolved = False
def __init__(self, name, enumerators, enumvalues, baseinttype=None):
self.name = name
self.enumerators = enumerators
self.enumvalues = enumvalues
self.baseinttype = baseinttype
self.build_c_name_with_marker()
def force_the_name(self, forcename):
StructOrUnionOrEnum.force_the_name(self, forcename)
if self.forcename is None:
name = self.get_official_name()
self.forcename = '$' + name.replace(' ', '_')
def check_not_partial(self):
if self.partial and not self.partial_resolved:
from . import ffiplatform
raise ffiplatform.VerificationMissing(self._get_c_name())
def build_backend_type(self, ffi, finishlist):
self.check_not_partial()
base_btype = self.build_baseinttype(ffi, finishlist)
return global_cache(self, ffi, 'new_enum_type',
self.get_official_name(),
self.enumerators, self.enumvalues,
base_btype, key=self)
def build_baseinttype(self, ffi, finishlist):
if self.baseinttype is not None:
return self.baseinttype.get_cached_btype(ffi, finishlist)
#
if self.enumvalues:
smallest_value = min(self.enumvalues)
largest_value = max(self.enumvalues)
else:
smallest_value = 0
largest_value = 0
if smallest_value < 0: # needs a signed type
sign = 1
candidate1 = PrimitiveType("int")
candidate2 = PrimitiveType("long")
else:
sign = 0
candidate1 = PrimitiveType("unsigned int")
candidate2 = PrimitiveType("unsigned long")
btype1 = candidate1.get_cached_btype(ffi, finishlist)
btype2 = candidate2.get_cached_btype(ffi, finishlist)
size1 = ffi.sizeof(btype1)
size2 = ffi.sizeof(btype2)
if (smallest_value >= ((-1) << (8*size1-1)) and
largest_value < (1 << (8*size1-sign))):
return btype1
if (smallest_value >= ((-1) << (8*size2-1)) and
largest_value < (1 << (8*size2-sign))):
return btype2
raise api.CDefError("%s values don't all fit into either 'long' "
"or 'unsigned long'" % self._get_c_name())
def unknown_type(name, structname=None):
if structname is None:
structname = '$%s' % name
tp = StructType(structname, None, None, None)
tp.force_the_name(name)
return tp
def unknown_ptr_type(name, structname=None):
if structname is None:
structname = '*$%s' % name
tp = StructType(structname, None, None, None)
return NamedPointerType(tp, name)
global_lock = allocate_lock()
def global_cache(srctype, ffi, funcname, *args, **kwds):
key = kwds.pop('key', (funcname, args))
assert not kwds
try:
return ffi._backend.__typecache[key]
except KeyError:
pass
except AttributeError:
# initialize the __typecache attribute, either at the module level
# if ffi._backend is a module, or at the class level if ffi._backend
# is some instance.
if isinstance(ffi._backend, types.ModuleType):
ffi._backend.__typecache = weakref.WeakValueDictionary()
else:
type(ffi._backend).__typecache = weakref.WeakValueDictionary()
try:
res = getattr(ffi._backend, funcname)(*args)
except NotImplementedError as e:
raise NotImplementedError("%r: %s" % (srctype, e))
# note that setdefault() on WeakValueDictionary is not atomic
# and contains a rare bug (http://bugs.python.org/issue19542);
# we have to use a lock and do it ourselves
cache = ffi._backend.__typecache
with global_lock:
res1 = cache.get(key)
if res1 is None:
cache[key] = res
return res
else:
return res1
def pointer_cache(ffi, BType):
return global_cache('?', ffi, 'new_pointer_type', BType)
def attach_exception_info(e, name):
if e.args and type(e.args[0]) is str:
e.args = ('%s: %s' % (name, e.args[0]),) + e.args[1:]

View file

@ -0,0 +1,951 @@
import sys, imp
from . import model, ffiplatform
class VCPythonEngine(object):
_class_key = 'x'
_gen_python_module = True
def __init__(self, verifier):
self.verifier = verifier
self.ffi = verifier.ffi
self._struct_pending_verification = {}
self._types_of_builtin_functions = {}
def patch_extension_kwds(self, kwds):
pass
def find_module(self, module_name, path, so_suffixes):
try:
f, filename, descr = imp.find_module(module_name, path)
except ImportError:
return None
if f is not None:
f.close()
# Note that after a setuptools installation, there are both .py
# and .so files with the same basename. The code here relies on
# imp.find_module() locating the .so in priority.
if descr[0] not in so_suffixes:
return None
return filename
def collect_types(self):
self._typesdict = {}
self._generate("collecttype")
def _prnt(self, what=''):
self._f.write(what + '\n')
def _gettypenum(self, type):
# a KeyError here is a bug. please report it! :-)
return self._typesdict[type]
def _do_collect_type(self, tp):
if ((not isinstance(tp, model.PrimitiveType)
or tp.name == 'long double')
and tp not in self._typesdict):
num = len(self._typesdict)
self._typesdict[tp] = num
def write_source_to_f(self):
self.collect_types()
#
# The new module will have a _cffi_setup() function that receives
# objects from the ffi world, and that calls some setup code in
# the module. This setup code is split in several independent
# functions, e.g. one per constant. The functions are "chained"
# by ending in a tail call to each other.
#
# This is further split in two chained lists, depending on if we
# can do it at import-time or if we must wait for _cffi_setup() to
# provide us with the <ctype> objects. This is needed because we
# need the values of the enum constants in order to build the
# <ctype 'enum'> that we may have to pass to _cffi_setup().
#
# The following two 'chained_list_constants' items contains
# the head of these two chained lists, as a string that gives the
# call to do, if any.
self._chained_list_constants = ['0', '0']
#
prnt = self._prnt
# first paste some standard set of lines that are mostly '#define'
prnt(cffimod_header)
prnt()
# then paste the C source given by the user, verbatim.
prnt(self.verifier.preamble)
prnt()
#
# call generate_cpy_xxx_decl(), for every xxx found from
# ffi._parser._declarations. This generates all the functions.
self._generate("decl")
#
# implement the function _cffi_setup_custom() as calling the
# head of the chained list.
self._generate_setup_custom()
prnt()
#
# produce the method table, including the entries for the
# generated Python->C function wrappers, which are done
# by generate_cpy_function_method().
prnt('static PyMethodDef _cffi_methods[] = {')
self._generate("method")
prnt(' {"_cffi_setup", _cffi_setup, METH_VARARGS, NULL},')
prnt(' {NULL, NULL, 0, NULL} /* Sentinel */')
prnt('};')
prnt()
#
# standard init.
modname = self.verifier.get_module_name()
constants = self._chained_list_constants[False]
prnt('#if PY_MAJOR_VERSION >= 3')
prnt()
prnt('static struct PyModuleDef _cffi_module_def = {')
prnt(' PyModuleDef_HEAD_INIT,')
prnt(' "%s",' % modname)
prnt(' NULL,')
prnt(' -1,')
prnt(' _cffi_methods,')
prnt(' NULL, NULL, NULL, NULL')
prnt('};')
prnt()
prnt('PyMODINIT_FUNC')
prnt('PyInit_%s(void)' % modname)
prnt('{')
prnt(' PyObject *lib;')
prnt(' lib = PyModule_Create(&_cffi_module_def);')
prnt(' if (lib == NULL)')
prnt(' return NULL;')
prnt(' if (%s < 0 || _cffi_init() < 0) {' % (constants,))
prnt(' Py_DECREF(lib);')
prnt(' return NULL;')
prnt(' }')
prnt(' return lib;')
prnt('}')
prnt()
prnt('#else')
prnt()
prnt('PyMODINIT_FUNC')
prnt('init%s(void)' % modname)
prnt('{')
prnt(' PyObject *lib;')
prnt(' lib = Py_InitModule("%s", _cffi_methods);' % modname)
prnt(' if (lib == NULL)')
prnt(' return;')
prnt(' if (%s < 0 || _cffi_init() < 0)' % (constants,))
prnt(' return;')
prnt(' return;')
prnt('}')
prnt()
prnt('#endif')
def load_library(self):
# XXX review all usages of 'self' here!
# import it as a new extension module
try:
module = imp.load_dynamic(self.verifier.get_module_name(),
self.verifier.modulefilename)
except ImportError as e:
error = "importing %r: %s" % (self.verifier.modulefilename, e)
raise ffiplatform.VerificationError(error)
#
# call loading_cpy_struct() to get the struct layout inferred by
# the C compiler
self._load(module, 'loading')
#
# the C code will need the <ctype> objects. Collect them in
# order in a list.
revmapping = dict([(value, key)
for (key, value) in self._typesdict.items()])
lst = [revmapping[i] for i in range(len(revmapping))]
lst = list(map(self.ffi._get_cached_btype, lst))
#
# build the FFILibrary class and instance and call _cffi_setup().
# this will set up some fields like '_cffi_types', and only then
# it will invoke the chained list of functions that will really
# build (notably) the constant objects, as <cdata> if they are
# pointers, and store them as attributes on the 'library' object.
class FFILibrary(object):
_cffi_python_module = module
_cffi_ffi = self.ffi
_cffi_dir = []
def __dir__(self):
return FFILibrary._cffi_dir + list(self.__dict__)
library = FFILibrary()
if module._cffi_setup(lst, ffiplatform.VerificationError, library):
import warnings
warnings.warn("reimporting %r might overwrite older definitions"
% (self.verifier.get_module_name()))
#
# finally, call the loaded_cpy_xxx() functions. This will perform
# the final adjustments, like copying the Python->C wrapper
# functions from the module to the 'library' object, and setting
# up the FFILibrary class with properties for the global C variables.
self._load(module, 'loaded', library=library)
module._cffi_original_ffi = self.ffi
module._cffi_types_of_builtin_funcs = self._types_of_builtin_functions
return library
def _get_declarations(self):
return sorted(self.ffi._parser._declarations.items())
def _generate(self, step_name):
for name, tp in self._get_declarations():
kind, realname = name.split(' ', 1)
try:
method = getattr(self, '_generate_cpy_%s_%s' % (kind,
step_name))
except AttributeError:
raise ffiplatform.VerificationError(
"not implemented in verify(): %r" % name)
try:
method(tp, realname)
except Exception as e:
model.attach_exception_info(e, name)
raise
def _load(self, module, step_name, **kwds):
for name, tp in self._get_declarations():
kind, realname = name.split(' ', 1)
method = getattr(self, '_%s_cpy_%s' % (step_name, kind))
try:
method(tp, realname, module, **kwds)
except Exception as e:
model.attach_exception_info(e, name)
raise
def _generate_nothing(self, tp, name):
pass
def _loaded_noop(self, tp, name, module, **kwds):
pass
# ----------
def _convert_funcarg_to_c(self, tp, fromvar, tovar, errcode):
extraarg = ''
if isinstance(tp, model.PrimitiveType):
if tp.is_integer_type() and tp.name != '_Bool':
converter = '_cffi_to_c_int'
extraarg = ', %s' % tp.name
else:
converter = '_cffi_to_c_%s' % (tp.name.replace(' ', '_'),)
errvalue = '-1'
#
elif isinstance(tp, model.PointerType):
self._convert_funcarg_to_c_ptr_or_array(tp, fromvar,
tovar, errcode)
return
#
elif isinstance(tp, (model.StructOrUnion, model.EnumType)):
# a struct (not a struct pointer) as a function argument
self._prnt(' if (_cffi_to_c((char *)&%s, _cffi_type(%d), %s) < 0)'
% (tovar, self._gettypenum(tp), fromvar))
self._prnt(' %s;' % errcode)
return
#
elif isinstance(tp, model.FunctionPtrType):
converter = '(%s)_cffi_to_c_pointer' % tp.get_c_name('')
extraarg = ', _cffi_type(%d)' % self._gettypenum(tp)
errvalue = 'NULL'
#
else:
raise NotImplementedError(tp)
#
self._prnt(' %s = %s(%s%s);' % (tovar, converter, fromvar, extraarg))
self._prnt(' if (%s == (%s)%s && PyErr_Occurred())' % (
tovar, tp.get_c_name(''), errvalue))
self._prnt(' %s;' % errcode)
def _extra_local_variables(self, tp, localvars):
if isinstance(tp, model.PointerType):
localvars.add('Py_ssize_t datasize')
def _convert_funcarg_to_c_ptr_or_array(self, tp, fromvar, tovar, errcode):
self._prnt(' datasize = _cffi_prepare_pointer_call_argument(')
self._prnt(' _cffi_type(%d), %s, (char **)&%s);' % (
self._gettypenum(tp), fromvar, tovar))
self._prnt(' if (datasize != 0) {')
self._prnt(' if (datasize < 0)')
self._prnt(' %s;' % errcode)
self._prnt(' %s = alloca(datasize);' % (tovar,))
self._prnt(' memset((void *)%s, 0, datasize);' % (tovar,))
self._prnt(' if (_cffi_convert_array_from_object('
'(char *)%s, _cffi_type(%d), %s) < 0)' % (
tovar, self._gettypenum(tp), fromvar))
self._prnt(' %s;' % errcode)
self._prnt(' }')
def _convert_expr_from_c(self, tp, var, context):
if isinstance(tp, model.PrimitiveType):
if tp.is_integer_type():
return '_cffi_from_c_int(%s, %s)' % (var, tp.name)
elif tp.name != 'long double':
return '_cffi_from_c_%s(%s)' % (tp.name.replace(' ', '_'), var)
else:
return '_cffi_from_c_deref((char *)&%s, _cffi_type(%d))' % (
var, self._gettypenum(tp))
elif isinstance(tp, (model.PointerType, model.FunctionPtrType)):
return '_cffi_from_c_pointer((char *)%s, _cffi_type(%d))' % (
var, self._gettypenum(tp))
elif isinstance(tp, model.ArrayType):
return '_cffi_from_c_pointer((char *)%s, _cffi_type(%d))' % (
var, self._gettypenum(model.PointerType(tp.item)))
elif isinstance(tp, model.StructType):
if tp.fldnames is None:
raise TypeError("'%s' is used as %s, but is opaque" % (
tp._get_c_name(), context))
return '_cffi_from_c_struct((char *)&%s, _cffi_type(%d))' % (
var, self._gettypenum(tp))
elif isinstance(tp, model.EnumType):
return '_cffi_from_c_deref((char *)&%s, _cffi_type(%d))' % (
var, self._gettypenum(tp))
else:
raise NotImplementedError(tp)
# ----------
# typedefs: generates no code so far
_generate_cpy_typedef_collecttype = _generate_nothing
_generate_cpy_typedef_decl = _generate_nothing
_generate_cpy_typedef_method = _generate_nothing
_loading_cpy_typedef = _loaded_noop
_loaded_cpy_typedef = _loaded_noop
# ----------
# function declarations
def _generate_cpy_function_collecttype(self, tp, name):
assert isinstance(tp, model.FunctionPtrType)
if tp.ellipsis:
self._do_collect_type(tp)
else:
# don't call _do_collect_type(tp) in this common case,
# otherwise test_autofilled_struct_as_argument fails
for type in tp.args:
self._do_collect_type(type)
self._do_collect_type(tp.result)
def _generate_cpy_function_decl(self, tp, name):
assert isinstance(tp, model.FunctionPtrType)
if tp.ellipsis:
# cannot support vararg functions better than this: check for its
# exact type (including the fixed arguments), and build it as a
# constant function pointer (no CPython wrapper)
self._generate_cpy_const(False, name, tp)
return
prnt = self._prnt
numargs = len(tp.args)
if numargs == 0:
argname = 'no_arg'
elif numargs == 1:
argname = 'arg0'
else:
argname = 'args'
prnt('static PyObject *')
prnt('_cffi_f_%s(PyObject *self, PyObject *%s)' % (name, argname))
prnt('{')
#
context = 'argument of %s' % name
for i, type in enumerate(tp.args):
prnt(' %s;' % type.get_c_name(' x%d' % i, context))
#
localvars = set()
for type in tp.args:
self._extra_local_variables(type, localvars)
for decl in localvars:
prnt(' %s;' % (decl,))
#
if not isinstance(tp.result, model.VoidType):
result_code = 'result = '
context = 'result of %s' % name
prnt(' %s;' % tp.result.get_c_name(' result', context))
else:
result_code = ''
#
if len(tp.args) > 1:
rng = range(len(tp.args))
for i in rng:
prnt(' PyObject *arg%d;' % i)
prnt()
prnt(' if (!PyArg_ParseTuple(args, "%s:%s", %s))' % (
'O' * numargs, name, ', '.join(['&arg%d' % i for i in rng])))
prnt(' return NULL;')
prnt()
#
for i, type in enumerate(tp.args):
self._convert_funcarg_to_c(type, 'arg%d' % i, 'x%d' % i,
'return NULL')
prnt()
#
prnt(' Py_BEGIN_ALLOW_THREADS')
prnt(' _cffi_restore_errno();')
prnt(' { %s%s(%s); }' % (
result_code, name,
', '.join(['x%d' % i for i in range(len(tp.args))])))
prnt(' _cffi_save_errno();')
prnt(' Py_END_ALLOW_THREADS')
prnt()
#
if result_code:
prnt(' return %s;' %
self._convert_expr_from_c(tp.result, 'result', 'result type'))
else:
prnt(' Py_INCREF(Py_None);')
prnt(' return Py_None;')
prnt('}')
prnt()
def _generate_cpy_function_method(self, tp, name):
if tp.ellipsis:
return
numargs = len(tp.args)
if numargs == 0:
meth = 'METH_NOARGS'
elif numargs == 1:
meth = 'METH_O'
else:
meth = 'METH_VARARGS'
self._prnt(' {"%s", _cffi_f_%s, %s, NULL},' % (name, name, meth))
_loading_cpy_function = _loaded_noop
def _loaded_cpy_function(self, tp, name, module, library):
if tp.ellipsis:
return
func = getattr(module, name)
setattr(library, name, func)
self._types_of_builtin_functions[func] = tp
# ----------
# named structs
_generate_cpy_struct_collecttype = _generate_nothing
def _generate_cpy_struct_decl(self, tp, name):
assert name == tp.name
self._generate_struct_or_union_decl(tp, 'struct', name)
def _generate_cpy_struct_method(self, tp, name):
self._generate_struct_or_union_method(tp, 'struct', name)
def _loading_cpy_struct(self, tp, name, module):
self._loading_struct_or_union(tp, 'struct', name, module)
def _loaded_cpy_struct(self, tp, name, module, **kwds):
self._loaded_struct_or_union(tp)
_generate_cpy_union_collecttype = _generate_nothing
def _generate_cpy_union_decl(self, tp, name):
assert name == tp.name
self._generate_struct_or_union_decl(tp, 'union', name)
def _generate_cpy_union_method(self, tp, name):
self._generate_struct_or_union_method(tp, 'union', name)
def _loading_cpy_union(self, tp, name, module):
self._loading_struct_or_union(tp, 'union', name, module)
def _loaded_cpy_union(self, tp, name, module, **kwds):
self._loaded_struct_or_union(tp)
def _generate_struct_or_union_decl(self, tp, prefix, name):
if tp.fldnames is None:
return # nothing to do with opaque structs
checkfuncname = '_cffi_check_%s_%s' % (prefix, name)
layoutfuncname = '_cffi_layout_%s_%s' % (prefix, name)
cname = ('%s %s' % (prefix, name)).strip()
#
prnt = self._prnt
prnt('static void %s(%s *p)' % (checkfuncname, cname))
prnt('{')
prnt(' /* only to generate compile-time warnings or errors */')
for fname, ftype, fbitsize in tp.enumfields():
if (isinstance(ftype, model.PrimitiveType)
and ftype.is_integer_type()) or fbitsize >= 0:
# accept all integers, but complain on float or double
prnt(' (void)((p->%s) << 1);' % fname)
else:
# only accept exactly the type declared.
try:
prnt(' { %s = &p->%s; (void)tmp; }' % (
ftype.get_c_name('*tmp', 'field %r'%fname), fname))
except ffiplatform.VerificationError as e:
prnt(' /* %s */' % str(e)) # cannot verify it, ignore
prnt('}')
prnt('static PyObject *')
prnt('%s(PyObject *self, PyObject *noarg)' % (layoutfuncname,))
prnt('{')
prnt(' struct _cffi_aligncheck { char x; %s y; };' % cname)
prnt(' static Py_ssize_t nums[] = {')
prnt(' sizeof(%s),' % cname)
prnt(' offsetof(struct _cffi_aligncheck, y),')
for fname, ftype, fbitsize in tp.enumfields():
if fbitsize >= 0:
continue # xxx ignore fbitsize for now
prnt(' offsetof(%s, %s),' % (cname, fname))
if isinstance(ftype, model.ArrayType) and ftype.length is None:
prnt(' 0, /* %s */' % ftype._get_c_name())
else:
prnt(' sizeof(((%s *)0)->%s),' % (cname, fname))
prnt(' -1')
prnt(' };')
prnt(' return _cffi_get_struct_layout(nums);')
prnt(' /* the next line is not executed, but compiled */')
prnt(' %s(0);' % (checkfuncname,))
prnt('}')
prnt()
def _generate_struct_or_union_method(self, tp, prefix, name):
if tp.fldnames is None:
return # nothing to do with opaque structs
layoutfuncname = '_cffi_layout_%s_%s' % (prefix, name)
self._prnt(' {"%s", %s, METH_NOARGS, NULL},' % (layoutfuncname,
layoutfuncname))
def _loading_struct_or_union(self, tp, prefix, name, module):
if tp.fldnames is None:
return # nothing to do with opaque structs
layoutfuncname = '_cffi_layout_%s_%s' % (prefix, name)
#
function = getattr(module, layoutfuncname)
layout = function()
if isinstance(tp, model.StructOrUnion) and tp.partial:
# use the function()'s sizes and offsets to guide the
# layout of the struct
totalsize = layout[0]
totalalignment = layout[1]
fieldofs = layout[2::2]
fieldsize = layout[3::2]
tp.force_flatten()
assert len(fieldofs) == len(fieldsize) == len(tp.fldnames)
tp.fixedlayout = fieldofs, fieldsize, totalsize, totalalignment
else:
cname = ('%s %s' % (prefix, name)).strip()
self._struct_pending_verification[tp] = layout, cname
def _loaded_struct_or_union(self, tp):
if tp.fldnames is None:
return # nothing to do with opaque structs
self.ffi._get_cached_btype(tp) # force 'fixedlayout' to be considered
if tp in self._struct_pending_verification:
# check that the layout sizes and offsets match the real ones
def check(realvalue, expectedvalue, msg):
if realvalue != expectedvalue:
raise ffiplatform.VerificationError(
"%s (we have %d, but C compiler says %d)"
% (msg, expectedvalue, realvalue))
ffi = self.ffi
BStruct = ffi._get_cached_btype(tp)
layout, cname = self._struct_pending_verification.pop(tp)
check(layout[0], ffi.sizeof(BStruct), "wrong total size")
check(layout[1], ffi.alignof(BStruct), "wrong total alignment")
i = 2
for fname, ftype, fbitsize in tp.enumfields():
if fbitsize >= 0:
continue # xxx ignore fbitsize for now
check(layout[i], ffi.offsetof(BStruct, fname),
"wrong offset for field %r" % (fname,))
if layout[i+1] != 0:
BField = ffi._get_cached_btype(ftype)
check(layout[i+1], ffi.sizeof(BField),
"wrong size for field %r" % (fname,))
i += 2
assert i == len(layout)
# ----------
# 'anonymous' declarations. These are produced for anonymous structs
# or unions; the 'name' is obtained by a typedef.
_generate_cpy_anonymous_collecttype = _generate_nothing
def _generate_cpy_anonymous_decl(self, tp, name):
if isinstance(tp, model.EnumType):
self._generate_cpy_enum_decl(tp, name, '')
else:
self._generate_struct_or_union_decl(tp, '', name)
def _generate_cpy_anonymous_method(self, tp, name):
if not isinstance(tp, model.EnumType):
self._generate_struct_or_union_method(tp, '', name)
def _loading_cpy_anonymous(self, tp, name, module):
if isinstance(tp, model.EnumType):
self._loading_cpy_enum(tp, name, module)
else:
self._loading_struct_or_union(tp, '', name, module)
def _loaded_cpy_anonymous(self, tp, name, module, **kwds):
if isinstance(tp, model.EnumType):
self._loaded_cpy_enum(tp, name, module, **kwds)
else:
self._loaded_struct_or_union(tp)
# ----------
# constants, likely declared with '#define'
def _generate_cpy_const(self, is_int, name, tp=None, category='const',
vartp=None, delayed=True, size_too=False):
prnt = self._prnt
funcname = '_cffi_%s_%s' % (category, name)
prnt('static int %s(PyObject *lib)' % funcname)
prnt('{')
prnt(' PyObject *o;')
prnt(' int res;')
if not is_int:
prnt(' %s;' % (vartp or tp).get_c_name(' i', name))
else:
assert category == 'const'
#
if not is_int:
if category == 'var':
realexpr = '&' + name
else:
realexpr = name
prnt(' i = (%s);' % (realexpr,))
prnt(' o = %s;' % (self._convert_expr_from_c(tp, 'i',
'variable type'),))
assert delayed
else:
prnt(' o = _cffi_from_c_int_const(%s);' % name)
prnt(' if (o == NULL)')
prnt(' return -1;')
if size_too:
prnt(' {')
prnt(' PyObject *o1 = o;')
prnt(' o = Py_BuildValue("On", o1, (Py_ssize_t)sizeof(%s));'
% (name,))
prnt(' Py_DECREF(o1);')
prnt(' if (o == NULL)')
prnt(' return -1;')
prnt(' }')
prnt(' res = PyObject_SetAttrString(lib, "%s", o);' % name)
prnt(' Py_DECREF(o);')
prnt(' if (res < 0)')
prnt(' return -1;')
prnt(' return %s;' % self._chained_list_constants[delayed])
self._chained_list_constants[delayed] = funcname + '(lib)'
prnt('}')
prnt()
def _generate_cpy_constant_collecttype(self, tp, name):
is_int = isinstance(tp, model.PrimitiveType) and tp.is_integer_type()
if not is_int:
self._do_collect_type(tp)
def _generate_cpy_constant_decl(self, tp, name):
is_int = isinstance(tp, model.PrimitiveType) and tp.is_integer_type()
self._generate_cpy_const(is_int, name, tp)
_generate_cpy_constant_method = _generate_nothing
_loading_cpy_constant = _loaded_noop
_loaded_cpy_constant = _loaded_noop
# ----------
# enums
def _enum_funcname(self, prefix, name):
# "$enum_$1" => "___D_enum____D_1"
name = name.replace('$', '___D_')
return '_cffi_e_%s_%s' % (prefix, name)
def _generate_cpy_enum_decl(self, tp, name, prefix='enum'):
if tp.partial:
for enumerator in tp.enumerators:
self._generate_cpy_const(True, enumerator, delayed=False)
return
#
funcname = self._enum_funcname(prefix, name)
prnt = self._prnt
prnt('static int %s(PyObject *lib)' % funcname)
prnt('{')
for enumerator, enumvalue in zip(tp.enumerators, tp.enumvalues):
if enumvalue < 0:
prnt(' if ((%s) >= 0 || (long)(%s) != %dL) {' % (
enumerator, enumerator, enumvalue))
else:
prnt(' if ((%s) < 0 || (unsigned long)(%s) != %dUL) {' % (
enumerator, enumerator, enumvalue))
prnt(' char buf[64];')
prnt(' if ((%s) < 0)' % enumerator)
prnt(' snprintf(buf, 63, "%%ld", (long)(%s));' % enumerator)
prnt(' else')
prnt(' snprintf(buf, 63, "%%lu", (unsigned long)(%s));' %
enumerator)
prnt(' PyErr_Format(_cffi_VerificationError,')
prnt(' "enum %s: %s has the real value %s, '
'not %s",')
prnt(' "%s", "%s", buf, "%d");' % (
name, enumerator, enumvalue))
prnt(' return -1;')
prnt(' }')
prnt(' return %s;' % self._chained_list_constants[True])
self._chained_list_constants[True] = funcname + '(lib)'
prnt('}')
prnt()
_generate_cpy_enum_collecttype = _generate_nothing
_generate_cpy_enum_method = _generate_nothing
def _loading_cpy_enum(self, tp, name, module):
if tp.partial:
enumvalues = [getattr(module, enumerator)
for enumerator in tp.enumerators]
tp.enumvalues = tuple(enumvalues)
tp.partial_resolved = True
def _loaded_cpy_enum(self, tp, name, module, library):
for enumerator, enumvalue in zip(tp.enumerators, tp.enumvalues):
setattr(library, enumerator, enumvalue)
# ----------
# macros: for now only for integers
def _generate_cpy_macro_decl(self, tp, name):
assert tp == '...'
self._generate_cpy_const(True, name)
_generate_cpy_macro_collecttype = _generate_nothing
_generate_cpy_macro_method = _generate_nothing
_loading_cpy_macro = _loaded_noop
_loaded_cpy_macro = _loaded_noop
# ----------
# global variables
def _generate_cpy_variable_collecttype(self, tp, name):
if isinstance(tp, model.ArrayType):
tp_ptr = model.PointerType(tp.item)
else:
tp_ptr = model.PointerType(tp)
self._do_collect_type(tp_ptr)
def _generate_cpy_variable_decl(self, tp, name):
if isinstance(tp, model.ArrayType):
tp_ptr = model.PointerType(tp.item)
self._generate_cpy_const(False, name, tp, vartp=tp_ptr,
size_too = (tp.length == '...'))
else:
tp_ptr = model.PointerType(tp)
self._generate_cpy_const(False, name, tp_ptr, category='var')
_generate_cpy_variable_method = _generate_nothing
_loading_cpy_variable = _loaded_noop
def _loaded_cpy_variable(self, tp, name, module, library):
value = getattr(library, name)
if isinstance(tp, model.ArrayType): # int a[5] is "constant" in the
# sense that "a=..." is forbidden
if tp.length == '...':
assert isinstance(value, tuple)
(value, size) = value
BItemType = self.ffi._get_cached_btype(tp.item)
length, rest = divmod(size, self.ffi.sizeof(BItemType))
if rest != 0:
raise ffiplatform.VerificationError(
"bad size: %r does not seem to be an array of %s" %
(name, tp.item))
tp = tp.resolve_length(length)
# 'value' is a <cdata 'type *'> which we have to replace with
# a <cdata 'type[N]'> if the N is actually known
if tp.length is not None:
BArray = self.ffi._get_cached_btype(tp)
value = self.ffi.cast(BArray, value)
setattr(library, name, value)
return
# remove ptr=<cdata 'int *'> from the library instance, and replace
# it by a property on the class, which reads/writes into ptr[0].
ptr = value
delattr(library, name)
def getter(library):
return ptr[0]
def setter(library, value):
ptr[0] = value
setattr(type(library), name, property(getter, setter))
type(library)._cffi_dir.append(name)
# ----------
def _generate_setup_custom(self):
prnt = self._prnt
prnt('static int _cffi_setup_custom(PyObject *lib)')
prnt('{')
prnt(' return %s;' % self._chained_list_constants[True])
prnt('}')
cffimod_header = r'''
#include <Python.h>
#include <stddef.h>
/* this block of #ifs should be kept exactly identical between
c/_cffi_backend.c, cffi/vengine_cpy.py, cffi/vengine_gen.py */
#if defined(_MSC_VER)
# include <malloc.h> /* for alloca() */
# if _MSC_VER < 1600 /* MSVC < 2010 */
typedef __int8 int8_t;
typedef __int16 int16_t;
typedef __int32 int32_t;
typedef __int64 int64_t;
typedef unsigned __int8 uint8_t;
typedef unsigned __int16 uint16_t;
typedef unsigned __int32 uint32_t;
typedef unsigned __int64 uint64_t;
# else
# include <stdint.h>
# endif
# if _MSC_VER < 1800 /* MSVC < 2013 */
typedef unsigned char _Bool;
# endif
#else
# include <stdint.h>
# if (defined (__SVR4) && defined (__sun)) || defined(_AIX)
# include <alloca.h>
# endif
#endif
#if PY_MAJOR_VERSION < 3
# undef PyCapsule_CheckExact
# undef PyCapsule_GetPointer
# define PyCapsule_CheckExact(capsule) (PyCObject_Check(capsule))
# define PyCapsule_GetPointer(capsule, name) \
(PyCObject_AsVoidPtr(capsule))
#endif
#if PY_MAJOR_VERSION >= 3
# define PyInt_FromLong PyLong_FromLong
#endif
#define _cffi_from_c_double PyFloat_FromDouble
#define _cffi_from_c_float PyFloat_FromDouble
#define _cffi_from_c_long PyInt_FromLong
#define _cffi_from_c_ulong PyLong_FromUnsignedLong
#define _cffi_from_c_longlong PyLong_FromLongLong
#define _cffi_from_c_ulonglong PyLong_FromUnsignedLongLong
#define _cffi_to_c_double PyFloat_AsDouble
#define _cffi_to_c_float PyFloat_AsDouble
#define _cffi_from_c_int_const(x) \
(((x) > 0) ? \
((unsigned long long)(x) <= (unsigned long long)LONG_MAX) ? \
PyInt_FromLong((long)(x)) : \
PyLong_FromUnsignedLongLong((unsigned long long)(x)) : \
((long long)(x) >= (long long)LONG_MIN) ? \
PyInt_FromLong((long)(x)) : \
PyLong_FromLongLong((long long)(x)))
#define _cffi_from_c_int(x, type) \
(((type)-1) > 0 ? /* unsigned */ \
(sizeof(type) < sizeof(long) ? PyInt_FromLong(x) : \
sizeof(type) == sizeof(long) ? PyLong_FromUnsignedLong(x) : \
PyLong_FromUnsignedLongLong(x)) \
: (sizeof(type) <= sizeof(long) ? PyInt_FromLong(x) : \
PyLong_FromLongLong(x)))
#define _cffi_to_c_int(o, type) \
(sizeof(type) == 1 ? (((type)-1) > 0 ? (type)_cffi_to_c_u8(o) \
: (type)_cffi_to_c_i8(o)) : \
sizeof(type) == 2 ? (((type)-1) > 0 ? (type)_cffi_to_c_u16(o) \
: (type)_cffi_to_c_i16(o)) : \
sizeof(type) == 4 ? (((type)-1) > 0 ? (type)_cffi_to_c_u32(o) \
: (type)_cffi_to_c_i32(o)) : \
sizeof(type) == 8 ? (((type)-1) > 0 ? (type)_cffi_to_c_u64(o) \
: (type)_cffi_to_c_i64(o)) : \
(Py_FatalError("unsupported size for type " #type), 0))
#define _cffi_to_c_i8 \
((int(*)(PyObject *))_cffi_exports[1])
#define _cffi_to_c_u8 \
((int(*)(PyObject *))_cffi_exports[2])
#define _cffi_to_c_i16 \
((int(*)(PyObject *))_cffi_exports[3])
#define _cffi_to_c_u16 \
((int(*)(PyObject *))_cffi_exports[4])
#define _cffi_to_c_i32 \
((int(*)(PyObject *))_cffi_exports[5])
#define _cffi_to_c_u32 \
((unsigned int(*)(PyObject *))_cffi_exports[6])
#define _cffi_to_c_i64 \
((long long(*)(PyObject *))_cffi_exports[7])
#define _cffi_to_c_u64 \
((unsigned long long(*)(PyObject *))_cffi_exports[8])
#define _cffi_to_c_char \
((int(*)(PyObject *))_cffi_exports[9])
#define _cffi_from_c_pointer \
((PyObject *(*)(char *, CTypeDescrObject *))_cffi_exports[10])
#define _cffi_to_c_pointer \
((char *(*)(PyObject *, CTypeDescrObject *))_cffi_exports[11])
#define _cffi_get_struct_layout \
((PyObject *(*)(Py_ssize_t[]))_cffi_exports[12])
#define _cffi_restore_errno \
((void(*)(void))_cffi_exports[13])
#define _cffi_save_errno \
((void(*)(void))_cffi_exports[14])
#define _cffi_from_c_char \
((PyObject *(*)(char))_cffi_exports[15])
#define _cffi_from_c_deref \
((PyObject *(*)(char *, CTypeDescrObject *))_cffi_exports[16])
#define _cffi_to_c \
((int(*)(char *, CTypeDescrObject *, PyObject *))_cffi_exports[17])
#define _cffi_from_c_struct \
((PyObject *(*)(char *, CTypeDescrObject *))_cffi_exports[18])
#define _cffi_to_c_wchar_t \
((wchar_t(*)(PyObject *))_cffi_exports[19])
#define _cffi_from_c_wchar_t \
((PyObject *(*)(wchar_t))_cffi_exports[20])
#define _cffi_to_c_long_double \
((long double(*)(PyObject *))_cffi_exports[21])
#define _cffi_to_c__Bool \
((_Bool(*)(PyObject *))_cffi_exports[22])
#define _cffi_prepare_pointer_call_argument \
((Py_ssize_t(*)(CTypeDescrObject *, PyObject *, char **))_cffi_exports[23])
#define _cffi_convert_array_from_object \
((int(*)(char *, CTypeDescrObject *, PyObject *))_cffi_exports[24])
#define _CFFI_NUM_EXPORTS 25
typedef struct _ctypedescr CTypeDescrObject;
static void *_cffi_exports[_CFFI_NUM_EXPORTS];
static PyObject *_cffi_types, *_cffi_VerificationError;
static int _cffi_setup_custom(PyObject *lib); /* forward */
static PyObject *_cffi_setup(PyObject *self, PyObject *args)
{
PyObject *library;
int was_alive = (_cffi_types != NULL);
if (!PyArg_ParseTuple(args, "OOO", &_cffi_types, &_cffi_VerificationError,
&library))
return NULL;
Py_INCREF(_cffi_types);
Py_INCREF(_cffi_VerificationError);
if (_cffi_setup_custom(library) < 0)
return NULL;
return PyBool_FromLong(was_alive);
}
static int _cffi_init(void)
{
PyObject *module, *c_api_object = NULL;
module = PyImport_ImportModule("_cffi_backend");
if (module == NULL)
goto failure;
c_api_object = PyObject_GetAttrString(module, "_C_API");
if (c_api_object == NULL)
goto failure;
if (!PyCapsule_CheckExact(c_api_object)) {
PyErr_SetNone(PyExc_ImportError);
goto failure;
}
memcpy(_cffi_exports, PyCapsule_GetPointer(c_api_object, "cffi"),
_CFFI_NUM_EXPORTS * sizeof(void *));
Py_DECREF(module);
Py_DECREF(c_api_object);
return 0;
failure:
Py_XDECREF(module);
Py_XDECREF(c_api_object);
return -1;
}
#define _cffi_type(num) ((CTypeDescrObject *)PyList_GET_ITEM(_cffi_types, num))
/**********/
'''

View file

@ -0,0 +1,580 @@
import sys, os
import types
from . import model, ffiplatform
class VGenericEngine(object):
_class_key = 'g'
_gen_python_module = False
def __init__(self, verifier):
self.verifier = verifier
self.ffi = verifier.ffi
self.export_symbols = []
self._struct_pending_verification = {}
def patch_extension_kwds(self, kwds):
# add 'export_symbols' to the dictionary. Note that we add the
# list before filling it. When we fill it, it will thus also show
# up in kwds['export_symbols'].
kwds.setdefault('export_symbols', self.export_symbols)
def find_module(self, module_name, path, so_suffixes):
for so_suffix in so_suffixes:
basename = module_name + so_suffix
if path is None:
path = sys.path
for dirname in path:
filename = os.path.join(dirname, basename)
if os.path.isfile(filename):
return filename
def collect_types(self):
pass # not needed in the generic engine
def _prnt(self, what=''):
self._f.write(what + '\n')
def write_source_to_f(self):
prnt = self._prnt
# first paste some standard set of lines that are mostly '#include'
prnt(cffimod_header)
# then paste the C source given by the user, verbatim.
prnt(self.verifier.preamble)
#
# call generate_gen_xxx_decl(), for every xxx found from
# ffi._parser._declarations. This generates all the functions.
self._generate('decl')
#
# on Windows, distutils insists on putting init_cffi_xyz in
# 'export_symbols', so instead of fighting it, just give up and
# give it one
if sys.platform == 'win32':
if sys.version_info >= (3,):
prefix = 'PyInit_'
else:
prefix = 'init'
modname = self.verifier.get_module_name()
prnt("void %s%s(void) { }\n" % (prefix, modname))
def load_library(self):
# import it with the CFFI backend
backend = self.ffi._backend
# needs to make a path that contains '/', on Posix
filename = os.path.join(os.curdir, self.verifier.modulefilename)
module = backend.load_library(filename)
#
# call loading_gen_struct() to get the struct layout inferred by
# the C compiler
self._load(module, 'loading')
# build the FFILibrary class and instance, this is a module subclass
# because modules are expected to have usually-constant-attributes and
# in PyPy this means the JIT is able to treat attributes as constant,
# which we want.
class FFILibrary(types.ModuleType):
_cffi_generic_module = module
_cffi_ffi = self.ffi
_cffi_dir = []
def __dir__(self):
return FFILibrary._cffi_dir
library = FFILibrary("")
#
# finally, call the loaded_gen_xxx() functions. This will set
# up the 'library' object.
self._load(module, 'loaded', library=library)
return library
def _get_declarations(self):
return sorted(self.ffi._parser._declarations.items())
def _generate(self, step_name):
for name, tp in self._get_declarations():
kind, realname = name.split(' ', 1)
try:
method = getattr(self, '_generate_gen_%s_%s' % (kind,
step_name))
except AttributeError:
raise ffiplatform.VerificationError(
"not implemented in verify(): %r" % name)
try:
method(tp, realname)
except Exception as e:
model.attach_exception_info(e, name)
raise
def _load(self, module, step_name, **kwds):
for name, tp in self._get_declarations():
kind, realname = name.split(' ', 1)
method = getattr(self, '_%s_gen_%s' % (step_name, kind))
try:
method(tp, realname, module, **kwds)
except Exception as e:
model.attach_exception_info(e, name)
raise
def _generate_nothing(self, tp, name):
pass
def _loaded_noop(self, tp, name, module, **kwds):
pass
# ----------
# typedefs: generates no code so far
_generate_gen_typedef_decl = _generate_nothing
_loading_gen_typedef = _loaded_noop
_loaded_gen_typedef = _loaded_noop
# ----------
# function declarations
def _generate_gen_function_decl(self, tp, name):
assert isinstance(tp, model.FunctionPtrType)
if tp.ellipsis:
# cannot support vararg functions better than this: check for its
# exact type (including the fixed arguments), and build it as a
# constant function pointer (no _cffi_f_%s wrapper)
self._generate_gen_const(False, name, tp)
return
prnt = self._prnt
numargs = len(tp.args)
argnames = []
for i, type in enumerate(tp.args):
indirection = ''
if isinstance(type, model.StructOrUnion):
indirection = '*'
argnames.append('%sx%d' % (indirection, i))
context = 'argument of %s' % name
arglist = [type.get_c_name(' %s' % arg, context)
for type, arg in zip(tp.args, argnames)]
arglist = ', '.join(arglist) or 'void'
wrappername = '_cffi_f_%s' % name
self.export_symbols.append(wrappername)
funcdecl = ' %s(%s)' % (wrappername, arglist)
context = 'result of %s' % name
prnt(tp.result.get_c_name(funcdecl, context))
prnt('{')
#
if not isinstance(tp.result, model.VoidType):
result_code = 'return '
else:
result_code = ''
prnt(' %s%s(%s);' % (result_code, name, ', '.join(argnames)))
prnt('}')
prnt()
_loading_gen_function = _loaded_noop
def _loaded_gen_function(self, tp, name, module, library):
assert isinstance(tp, model.FunctionPtrType)
if tp.ellipsis:
newfunction = self._load_constant(False, tp, name, module)
else:
indirections = []
base_tp = tp
if any(isinstance(typ, model.StructOrUnion) for typ in tp.args):
indirect_args = []
for i, typ in enumerate(tp.args):
if isinstance(typ, model.StructOrUnion):
typ = model.PointerType(typ)
indirections.append((i, typ))
indirect_args.append(typ)
tp = model.FunctionPtrType(tuple(indirect_args),
tp.result, tp.ellipsis)
BFunc = self.ffi._get_cached_btype(tp)
wrappername = '_cffi_f_%s' % name
newfunction = module.load_function(BFunc, wrappername)
for i, typ in indirections:
newfunction = self._make_struct_wrapper(newfunction, i, typ,
base_tp)
setattr(library, name, newfunction)
type(library)._cffi_dir.append(name)
def _make_struct_wrapper(self, oldfunc, i, tp, base_tp):
backend = self.ffi._backend
BType = self.ffi._get_cached_btype(tp)
def newfunc(*args):
args = args[:i] + (backend.newp(BType, args[i]),) + args[i+1:]
return oldfunc(*args)
newfunc._cffi_base_type = base_tp
return newfunc
# ----------
# named structs
def _generate_gen_struct_decl(self, tp, name):
assert name == tp.name
self._generate_struct_or_union_decl(tp, 'struct', name)
def _loading_gen_struct(self, tp, name, module):
self._loading_struct_or_union(tp, 'struct', name, module)
def _loaded_gen_struct(self, tp, name, module, **kwds):
self._loaded_struct_or_union(tp)
def _generate_gen_union_decl(self, tp, name):
assert name == tp.name
self._generate_struct_or_union_decl(tp, 'union', name)
def _loading_gen_union(self, tp, name, module):
self._loading_struct_or_union(tp, 'union', name, module)
def _loaded_gen_union(self, tp, name, module, **kwds):
self._loaded_struct_or_union(tp)
def _generate_struct_or_union_decl(self, tp, prefix, name):
if tp.fldnames is None:
return # nothing to do with opaque structs
checkfuncname = '_cffi_check_%s_%s' % (prefix, name)
layoutfuncname = '_cffi_layout_%s_%s' % (prefix, name)
cname = ('%s %s' % (prefix, name)).strip()
#
prnt = self._prnt
prnt('static void %s(%s *p)' % (checkfuncname, cname))
prnt('{')
prnt(' /* only to generate compile-time warnings or errors */')
for fname, ftype, fbitsize in tp.enumfields():
if (isinstance(ftype, model.PrimitiveType)
and ftype.is_integer_type()) or fbitsize >= 0:
# accept all integers, but complain on float or double
prnt(' (void)((p->%s) << 1);' % fname)
else:
# only accept exactly the type declared.
try:
prnt(' { %s = &p->%s; (void)tmp; }' % (
ftype.get_c_name('*tmp', 'field %r'%fname), fname))
except ffiplatform.VerificationError as e:
prnt(' /* %s */' % str(e)) # cannot verify it, ignore
prnt('}')
self.export_symbols.append(layoutfuncname)
prnt('intptr_t %s(intptr_t i)' % (layoutfuncname,))
prnt('{')
prnt(' struct _cffi_aligncheck { char x; %s y; };' % cname)
prnt(' static intptr_t nums[] = {')
prnt(' sizeof(%s),' % cname)
prnt(' offsetof(struct _cffi_aligncheck, y),')
for fname, ftype, fbitsize in tp.enumfields():
if fbitsize >= 0:
continue # xxx ignore fbitsize for now
prnt(' offsetof(%s, %s),' % (cname, fname))
if isinstance(ftype, model.ArrayType) and ftype.length is None:
prnt(' 0, /* %s */' % ftype._get_c_name())
else:
prnt(' sizeof(((%s *)0)->%s),' % (cname, fname))
prnt(' -1')
prnt(' };')
prnt(' return nums[i];')
prnt(' /* the next line is not executed, but compiled */')
prnt(' %s(0);' % (checkfuncname,))
prnt('}')
prnt()
def _loading_struct_or_union(self, tp, prefix, name, module):
if tp.fldnames is None:
return # nothing to do with opaque structs
layoutfuncname = '_cffi_layout_%s_%s' % (prefix, name)
#
BFunc = self.ffi._typeof_locked("intptr_t(*)(intptr_t)")[0]
function = module.load_function(BFunc, layoutfuncname)
layout = []
num = 0
while True:
x = function(num)
if x < 0: break
layout.append(x)
num += 1
if isinstance(tp, model.StructOrUnion) and tp.partial:
# use the function()'s sizes and offsets to guide the
# layout of the struct
totalsize = layout[0]
totalalignment = layout[1]
fieldofs = layout[2::2]
fieldsize = layout[3::2]
tp.force_flatten()
assert len(fieldofs) == len(fieldsize) == len(tp.fldnames)
tp.fixedlayout = fieldofs, fieldsize, totalsize, totalalignment
else:
cname = ('%s %s' % (prefix, name)).strip()
self._struct_pending_verification[tp] = layout, cname
def _loaded_struct_or_union(self, tp):
if tp.fldnames is None:
return # nothing to do with opaque structs
self.ffi._get_cached_btype(tp) # force 'fixedlayout' to be considered
if tp in self._struct_pending_verification:
# check that the layout sizes and offsets match the real ones
def check(realvalue, expectedvalue, msg):
if realvalue != expectedvalue:
raise ffiplatform.VerificationError(
"%s (we have %d, but C compiler says %d)"
% (msg, expectedvalue, realvalue))
ffi = self.ffi
BStruct = ffi._get_cached_btype(tp)
layout, cname = self._struct_pending_verification.pop(tp)
check(layout[0], ffi.sizeof(BStruct), "wrong total size")
check(layout[1], ffi.alignof(BStruct), "wrong total alignment")
i = 2
for fname, ftype, fbitsize in tp.enumfields():
if fbitsize >= 0:
continue # xxx ignore fbitsize for now
check(layout[i], ffi.offsetof(BStruct, fname),
"wrong offset for field %r" % (fname,))
if layout[i+1] != 0:
BField = ffi._get_cached_btype(ftype)
check(layout[i+1], ffi.sizeof(BField),
"wrong size for field %r" % (fname,))
i += 2
assert i == len(layout)
# ----------
# 'anonymous' declarations. These are produced for anonymous structs
# or unions; the 'name' is obtained by a typedef.
def _generate_gen_anonymous_decl(self, tp, name):
if isinstance(tp, model.EnumType):
self._generate_gen_enum_decl(tp, name, '')
else:
self._generate_struct_or_union_decl(tp, '', name)
def _loading_gen_anonymous(self, tp, name, module):
if isinstance(tp, model.EnumType):
self._loading_gen_enum(tp, name, module, '')
else:
self._loading_struct_or_union(tp, '', name, module)
def _loaded_gen_anonymous(self, tp, name, module, **kwds):
if isinstance(tp, model.EnumType):
self._loaded_gen_enum(tp, name, module, **kwds)
else:
self._loaded_struct_or_union(tp)
# ----------
# constants, likely declared with '#define'
def _generate_gen_const(self, is_int, name, tp=None, category='const'):
prnt = self._prnt
funcname = '_cffi_%s_%s' % (category, name)
self.export_symbols.append(funcname)
if is_int:
assert category == 'const'
prnt('int %s(long long *out_value)' % funcname)
prnt('{')
prnt(' *out_value = (long long)(%s);' % (name,))
prnt(' return (%s) <= 0;' % (name,))
prnt('}')
else:
assert tp is not None
prnt(tp.get_c_name(' %s(void)' % funcname, name),)
prnt('{')
if category == 'var':
ampersand = '&'
else:
ampersand = ''
prnt(' return (%s%s);' % (ampersand, name))
prnt('}')
prnt()
def _generate_gen_constant_decl(self, tp, name):
is_int = isinstance(tp, model.PrimitiveType) and tp.is_integer_type()
self._generate_gen_const(is_int, name, tp)
_loading_gen_constant = _loaded_noop
def _load_constant(self, is_int, tp, name, module):
funcname = '_cffi_const_%s' % name
if is_int:
BType = self.ffi._typeof_locked("long long*")[0]
BFunc = self.ffi._typeof_locked("int(*)(long long*)")[0]
function = module.load_function(BFunc, funcname)
p = self.ffi.new(BType)
negative = function(p)
value = int(p[0])
if value < 0 and not negative:
BLongLong = self.ffi._typeof_locked("long long")[0]
value += (1 << (8*self.ffi.sizeof(BLongLong)))
else:
BFunc = self.ffi._typeof_locked(tp.get_c_name('(*)(void)', name))[0]
function = module.load_function(BFunc, funcname)
value = function()
return value
def _loaded_gen_constant(self, tp, name, module, library):
is_int = isinstance(tp, model.PrimitiveType) and tp.is_integer_type()
value = self._load_constant(is_int, tp, name, module)
setattr(library, name, value)
type(library)._cffi_dir.append(name)
# ----------
# enums
def _enum_funcname(self, prefix, name):
# "$enum_$1" => "___D_enum____D_1"
name = name.replace('$', '___D_')
return '_cffi_e_%s_%s' % (prefix, name)
def _generate_gen_enum_decl(self, tp, name, prefix='enum'):
if tp.partial:
for enumerator in tp.enumerators:
self._generate_gen_const(True, enumerator)
return
#
funcname = self._enum_funcname(prefix, name)
self.export_symbols.append(funcname)
prnt = self._prnt
prnt('int %s(char *out_error)' % funcname)
prnt('{')
for enumerator, enumvalue in zip(tp.enumerators, tp.enumvalues):
if enumvalue < 0:
prnt(' if ((%s) >= 0 || (long)(%s) != %dL) {' % (
enumerator, enumerator, enumvalue))
else:
prnt(' if ((%s) < 0 || (unsigned long)(%s) != %dUL) {' % (
enumerator, enumerator, enumvalue))
prnt(' char buf[64];')
prnt(' if ((%s) < 0)' % enumerator)
prnt(' sprintf(buf, "%%ld", (long)(%s));' % enumerator)
prnt(' else')
prnt(' sprintf(buf, "%%lu", (unsigned long)(%s));' %
enumerator)
prnt(' sprintf(out_error,'
' "%s has the real value %s, not %s",')
prnt(' "%s", buf, "%d");' % (
enumerator[:100], enumvalue))
prnt(' return -1;')
prnt(' }')
prnt(' return 0;')
prnt('}')
prnt()
def _loading_gen_enum(self, tp, name, module, prefix='enum'):
if tp.partial:
enumvalues = [self._load_constant(True, tp, enumerator, module)
for enumerator in tp.enumerators]
tp.enumvalues = tuple(enumvalues)
tp.partial_resolved = True
else:
BType = self.ffi._typeof_locked("char[]")[0]
BFunc = self.ffi._typeof_locked("int(*)(char*)")[0]
funcname = self._enum_funcname(prefix, name)
function = module.load_function(BFunc, funcname)
p = self.ffi.new(BType, 256)
if function(p) < 0:
error = self.ffi.string(p)
if sys.version_info >= (3,):
error = str(error, 'utf-8')
raise ffiplatform.VerificationError(error)
def _loaded_gen_enum(self, tp, name, module, library):
for enumerator, enumvalue in zip(tp.enumerators, tp.enumvalues):
setattr(library, enumerator, enumvalue)
type(library)._cffi_dir.append(enumerator)
# ----------
# macros: for now only for integers
def _generate_gen_macro_decl(self, tp, name):
assert tp == '...'
self._generate_gen_const(True, name)
_loading_gen_macro = _loaded_noop
def _loaded_gen_macro(self, tp, name, module, library):
value = self._load_constant(True, tp, name, module)
setattr(library, name, value)
type(library)._cffi_dir.append(name)
# ----------
# global variables
def _generate_gen_variable_decl(self, tp, name):
if isinstance(tp, model.ArrayType):
if tp.length == '...':
prnt = self._prnt
funcname = '_cffi_sizeof_%s' % (name,)
self.export_symbols.append(funcname)
prnt("size_t %s(void)" % funcname)
prnt("{")
prnt(" return sizeof(%s);" % (name,))
prnt("}")
tp_ptr = model.PointerType(tp.item)
self._generate_gen_const(False, name, tp_ptr)
else:
tp_ptr = model.PointerType(tp)
self._generate_gen_const(False, name, tp_ptr, category='var')
_loading_gen_variable = _loaded_noop
def _loaded_gen_variable(self, tp, name, module, library):
if isinstance(tp, model.ArrayType): # int a[5] is "constant" in the
# sense that "a=..." is forbidden
if tp.length == '...':
funcname = '_cffi_sizeof_%s' % (name,)
BFunc = self.ffi._typeof_locked('size_t(*)(void)')[0]
function = module.load_function(BFunc, funcname)
size = function()
BItemType = self.ffi._get_cached_btype(tp.item)
length, rest = divmod(size, self.ffi.sizeof(BItemType))
if rest != 0:
raise ffiplatform.VerificationError(
"bad size: %r does not seem to be an array of %s" %
(name, tp.item))
tp = tp.resolve_length(length)
tp_ptr = model.PointerType(tp.item)
value = self._load_constant(False, tp_ptr, name, module)
# 'value' is a <cdata 'type *'> which we have to replace with
# a <cdata 'type[N]'> if the N is actually known
if tp.length is not None:
BArray = self.ffi._get_cached_btype(tp)
value = self.ffi.cast(BArray, value)
setattr(library, name, value)
type(library)._cffi_dir.append(name)
return
# remove ptr=<cdata 'int *'> from the library instance, and replace
# it by a property on the class, which reads/writes into ptr[0].
funcname = '_cffi_var_%s' % name
BFunc = self.ffi._typeof_locked(tp.get_c_name('*(*)(void)', name))[0]
function = module.load_function(BFunc, funcname)
ptr = function()
def getter(library):
return ptr[0]
def setter(library, value):
ptr[0] = value
setattr(type(library), name, property(getter, setter))
type(library)._cffi_dir.append(name)
cffimod_header = r'''
#include <stdio.h>
#include <stddef.h>
#include <stdarg.h>
#include <errno.h>
#include <sys/types.h> /* XXX for ssize_t on some platforms */
/* this block of #ifs should be kept exactly identical between
c/_cffi_backend.c, cffi/vengine_cpy.py, cffi/vengine_gen.py */
#if defined(_MSC_VER)
# include <malloc.h> /* for alloca() */
# if _MSC_VER < 1600 /* MSVC < 2010 */
typedef __int8 int8_t;
typedef __int16 int16_t;
typedef __int32 int32_t;
typedef __int64 int64_t;
typedef unsigned __int8 uint8_t;
typedef unsigned __int16 uint16_t;
typedef unsigned __int32 uint32_t;
typedef unsigned __int64 uint64_t;
# else
# include <stdint.h>
# endif
# if _MSC_VER < 1800 /* MSVC < 2013 */
typedef unsigned char _Bool;
# endif
#else
# include <stdint.h>
# if (defined (__SVR4) && defined (__sun)) || defined(_AIX)
# include <alloca.h>
# endif
#endif
'''

View file

@ -0,0 +1,243 @@
import sys, os, binascii, imp, shutil
from . import __version__
from . import ffiplatform
class Verifier(object):
def __init__(self, ffi, preamble, tmpdir=None, modulename=None,
ext_package=None, tag='', force_generic_engine=False, **kwds):
self.ffi = ffi
self.preamble = preamble
if not modulename:
flattened_kwds = ffiplatform.flatten(kwds)
vengine_class = _locate_engine_class(ffi, force_generic_engine)
self._vengine = vengine_class(self)
self._vengine.patch_extension_kwds(kwds)
self.kwds = kwds
#
if modulename:
if tag:
raise TypeError("can't specify both 'modulename' and 'tag'")
else:
key = '\x00'.join([sys.version[:3], __version__, preamble,
flattened_kwds] +
ffi._cdefsources)
if sys.version_info >= (3,):
key = key.encode('utf-8')
k1 = hex(binascii.crc32(key[0::2]) & 0xffffffff)
k1 = k1.lstrip('0x').rstrip('L')
k2 = hex(binascii.crc32(key[1::2]) & 0xffffffff)
k2 = k2.lstrip('0').rstrip('L')
modulename = '_cffi_%s_%s%s%s' % (tag, self._vengine._class_key,
k1, k2)
suffix = _get_so_suffixes()[0]
self.tmpdir = tmpdir or _caller_dir_pycache()
self.sourcefilename = os.path.join(self.tmpdir, modulename + '.c')
self.modulefilename = os.path.join(self.tmpdir, modulename + suffix)
self.ext_package = ext_package
self._has_source = False
self._has_module = False
def write_source(self, file=None):
"""Write the C source code. It is produced in 'self.sourcefilename',
which can be tweaked beforehand."""
with self.ffi._lock:
if self._has_source and file is None:
raise ffiplatform.VerificationError(
"source code already written")
self._write_source(file)
def compile_module(self):
"""Write the C source code (if not done already) and compile it.
This produces a dynamic link library in 'self.modulefilename'."""
with self.ffi._lock:
if self._has_module:
raise ffiplatform.VerificationError("module already compiled")
if not self._has_source:
self._write_source()
self._compile_module()
def load_library(self):
"""Get a C module from this Verifier instance.
Returns an instance of a FFILibrary class that behaves like the
objects returned by ffi.dlopen(), but that delegates all
operations to the C module. If necessary, the C code is written
and compiled first.
"""
with self.ffi._lock:
if not self._has_module:
self._locate_module()
if not self._has_module:
if not self._has_source:
self._write_source()
self._compile_module()
return self._load_library()
def get_module_name(self):
basename = os.path.basename(self.modulefilename)
# kill both the .so extension and the other .'s, as introduced
# by Python 3: 'basename.cpython-33m.so'
basename = basename.split('.', 1)[0]
# and the _d added in Python 2 debug builds --- but try to be
# conservative and not kill a legitimate _d
if basename.endswith('_d') and hasattr(sys, 'gettotalrefcount'):
basename = basename[:-2]
return basename
def get_extension(self):
if not self._has_source:
with self.ffi._lock:
if not self._has_source:
self._write_source()
sourcename = ffiplatform.maybe_relative_path(self.sourcefilename)
modname = self.get_module_name()
return ffiplatform.get_extension(sourcename, modname, **self.kwds)
def generates_python_module(self):
return self._vengine._gen_python_module
# ----------
def _locate_module(self):
if not os.path.isfile(self.modulefilename):
if self.ext_package:
try:
pkg = __import__(self.ext_package, None, None, ['__doc__'])
except ImportError:
return # cannot import the package itself, give up
# (e.g. it might be called differently before installation)
path = pkg.__path__
else:
path = None
filename = self._vengine.find_module(self.get_module_name(), path,
_get_so_suffixes())
if filename is None:
return
self.modulefilename = filename
self._vengine.collect_types()
self._has_module = True
def _write_source(self, file=None):
must_close = (file is None)
if must_close:
_ensure_dir(self.sourcefilename)
file = open(self.sourcefilename, 'w')
self._vengine._f = file
try:
self._vengine.write_source_to_f()
finally:
del self._vengine._f
if must_close:
file.close()
if must_close:
self._has_source = True
def _compile_module(self):
# compile this C source
tmpdir = os.path.dirname(self.sourcefilename)
outputfilename = ffiplatform.compile(tmpdir, self.get_extension())
try:
same = ffiplatform.samefile(outputfilename, self.modulefilename)
except OSError:
same = False
if not same:
_ensure_dir(self.modulefilename)
shutil.move(outputfilename, self.modulefilename)
self._has_module = True
def _load_library(self):
assert self._has_module
return self._vengine.load_library()
# ____________________________________________________________
_FORCE_GENERIC_ENGINE = False # for tests
def _locate_engine_class(ffi, force_generic_engine):
if _FORCE_GENERIC_ENGINE:
force_generic_engine = True
if not force_generic_engine:
if '__pypy__' in sys.builtin_module_names:
force_generic_engine = True
else:
try:
import _cffi_backend
except ImportError:
_cffi_backend = '?'
if ffi._backend is not _cffi_backend:
force_generic_engine = True
if force_generic_engine:
from . import vengine_gen
return vengine_gen.VGenericEngine
else:
from . import vengine_cpy
return vengine_cpy.VCPythonEngine
# ____________________________________________________________
_TMPDIR = None
def _caller_dir_pycache():
if _TMPDIR:
return _TMPDIR
filename = sys._getframe(2).f_code.co_filename
return os.path.abspath(os.path.join(os.path.dirname(filename),
'__pycache__'))
def set_tmpdir(dirname):
"""Set the temporary directory to use instead of __pycache__."""
global _TMPDIR
_TMPDIR = dirname
def cleanup_tmpdir(tmpdir=None, keep_so=False):
"""Clean up the temporary directory by removing all files in it
called `_cffi_*.{c,so}` as well as the `build` subdirectory."""
tmpdir = tmpdir or _caller_dir_pycache()
try:
filelist = os.listdir(tmpdir)
except OSError:
return
if keep_so:
suffix = '.c' # only remove .c files
else:
suffix = _get_so_suffixes()[0].lower()
for fn in filelist:
if fn.lower().startswith('_cffi_') and (
fn.lower().endswith(suffix) or fn.lower().endswith('.c')):
try:
os.unlink(os.path.join(tmpdir, fn))
except OSError:
pass
clean_dir = [os.path.join(tmpdir, 'build')]
for dir in clean_dir:
try:
for fn in os.listdir(dir):
fn = os.path.join(dir, fn)
if os.path.isdir(fn):
clean_dir.append(fn)
else:
os.unlink(fn)
except OSError:
pass
def _get_so_suffixes():
suffixes = []
for suffix, mode, type in imp.get_suffixes():
if type == imp.C_EXTENSION:
suffixes.append(suffix)
if not suffixes:
# bah, no C_EXTENSION available. Occurs on pypy without cpyext
if sys.platform == 'win32':
suffixes = [".pyd"]
else:
suffixes = [".so"]
return suffixes
def _ensure_dir(filename):
try:
os.makedirs(os.path.dirname(filename))
except OSError:
pass