from __future__ import absolute_import, division, unicode_literals from six import text_type from six.moves import http_client import codecs import re from .constants import EOF, spaceCharacters, asciiLetters, asciiUppercase from .constants import encodings, ReparseException from . import utils from io import StringIO try: from io import BytesIO except ImportError: BytesIO = StringIO try: from io import BufferedIOBase except ImportError: class BufferedIOBase(object): pass # Non-unicode versions of constants for use in the pre-parser spaceCharactersBytes = frozenset([item.encode("ascii") for item in spaceCharacters]) asciiLettersBytes = frozenset([item.encode("ascii") for item in asciiLetters]) asciiUppercaseBytes = frozenset([item.encode("ascii") for item in asciiUppercase]) spacesAngleBrackets = spaceCharactersBytes | frozenset([b">", b"<"]) invalid_unicode_re = re.compile("[\u0001-\u0008\u000B\u000E-\u001F\u007F-\u009F\uD800-\uDFFF\uFDD0-\uFDEF\uFFFE\uFFFF\U0001FFFE\U0001FFFF\U0002FFFE\U0002FFFF\U0003FFFE\U0003FFFF\U0004FFFE\U0004FFFF\U0005FFFE\U0005FFFF\U0006FFFE\U0006FFFF\U0007FFFE\U0007FFFF\U0008FFFE\U0008FFFF\U0009FFFE\U0009FFFF\U000AFFFE\U000AFFFF\U000BFFFE\U000BFFFF\U000CFFFE\U000CFFFF\U000DFFFE\U000DFFFF\U000EFFFE\U000EFFFF\U000FFFFE\U000FFFFF\U0010FFFE\U0010FFFF]") non_bmp_invalid_codepoints = set([0x1FFFE, 0x1FFFF, 0x2FFFE, 0x2FFFF, 0x3FFFE, 0x3FFFF, 0x4FFFE, 0x4FFFF, 0x5FFFE, 0x5FFFF, 0x6FFFE, 0x6FFFF, 0x7FFFE, 0x7FFFF, 0x8FFFE, 0x8FFFF, 0x9FFFE, 0x9FFFF, 0xAFFFE, 0xAFFFF, 0xBFFFE, 0xBFFFF, 0xCFFFE, 0xCFFFF, 0xDFFFE, 0xDFFFF, 0xEFFFE, 0xEFFFF, 0xFFFFE, 0xFFFFF, 0x10FFFE, 0x10FFFF]) ascii_punctuation_re = re.compile("[\u0009-\u000D\u0020-\u002F\u003A-\u0040\u005B-\u0060\u007B-\u007E]") # Cache for charsUntil() charsUntilRegEx = {} class BufferedStream(object): """Buffering for streams that do not have buffering of their own The buffer is implemented as a list of chunks on the assumption that joining many strings will be slow since it is O(n**2) """ def __init__(self, stream): self.stream = stream self.buffer = [] self.position = [-1, 0] # chunk number, offset def tell(self): pos = 0 for chunk in self.buffer[:self.position[0]]: pos += len(chunk) pos += self.position[1] return pos def seek(self, pos): assert pos <= self._bufferedBytes() offset = pos i = 0 while len(self.buffer[i]) < offset: offset -= len(self.buffer[i]) i += 1 self.position = [i, offset] def read(self, bytes): if not self.buffer: return self._readStream(bytes) elif (self.position[0] == len(self.buffer) and self.position[1] == len(self.buffer[-1])): return self._readStream(bytes) else: return self._readFromBuffer(bytes) def _bufferedBytes(self): return sum([len(item) for item in self.buffer]) def _readStream(self, bytes): data = self.stream.read(bytes) self.buffer.append(data) self.position[0] += 1 self.position[1] = len(data) return data def _readFromBuffer(self, bytes): remainingBytes = bytes rv = [] bufferIndex = self.position[0] bufferOffset = self.position[1] while bufferIndex < len(self.buffer) and remainingBytes != 0: assert remainingBytes > 0 bufferedData = self.buffer[bufferIndex] if remainingBytes <= len(bufferedData) - bufferOffset: bytesToRead = remainingBytes self.position = [bufferIndex, bufferOffset + bytesToRead] else: bytesToRead = len(bufferedData) - bufferOffset self.position = [bufferIndex, len(bufferedData)] bufferIndex += 1 rv.append(bufferedData[bufferOffset:bufferOffset + bytesToRead]) remainingBytes -= bytesToRead bufferOffset = 0 if remainingBytes: rv.append(self._readStream(remainingBytes)) return b"".join(rv) def HTMLInputStream(source, encoding=None, parseMeta=True, chardet=True): if isinstance(source, http_client.HTTPResponse): # Work around Python bug #20007: read(0) closes the connection. # http://bugs.python.org/issue20007 isUnicode = False elif hasattr(source, "read"): isUnicode = isinstance(source.read(0), text_type) else: isUnicode = isinstance(source, text_type) if isUnicode: if encoding is not None: raise TypeError("Cannot explicitly set an encoding with a unicode string") return HTMLUnicodeInputStream(source) else: return HTMLBinaryInputStream(source, encoding, parseMeta, chardet) class HTMLUnicodeInputStream(object): """Provides a unicode stream of characters to the HTMLTokenizer. This class takes care of character encoding and removing or replacing incorrect byte-sequences and also provides column and line tracking. """ _defaultChunkSize = 10240 def __init__(self, source): """Initialises the HTMLInputStream. HTMLInputStream(source, [encoding]) -> Normalized stream from source for use by html5lib. source can be either a file-object, local filename or a string. The optional encoding parameter must be a string that indicates the encoding. If specified, that encoding will be used, regardless of any BOM or later declaration (such as in a meta element) parseMeta - Look for a element containing encoding information """ # Craziness if len("\U0010FFFF") == 1: self.reportCharacterErrors = self.characterErrorsUCS4 self.replaceCharactersRegexp = re.compile("[\uD800-\uDFFF]") else: self.reportCharacterErrors = self.characterErrorsUCS2 self.replaceCharactersRegexp = re.compile("([\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?= self.chunkSize: if not self.readChunk(): return EOF chunkOffset = self.chunkOffset char = self.chunk[chunkOffset] self.chunkOffset = chunkOffset + 1 return char def readChunk(self, chunkSize=None): if chunkSize is None: chunkSize = self._defaultChunkSize self.prevNumLines, self.prevNumCols = self._position(self.chunkSize) self.chunk = "" self.chunkSize = 0 self.chunkOffset = 0 data = self.dataStream.read(chunkSize) # Deal with CR LF and surrogates broken across chunks if self._bufferedCharacter: data = self._bufferedCharacter + data self._bufferedCharacter = None elif not data: # We have no more data, bye-bye stream return False if len(data) > 1: lastv = ord(data[-1]) if lastv == 0x0D or 0xD800 <= lastv <= 0xDBFF: self._bufferedCharacter = data[-1] data = data[:-1] self.reportCharacterErrors(data) # Replace invalid characters # Note U+0000 is dealt with in the tokenizer data = self.replaceCharactersRegexp.sub("\ufffd", data) data = data.replace("\r\n", "\n") data = data.replace("\r", "\n") self.chunk = data self.chunkSize = len(data) return True def characterErrorsUCS4(self, data): for i in range(len(invalid_unicode_re.findall(data))): self.errors.append("invalid-codepoint") def characterErrorsUCS2(self, data): # Someone picked the wrong compile option # You lose skip = False for match in invalid_unicode_re.finditer(data): if skip: continue codepoint = ord(match.group()) pos = match.start() # Pretty sure there should be endianness issues here if utils.isSurrogatePair(data[pos:pos + 2]): # We have a surrogate pair! char_val = utils.surrogatePairToCodepoint(data[pos:pos + 2]) if char_val in non_bmp_invalid_codepoints: self.errors.append("invalid-codepoint") skip = True elif (codepoint >= 0xD800 and codepoint <= 0xDFFF and pos == len(data) - 1): self.errors.append("invalid-codepoint") else: skip = False self.errors.append("invalid-codepoint") def charsUntil(self, characters, opposite=False): """ Returns a string of characters from the stream up to but not including any character in 'characters' or EOF. 'characters' must be a container that supports the 'in' method and iteration over its characters. """ # Use a cache of regexps to find the required characters try: chars = charsUntilRegEx[(characters, opposite)] except KeyError: if __debug__: for c in characters: assert(ord(c) < 128) regex = "".join(["\\x%02x" % ord(c) for c in characters]) if not opposite: regex = "^%s" % regex chars = charsUntilRegEx[(characters, opposite)] = re.compile("[%s]+" % regex) rv = [] while True: # Find the longest matching prefix m = chars.match(self.chunk, self.chunkOffset) if m is None: # If nothing matched, and it wasn't because we ran out of chunk, # then stop if self.chunkOffset != self.chunkSize: break else: end = m.end() # If not the whole chunk matched, return everything # up to the part that didn't match if end != self.chunkSize: rv.append(self.chunk[self.chunkOffset:end]) self.chunkOffset = end break # If the whole remainder of the chunk matched, # use it all and read the next chunk rv.append(self.chunk[self.chunkOffset:]) if not self.readChunk(): # Reached EOF break r = "".join(rv) return r def unget(self, char): # Only one character is allowed to be ungotten at once - it must # be consumed again before any further call to unget if char is not None: if self.chunkOffset == 0: # unget is called quite rarely, so it's a good idea to do # more work here if it saves a bit of work in the frequently # called char and charsUntil. # So, just prepend the ungotten character onto the current # chunk: self.chunk = char + self.chunk self.chunkSize += 1 else: self.chunkOffset -= 1 assert self.chunk[self.chunkOffset] == char class HTMLBinaryInputStream(HTMLUnicodeInputStream): """Provides a unicode stream of characters to the HTMLTokenizer. This class takes care of character encoding and removing or replacing incorrect byte-sequences and also provides column and line tracking. """ def __init__(self, source, encoding=None, parseMeta=True, chardet=True): """Initialises the HTMLInputStream. HTMLInputStream(source, [encoding]) -> Normalized stream from source for use by html5lib. source can be either a file-object, local filename or a string. The optional encoding parameter must be a string that indicates the encoding. If specified, that encoding will be used, regardless of any BOM or later declaration (such as in a meta element) parseMeta - Look for a element containing encoding information """ # Raw Stream - for unicode objects this will encode to utf-8 and set # self.charEncoding as appropriate self.rawStream = self.openStream(source) HTMLUnicodeInputStream.__init__(self, self.rawStream) self.charEncoding = (codecName(encoding), "certain") # Encoding Information # Number of bytes to use when looking for a meta element with # encoding information self.numBytesMeta = 512 # Number of bytes to use when using detecting encoding using chardet self.numBytesChardet = 100 # Encoding to use if no other information can be found self.defaultEncoding = "windows-1252" # Detect encoding iff no explicit "transport level" encoding is supplied if (self.charEncoding[0] is None): self.charEncoding = self.detectEncoding(parseMeta, chardet) # Call superclass self.reset() def reset(self): self.dataStream = codecs.getreader(self.charEncoding[0])(self.rawStream, 'replace') HTMLUnicodeInputStream.reset(self) def openStream(self, source): """Produces a file object from source. source can be either a file object, local filename or a string. """ # Already a file object if hasattr(source, 'read'): stream = source else: stream = BytesIO(source) try: stream.seek(stream.tell()) except: stream = BufferedStream(stream) return stream def detectEncoding(self, parseMeta=True, chardet=True): # First look for a BOM # This will also read past the BOM if present encoding = self.detectBOM() confidence = "certain" # If there is no BOM need to look for meta elements with encoding # information if encoding is None and parseMeta: encoding = self.detectEncodingMeta() confidence = "tentative" # Guess with chardet, if avaliable if encoding is None and chardet: confidence = "tentative" try: try: from charade.universaldetector import UniversalDetector except ImportError: from chardet.universaldetector import UniversalDetector buffers = [] detector = UniversalDetector() while not detector.done: buffer = self.rawStream.read(self.numBytesChardet) assert isinstance(buffer, bytes) if not buffer: break buffers.append(buffer) detector.feed(buffer) detector.close() encoding = detector.result['encoding'] self.rawStream.seek(0) except ImportError: pass # If all else fails use the default encoding if encoding is None: confidence = "tentative" encoding = self.defaultEncoding # Substitute for equivalent encodings: encodingSub = {"iso-8859-1": "windows-1252"} if encoding.lower() in encodingSub: encoding = encodingSub[encoding.lower()] return encoding, confidence def changeEncoding(self, newEncoding): assert self.charEncoding[1] != "certain" newEncoding = codecName(newEncoding) if newEncoding in ("utf-16", "utf-16-be", "utf-16-le"): newEncoding = "utf-8" if newEncoding is None: return elif newEncoding == self.charEncoding[0]: self.charEncoding = (self.charEncoding[0], "certain") else: self.rawStream.seek(0) self.reset() self.charEncoding = (newEncoding, "certain") raise ReparseException("Encoding changed from %s to %s" % (self.charEncoding[0], newEncoding)) def detectBOM(self): """Attempts to detect at BOM at the start of the stream. If an encoding can be determined from the BOM return the name of the encoding otherwise return None""" bomDict = { codecs.BOM_UTF8: 'utf-8', codecs.BOM_UTF16_LE: 'utf-16-le', codecs.BOM_UTF16_BE: 'utf-16-be', codecs.BOM_UTF32_LE: 'utf-32-le', codecs.BOM_UTF32_BE: 'utf-32-be' } # Go to beginning of file and read in 4 bytes string = self.rawStream.read(4) assert isinstance(string, bytes) # Try detecting the BOM using bytes from the string encoding = bomDict.get(string[:3]) # UTF-8 seek = 3 if not encoding: # Need to detect UTF-32 before UTF-16 encoding = bomDict.get(string) # UTF-32 seek = 4 if not encoding: encoding = bomDict.get(string[:2]) # UTF-16 seek = 2 # Set the read position past the BOM if one was found, otherwise # set it to the start of the stream self.rawStream.seek(encoding and seek or 0) return encoding def detectEncodingMeta(self): """Report the encoding declared by the meta element """ buffer = self.rawStream.read(self.numBytesMeta) assert isinstance(buffer, bytes) parser = EncodingParser(buffer) self.rawStream.seek(0) encoding = parser.getEncoding() if encoding in ("utf-16", "utf-16-be", "utf-16-le"): encoding = "utf-8" return encoding class EncodingBytes(bytes): """String-like object with an associated position and various extra methods If the position is ever greater than the string length then an exception is raised""" def __new__(self, value): assert isinstance(value, bytes) return bytes.__new__(self, value.lower()) def __init__(self, value): self._position = -1 def __iter__(self): return self def __next__(self): p = self._position = self._position + 1 if p >= len(self): raise StopIteration elif p < 0: raise TypeError return self[p:p + 1] def next(self): # Py2 compat return self.__next__() def previous(self): p = self._position if p >= len(self): raise StopIteration elif p < 0: raise TypeError self._position = p = p - 1 return self[p:p + 1] def setPosition(self, position): if self._position >= len(self): raise StopIteration self._position = position def getPosition(self): if self._position >= len(self): raise StopIteration if self._position >= 0: return self._position else: return None position = property(getPosition, setPosition) def getCurrentByte(self): return self[self.position:self.position + 1] currentByte = property(getCurrentByte) def skip(self, chars=spaceCharactersBytes): """Skip past a list of characters""" p = self.position # use property for the error-checking while p < len(self): c = self[p:p + 1] if c not in chars: self._position = p return c p += 1 self._position = p return None def skipUntil(self, chars): p = self.position while p < len(self): c = self[p:p + 1] if c in chars: self._position = p return c p += 1 self._position = p return None def matchBytes(self, bytes): """Look for a sequence of bytes at the start of a string. If the bytes are found return True and advance the position to the byte after the match. Otherwise return False and leave the position alone""" p = self.position data = self[p:p + len(bytes)] rv = data.startswith(bytes) if rv: self.position += len(bytes) return rv def jumpTo(self, bytes): """Look for the next sequence of bytes matching a given sequence. If a match is found advance the position to the last byte of the match""" newPosition = self[self.position:].find(bytes) if newPosition > -1: # XXX: This is ugly, but I can't see a nicer way to fix this. if self._position == -1: self._position = 0 self._position += (newPosition + len(bytes) - 1) return True else: raise StopIteration class EncodingParser(object): """Mini parser for detecting character encoding from meta elements""" def __init__(self, data): """string - the data to work on for encoding detection""" self.data = EncodingBytes(data) self.encoding = None def getEncoding(self): methodDispatch = ( (b"") def handleMeta(self): if self.data.currentByte not in spaceCharactersBytes: # if we have ") def getAttribute(self): """Return a name,value pair for the next attribute in the stream, if one is found, or None""" data = self.data # Step 1 (skip chars) c = data.skip(spaceCharactersBytes | frozenset([b"/"])) assert c is None or len(c) == 1 # Step 2 if c in (b">", None): return None # Step 3 attrName = [] attrValue = [] # Step 4 attribute name while True: if c == b"=" and attrName: break elif c in spaceCharactersBytes: # Step 6! c = data.skip() break elif c in (b"/", b">"): return b"".join(attrName), b"" elif c in asciiUppercaseBytes: attrName.append(c.lower()) elif c is None: return None else: attrName.append(c) # Step 5 c = next(data) # Step 7 if c != b"=": data.previous() return b"".join(attrName), b"" # Step 8 next(data) # Step 9 c = data.skip() # Step 10 if c in (b"'", b'"'): # 10.1 quoteChar = c while True: # 10.2 c = next(data) # 10.3 if c == quoteChar: next(data) return b"".join(attrName), b"".join(attrValue) # 10.4 elif c in asciiUppercaseBytes: attrValue.append(c.lower()) # 10.5 else: attrValue.append(c) elif c == b">": return b"".join(attrName), b"" elif c in asciiUppercaseBytes: attrValue.append(c.lower()) elif c is None: return None else: attrValue.append(c) # Step 11 while True: c = next(data) if c in spacesAngleBrackets: return b"".join(attrName), b"".join(attrValue) elif c in asciiUppercaseBytes: attrValue.append(c.lower()) elif c is None: return None else: attrValue.append(c) class ContentAttrParser(object): def __init__(self, data): assert isinstance(data, bytes) self.data = data def parse(self): try: # Check if the attr name is charset # otherwise return self.data.jumpTo(b"charset") self.data.position += 1 self.data.skip() if not self.data.currentByte == b"=": # If there is no = sign keep looking for attrs return None self.data.position += 1 self.data.skip() # Look for an encoding between matching quote marks if self.data.currentByte in (b'"', b"'"): quoteMark = self.data.currentByte self.data.position += 1 oldPosition = self.data.position if self.data.jumpTo(quoteMark): return self.data[oldPosition:self.data.position] else: return None else: # Unquoted value oldPosition = self.data.position try: self.data.skipUntil(spaceCharactersBytes) return self.data[oldPosition:self.data.position] except StopIteration: # Return the whole remaining value return self.data[oldPosition:] except StopIteration: return None def codecName(encoding): """Return the python codec name corresponding to an encoding or None if the string doesn't correspond to a valid encoding.""" if isinstance(encoding, bytes): try: encoding = encoding.decode("ascii") except UnicodeDecodeError: return None if encoding: canonicalName = ascii_punctuation_re.sub("", encoding).lower() return encodings.get(canonicalName, None) else: return None