import os import re import sys from distutils import log import xml.dom.pulldom import shlex import locale import codecs import unicodedata import warnings from setuptools.compat import unicode from xml.sax.saxutils import unescape try: import urlparse except ImportError: import urllib.parse as urlparse from subprocess import Popen as _Popen, PIPE as _PIPE #NOTE: Use of the command line options require SVN 1.3 or newer (December 2005) # and SVN 1.3 hasn't been supported by the developers since mid 2008. #subprocess is called several times with shell=(sys.platform=='win32') #see the follow for more information: # http://bugs.python.org/issue8557 # http://stackoverflow.com/questions/5658622/ # python-subprocess-popen-environment-path def _run_command(args, stdout=_PIPE, stderr=_PIPE, encoding=None, stream=0): #regarding the shell argument, see: http://bugs.python.org/issue8557 try: proc = _Popen(args, stdout=stdout, stderr=stderr, shell=(sys.platform == 'win32')) data = proc.communicate()[stream] except OSError: return 1, '' #doubled checked and data = decode_as_string(data, encoding) #communciate calls wait() return proc.returncode, data def _get_entry_schedule(entry): schedule = entry.getElementsByTagName('schedule')[0] return "".join([t.nodeValue for t in schedule.childNodes if t.nodeType == t.TEXT_NODE]) def _get_target_property(target): property_text = target.getElementsByTagName('property')[0] return "".join([t.nodeValue for t in property_text.childNodes if t.nodeType == t.TEXT_NODE]) def _get_xml_data(decoded_str): if sys.version_info < (3, 0): #old versions want an encoded string data = decoded_str.encode('utf-8') else: data = decoded_str return data def joinpath(prefix, *suffix): if not prefix or prefix == '.': return os.path.join(*suffix) return os.path.join(prefix, *suffix) def determine_console_encoding(): try: #try for the preferred encoding encoding = locale.getpreferredencoding() #see if the locale.getdefaultlocale returns null #some versions of python\platforms return US-ASCII #when it cannot determine an encoding if not encoding or encoding == "US-ASCII": encoding = locale.getdefaultlocale()[1] if encoding: codecs.lookup(encoding) # make sure a lookup error is not made except (locale.Error, LookupError): encoding = None is_osx = sys.platform == "darwin" if not encoding: return ["US-ASCII", "utf-8"][is_osx] elif encoding.startswith("mac-") and is_osx: #certain versions of python would return mac-roman as default #OSX as a left over of earlier mac versions. return "utf-8" else: return encoding _console_encoding = determine_console_encoding() def decode_as_string(text, encoding=None): """ Decode the console or file output explicitly using getpreferredencoding. The text paraemeter should be a encoded string, if not no decode occurs If no encoding is given, getpreferredencoding is used. If encoding is specified, that is used instead. This would be needed for SVN --xml output. Unicode is explicitly put in composed NFC form. --xml should be UTF-8 (SVN Issue 2938) the discussion on the Subversion DEV List from 2007 seems to indicate the same. """ #text should be a byte string if encoding is None: encoding = _console_encoding if not isinstance(text, unicode): text = text.decode(encoding) text = unicodedata.normalize('NFC', text) return text def parse_dir_entries(decoded_str): '''Parse the entries from a recursive info xml''' doc = xml.dom.pulldom.parseString(_get_xml_data(decoded_str)) entries = list() for event, node in doc: if event == 'START_ELEMENT' and node.nodeName == 'entry': doc.expandNode(node) if not _get_entry_schedule(node).startswith('delete'): entries.append((node.getAttribute('path'), node.getAttribute('kind'))) return entries[1:] # do not want the root directory def parse_externals_xml(decoded_str, prefix=''): '''Parse a propget svn:externals xml''' prefix = os.path.normpath(prefix) prefix = os.path.normcase(prefix) doc = xml.dom.pulldom.parseString(_get_xml_data(decoded_str)) externals = list() for event, node in doc: if event == 'START_ELEMENT' and node.nodeName == 'target': doc.expandNode(node) path = os.path.normpath(node.getAttribute('path')) if os.path.normcase(path).startswith(prefix): path = path[len(prefix)+1:] data = _get_target_property(node) #data should be decoded already for external in parse_external_prop(data): externals.append(joinpath(path, external)) return externals # do not want the root directory def parse_external_prop(lines): """ Parse the value of a retrieved svn:externals entry. possible token setups (with quotng and backscaping in laters versions) URL[@#] EXT_FOLDERNAME [-r#] URL EXT_FOLDERNAME EXT_FOLDERNAME [-r#] URL """ externals = [] for line in lines.splitlines(): line = line.lstrip() # there might be a "\ " if not line: continue if sys.version_info < (3, 0): #shlex handles NULLs just fine and shlex in 2.7 tries to encode #as ascii automatiically line = line.encode('utf-8') line = shlex.split(line) if sys.version_info < (3, 0): line = [x.decode('utf-8') for x in line] #EXT_FOLDERNAME is either the first or last depending on where #the URL falls if urlparse.urlsplit(line[-1])[0]: external = line[0] else: external = line[-1] external = decode_as_string(external, encoding="utf-8") externals.append(os.path.normpath(external)) return externals def parse_prop_file(filename, key): found = False f = open(filename, 'rt') data = '' try: for line in iter(f.readline, ''): # can't use direct iter! parts = line.split() if len(parts) == 2: kind, length = parts data = f.read(int(length)) if kind == 'K' and data == key: found = True elif kind == 'V' and found: break finally: f.close() return data class SvnInfo(object): ''' Generic svn_info object. No has little knowledge of how to extract information. Use cls.load to instatiate according svn version. Paths are not filesystem encoded. ''' @staticmethod def get_svn_version(): code, data = _run_command(['svn', '--version', '--quiet']) if code == 0 and data: return data.strip() else: return '' #svnversion return values (previous implementations return max revision) # 4123:4168 mixed revision working copy # 4168M modified working copy # 4123S switched working copy # 4123:4168MS mixed revision, modified, switched working copy revision_re = re.compile(r'(?:([\-0-9]+):)?(\d+)([a-z]*)\s*$', re.I) @classmethod def load(cls, dirname=''): normdir = os.path.normpath(dirname) code, data = _run_command(['svn', 'info', normdir]) # Must check for some contents, as some use empty directories # in testcases svn_dir = os.path.join(normdir, '.svn') has_svn = (os.path.isfile(os.path.join(svn_dir, 'entries')) or os.path.isfile(os.path.join(svn_dir, 'dir-props')) or os.path.isfile(os.path.join(svn_dir, 'dir-prop-base'))) svn_version = tuple(cls.get_svn_version().split('.')) try: base_svn_version = tuple(int(x) for x in svn_version[:2]) except ValueError: base_svn_version = tuple() if not has_svn: return SvnInfo(dirname) if code or not base_svn_version or base_svn_version < (1, 3): warnings.warn(("No SVN 1.3+ command found: falling back " "on pre 1.7 .svn parsing"), DeprecationWarning) return SvnFileInfo(dirname) if base_svn_version < (1, 5): return Svn13Info(dirname) return Svn15Info(dirname) def __init__(self, path=''): self.path = path self._entries = None self._externals = None def get_revision(self): 'Retrieve the directory revision informatino using svnversion' code, data = _run_command(['svnversion', '-c', self.path]) if code: log.warn("svnversion failed") return 0 parsed = self.revision_re.match(data) if parsed: return int(parsed.group(2)) else: return 0 @property def entries(self): if self._entries is None: self._entries = self.get_entries() return self._entries @property def externals(self): if self._externals is None: self._externals = self.get_externals() return self._externals def iter_externals(self): ''' Iterate over the svn:external references in the repository path. ''' for item in self.externals: yield item def iter_files(self): ''' Iterate over the non-deleted file entries in the repository path ''' for item, kind in self.entries: if kind.lower() == 'file': yield item def iter_dirs(self, include_root=True): ''' Iterate over the non-deleted file entries in the repository path ''' if include_root: yield self.path for item, kind in self.entries: if kind.lower() == 'dir': yield item def get_entries(self): return [] def get_externals(self): return [] class Svn13Info(SvnInfo): def get_entries(self): code, data = _run_command(['svn', 'info', '-R', '--xml', self.path], encoding="utf-8") if code: log.debug("svn info failed") return [] return parse_dir_entries(data) def get_externals(self): #Previous to 1.5 --xml was not supported for svn propget and the -R #output format breaks the shlex compatible semantics. cmd = ['svn', 'propget', 'svn:externals'] result = [] for folder in self.iter_dirs(): code, lines = _run_command(cmd + [folder], encoding="utf-8") if code != 0: log.warn("svn propget failed") return [] #lines should a str for external in parse_external_prop(lines): if folder: external = os.path.join(folder, external) result.append(os.path.normpath(external)) return result class Svn15Info(Svn13Info): def get_externals(self): cmd = ['svn', 'propget', 'svn:externals', self.path, '-R', '--xml'] code, lines = _run_command(cmd, encoding="utf-8") if code: log.debug("svn propget failed") return [] return parse_externals_xml(lines, prefix=os.path.abspath(self.path)) class SvnFileInfo(SvnInfo): def __init__(self, path=''): super(SvnFileInfo, self).__init__(path) self._directories = None self._revision = None def _walk_svn(self, base): entry_file = joinpath(base, '.svn', 'entries') if os.path.isfile(entry_file): entries = SVNEntriesFile.load(base) yield (base, False, entries.parse_revision()) for path in entries.get_undeleted_records(): path = decode_as_string(path) path = joinpath(base, path) if os.path.isfile(path): yield (path, True, None) elif os.path.isdir(path): for item in self._walk_svn(path): yield item def _build_entries(self): entries = list() rev = 0 for path, isfile, dir_rev in self._walk_svn(self.path): if isfile: entries.append((path, 'file')) else: entries.append((path, 'dir')) rev = max(rev, dir_rev) self._entries = entries self._revision = rev def get_entries(self): if self._entries is None: self._build_entries() return self._entries def get_revision(self): if self._revision is None: self._build_entries() return self._revision def get_externals(self): prop_files = [['.svn', 'dir-prop-base'], ['.svn', 'dir-props']] externals = [] for dirname in self.iter_dirs(): prop_file = None for rel_parts in prop_files: filename = joinpath(dirname, *rel_parts) if os.path.isfile(filename): prop_file = filename if prop_file is not None: ext_prop = parse_prop_file(prop_file, 'svn:externals') #ext_prop should be utf-8 coming from svn:externals ext_prop = decode_as_string(ext_prop, encoding="utf-8") externals.extend(parse_external_prop(ext_prop)) return externals def svn_finder(dirname=''): #combined externals due to common interface #combined externals and entries due to lack of dir_props in 1.7 info = SvnInfo.load(dirname) for path in info.iter_files(): yield path for path in info.iter_externals(): sub_info = SvnInfo.load(path) for sub_path in sub_info.iter_files(): yield sub_path class SVNEntriesFile(object): def __init__(self, data): self.data = data @classmethod def load(class_, base): filename = os.path.join(base, '.svn', 'entries') f = open(filename) try: result = SVNEntriesFile.read(f) finally: f.close() return result @classmethod def read(class_, fileobj): data = fileobj.read() is_xml = data.startswith(' revision_line_number and section[revision_line_number]) ] return rev_numbers def get_undeleted_records(self): undeleted = lambda s: s and s[0] and (len(s) < 6 or s[5] != 'delete') result = [ section[0] for section in self.get_sections() if undeleted(section) ] return result class SVNEntriesFileXML(SVNEntriesFile): def is_valid(self): return True def get_url(self): "Get repository URL" urlre = re.compile('url="([^"]+)"') return urlre.search(self.data).group(1) def parse_revision_numbers(self): revre = re.compile(r'committed-rev="(\d+)"') return [ int(m.group(1)) for m in revre.finditer(self.data) ] def get_undeleted_records(self): entries_pattern = \ re.compile(r'name="([^"]+)"(?![^>]+deleted="true")', re.I) results = [ unescape(match.group(1)) for match in entries_pattern.finditer(self.data) ] return results if __name__ == '__main__': for name in svn_finder(sys.argv[1]): print(name)