161 lines
3.7 KiB
Python
161 lines
3.7 KiB
Python
##
|
|
#
|
|
# bencode.py python3 compatable bencode / bdecode
|
|
#
|
|
##
|
|
|
|
class Decoder(object):
|
|
|
|
def _decode_int(self):
|
|
"""
|
|
decode integer from bytearray
|
|
return int
|
|
"""
|
|
self.idx += 1
|
|
start = self.idx
|
|
end = self.data.index(b'e', self.idx)
|
|
self.idx = end + 1
|
|
return int(self.data[start:end])
|
|
|
|
def _decode_str(self):
|
|
"""
|
|
decode string from bytearray
|
|
return string
|
|
"""
|
|
start = self.data.index(b':', self.idx)
|
|
l = int(self.data[self.idx:start].decode(), 10)
|
|
if l <= 0:
|
|
raise Exception('invalid string size: %d' % l)
|
|
start += 1
|
|
ret = self.data[start:start+l]
|
|
try:
|
|
ret = ret.decode('utf-8')
|
|
except:
|
|
pass
|
|
self.idx = start + l
|
|
return ret
|
|
|
|
def _decode_list(self):
|
|
"""
|
|
decode list from bytearray
|
|
return list
|
|
"""
|
|
ls = []
|
|
self.idx += 1
|
|
while self.data[self.idx] != ord(b'e'):
|
|
ls.append(self._decode())
|
|
self.idx += 1
|
|
return ls
|
|
|
|
def _decode_dict(self):
|
|
"""
|
|
decode dict from bytearray
|
|
return dict
|
|
"""
|
|
d = {}
|
|
self.idx += 1
|
|
while self.data[self.idx] != ord(b'e'):
|
|
k = self._decode_str()
|
|
v = self._decode()
|
|
d[k] = v
|
|
self.idx += 1
|
|
return d
|
|
|
|
def _decode(self):
|
|
ch = chr(self.data[self.idx])
|
|
if ch == 'l':
|
|
return self._decode_list()
|
|
elif ch == 'i':
|
|
return self._decode_int()
|
|
elif ch == 'd':
|
|
return self._decode_dict()
|
|
elif ch.isdigit():
|
|
return self._decode_str()
|
|
else:
|
|
raise Exception('could not decode data: %s' % data)
|
|
|
|
def decode(self, data):
|
|
self.idx = 0
|
|
self.data = data
|
|
obj = self._decode()
|
|
if len(data) != self.idx:
|
|
raise Exception('failed to decode, extra data: %s' % data)
|
|
return obj
|
|
|
|
def bdecode(data):
|
|
"""
|
|
decode a bytearray
|
|
return decoded object
|
|
"""
|
|
return Decoder().decode(data)
|
|
|
|
def _encode_str(s, buff):
|
|
"""
|
|
encode string to a buffer
|
|
"""
|
|
s = bytearray(s)
|
|
l = len(s)
|
|
buff.append(bytearray(str(l)+':', 'utf-8'))
|
|
buff.append(s)
|
|
|
|
def _encode_int(i, buff):
|
|
"""
|
|
encode integer to a buffer
|
|
"""
|
|
buff.append(b'i')
|
|
buff.append(bytearray(str(i), 'ascii'))
|
|
buff.append(b'e')
|
|
|
|
def _encode_list(l, buff):
|
|
"""
|
|
encode list of elements to a buffer
|
|
"""
|
|
buff.append(b'l')
|
|
for i in l:
|
|
_encode(i, buff)
|
|
buff.append(b'e')
|
|
|
|
def _encode_dict(d, buff):
|
|
"""
|
|
encode dict
|
|
"""
|
|
buff.append(b'd')
|
|
for k in sorted(d):
|
|
if not isinstance(k, [bytes, str]):
|
|
k = str(k)
|
|
_encode(k, buff)
|
|
_encode(d[k], buff)
|
|
buff.append(b'e')
|
|
|
|
def _encode(obj, buff):
|
|
"""
|
|
encode element obj to a buffer buff
|
|
"""
|
|
if isinstance(obj, str):
|
|
_encode_str(bytearray(obj, 'utf-8'), buff)
|
|
elif isinstance(obj, bytes):
|
|
_encode_str(bytearray(obj), buff)
|
|
elif isinstance(obj, bytearray):
|
|
_encode_str(obj, buff)
|
|
elif str(obj).isdigit():
|
|
_encode_int(obj, buff)
|
|
elif isinstance(obj, list):
|
|
_encode_list(obj, buff)
|
|
elif hasattr(obj, 'keys') and hasattr(obj, 'values'):
|
|
_encode_dict(obj, buff)
|
|
elif str(obj) in ['True', 'False']:
|
|
_encode_int(int(obj and '1' or '0'), buff)
|
|
else:
|
|
raise Exception('non serializable object: %s' % obj)
|
|
|
|
|
|
def bencode(obj):
|
|
"""
|
|
bencode element, return bytearray
|
|
"""
|
|
buff = []
|
|
_encode(obj, buff)
|
|
ret = bytearray()
|
|
for ba in buff:
|
|
ret += ba
|
|
return bytes(ret)
|