# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
'''
Miscellaneous helpers: list/dict lookups, image resizing, RSA/X509 based
service ids, local network discovery, filesystem cleanup, subprocess
wrappers and debug-log upload.
'''
from datetime import datetime
from io import StringIO, BytesIO
from PIL import Image
import base64
import hashlib
import json
import os
import re
import socket
import stdnum.isbn
import subprocess
import sys
import time

import ox
from OpenSSL.crypto import (
    load_privatekey, load_certificate,
    dump_privatekey, dump_certificate,
    FILETYPE_ASN1, FILETYPE_PEM, PKey, TYPE_RSA,
    X509, X509Extension
)
from Crypto.PublicKey import RSA
from Crypto.Util.asn1 import DerSequence

from meta.utils import normalize_isbn, find_isbns, get_language, to_isbn13
from win32utils import get_short_path_name

import logging
logging.getLogger('PIL').setLevel(logging.ERROR)
logger = logging.getLogger(__name__)

ENCODING='base64'


def valid_olid(id):
    '''
    Return True if id starts with 'OL' and ends with 'M'.
    '''
    return id.startswith('OL') and id.endswith('M')


def get_positions(ids, pos):
    '''
    Map every value of pos that occurs in ids to its index in ids;
    values that are not in ids are left out.

    >>> get_positions([1,2,3,4], [2,4])
    {2: 1, 4: 3}
    '''
    positions = {}
    for i in pos:
        try:
            positions[i] = ids.index(i)
        except ValueError:
            # i is not in ids
            pass
    return positions


def get_by_key(objects, key, value):
    '''
    Return the first object in objects with o.get(key) == value, or None.
    '''
    # next() instead of `obj and obj[0] or None`: the latter returned None
    # for a match that happens to be falsy (e.g. an empty dict).
    return next((o for o in objects if o.get(key) == value), None)


def get_by_id(objects, id):
    '''
    Return the first object in objects whose 'id' equals id, or None.
    '''
    return get_by_key(objects, 'id', id)


def is_svg(data):
    '''
    Return a truthy value if data looks like an SVG document.
    '''
    # NOTE(review): the reviewed copy of this file was truncated right after
    # "b'" here (the text up to the size handling in resize_image was lost,
    # apparently stripped as an HTML tag). b'<svg' is a reconstruction -
    # confirm against the real file.
    return data and b'<svg' in data


def resize_image(data, width=None, size=None):
    '''
    Scale an image and return it as JPEG encoded bytes.

    If size is given the longer side is scaled to size, otherwise the
    image is scaled to width; the aspect ratio is kept and the computed
    dimension is rounded down to an even number (minimum 1).

    NOTE(review): the signature and the code up to the `if size:` branch
    were missing in the reviewed copy of this file and are reconstructed
    from the names the remaining code uses (data, width, size, source,
    source_width, source_height) - confirm against the real file.
    '''
    if isinstance(data, bytes):
        data = BytesIO(data)
    else:
        data = StringIO(data)
    source = Image.open(data)
    # presumably needed because the result is saved as JPEG, which cannot
    # store e.g. palette or alpha modes - TODO confirm the original mode list
    if source.mode not in ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr'):
        source = source.convert('RGB')
    source_width = source.size[0]
    source_height = source.size[1]
    # --- end of reconstructed section ---
    if size:
        if source_width > source_height:
            width = size
            height = int(width / (float(source_width) / source_height))
            height = height - height % 2
        else:
            height = size
            width = int(height * (float(source_width) / source_height))
            width = width - width % 2
    else:
        height = int(width / (float(source_width) / source_height))
        height = height - height % 2

    width = max(width, 1)
    height = max(height, 1)

    if width < source_width:
        # Image.ANTIALIAS was an alias of LANCZOS and is gone in Pillow 10
        resize_method = Image.LANCZOS
    else:
        resize_method = Image.BICUBIC
    output = source.resize((width, height), resize_method)
    with BytesIO() as o:
        output.save(o, format='jpeg')
        return o.getvalue()


def sort_title(title):
    '''
    Return title in a form suitable for sorting: 'Æ' replaced by 'Ae',
    passed through ox.sort_string, punctuation removed and stripped.
    '''
    title = title.replace('Æ', 'Ae')
    title = ox.sort_string(title)
    # raw string: same character class as before, without invalid
    # escape sequences in the string literal
    title = re.sub(r'[\'!¿¡,.;\-":*\[\]]', '', title)
    return title.strip()


def get_position_by_id(list, key):
    '''
    Return the index of the first item in list with item['id'] == key,
    or -1 if there is none.
    '''
    for i, item in enumerate(list):
        if item['id'] == key:
            return i
    return -1


def get_user_id(private_key, cert_path):
    '''
    Return the service id for the RSA key stored at private_key.

    An existing key that is not 1024 bits is deleted; a missing key is
    generated (together with its folder) and any existing certificate at
    cert_path is removed. If cert_path does not exist afterwards, a self
    signed certificate with the service id as CN is written to it.
    '''
    # NOTE(review): 1024 bit RSA is weak by current standards. The size is
    # left unchanged since the id is derived from the key and keys of any
    # other size are deleted below.
    if os.path.exists(private_key):
        with open(private_key) as fd:
            key = load_privatekey(FILETYPE_PEM, fd.read())
        if key.bits() != 1024:
            os.unlink(private_key)
        else:
            user_id = get_service_id(private_key)
    if not os.path.exists(private_key):
        # a certificate for a previous key is of no use anymore
        if os.path.exists(cert_path):
            os.unlink(cert_path)
        folder = os.path.dirname(private_key)
        if not os.path.exists(folder):
            os.makedirs(folder)
            os.chmod(folder, 0o700)
        key = PKey()
        key.generate_key(TYPE_RSA, 1024)
        with open(private_key, 'wb') as fd:
            # restrict permissions before the key material is written
            os.chmod(private_key, 0o600)
            fd.write(dump_privatekey(FILETYPE_PEM, key))
            os.chmod(private_key, 0o400)
        user_id = get_service_id(private_key)
    if not os.path.exists(cert_path):
        ca = X509()
        ca.set_version(2)
        ca.set_serial_number(1)
        ca.get_subject().CN = user_id
        ca.gmtime_adj_notBefore(0)
        # expiry is set 24 * 60 * 60 seconds after now
        ca.gmtime_adj_notAfter(24 * 60 * 60)
        ca.set_issuer(ca.get_subject())
        ca.set_pubkey(key)
        ca.add_extensions([
            X509Extension(b"basicConstraints", True, b"CA:TRUE, pathlen:0"),
            X509Extension(b"nsCertType", True, b"sslCA"),
            X509Extension(b"extendedKeyUsage", True,
                b"serverAuth,clientAuth,emailProtection,timeStamping,msCodeInd,msCodeCom,msCTLSign,msSGC,msEFS,nsSGC"),
            X509Extension(b"keyUsage", False, b"keyCertSign, cRLSign"),
            X509Extension(b"subjectKeyIdentifier", False, b"hash", subject=ca),
        ])
        ca.sign(key, "sha256")
        with open(cert_path, 'wb') as fd:
            fd.write(dump_certificate(FILETYPE_PEM, ca))
    return user_id


def get_service_id(private_key_file=None, cert=None):
    '''
    service_id is the first half of the sha1 of the rsa public key encoded in base32

    private_key_file is the path of a private key file, cert is an ASN1
    encoded certificate; private_key_file wins if both are given.
    Raises ValueError if neither is given.
    '''
    if private_key_file:
        with open(private_key_file, 'rb') as fd:
            private_key = fd.read()
        # export public key in DER format, ignoring the SPKI header (22 bytes)
        public_key = RSA.importKey(private_key).publickey().exportKey('DER')[22:]
    elif cert:
        key = load_certificate(FILETYPE_ASN1, cert).get_pubkey()
        pub_der = DerSequence()
        pub_der.decode(dump_privatekey(FILETYPE_ASN1, key))
        public_key = RSA.construct((pub_der._seq[1], pub_der._seq[2])).exportKey('DER')[22:]
    else:
        # used to fail with UnboundLocalError on the return statement
        raise ValueError('either private_key_file or cert is required')
    # compute sha1 of public key and encode first half in base32
    return base64.b32encode(hashlib.sha1(public_key).digest()[:10]).lower().decode()


def get_public_ipv6():
    '''
    Return the local address used to reach a fixed IPv6 host,
    or None if that fails.
    '''
    host = ('2a01:4f8:120:3201::3', 25519)
    try:
        # connect() on a datagram socket only selects the local address
        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) as s:
            s.settimeout(1)
            s.connect(host)
            return s.getsockname()[0]
    except Exception:
        # best effort, no usable IPv6 route
        return None


def _command_output(cmd):
    '''
    Run cmd and return its stdout decoded as utf-8,
    or '' if the command can not be started.
    '''
    try:
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=True)
    except OSError:
        logger.debug('failed to run %s', cmd, exc_info=True)
        return ''
    stdout, _ = p.communicate()
    return stdout.decode('utf-8')


def _get_default_interface():
    '''
    Return the interface reported by `/sbin/route -n get default`, or ''.
    '''
    stdout = _command_output(['/sbin/route', '-n', 'get', 'default'])
    for line in stdout.strip().split('\n'):
        if 'interface' in line:
            parts = [part.strip() for part in line.split(':', 1)]
            if len(parts) == 2:
                return parts[1]
    return ''


def get_interface():
    '''
    Return '%' followed by the name of the default route interface on
    darwin and freebsd, '' on other platforms or if none is found.
    '''
    if sys.platform == 'darwin' or sys.platform.startswith('freebsd'):
        interface = _get_default_interface()
        if interface:
            return '%%%s' % interface
    return ''


def get_local_ipv4():
    '''
    Return the local IPv4 address of the default route interface.

    Uses route/ifconfig on darwin and freebsd and `ip` on linux; if that
    gives no result the local address of a UDP socket connected to
    8.8.8.8 is used. Returns None if everything fails.
    '''
    ip = None
    if sys.platform == 'darwin' or sys.platform.startswith('freebsd'):
        interface = _get_default_interface()
        if interface:
            stdout = _command_output(['ifconfig', interface])
            ips = [l for l in stdout.split('\n') if 'inet ' in l]
            if ips:
                ip = ips[0].strip().split(' ')[1]
    elif sys.platform.startswith('linux'):
        stdout = _command_output(['ip', 'route', 'show'])
        lines = stdout.split('\n')
        local = [l for l in lines if 'default' in l]
        if local:
            # take the name following "dev" rather than a fixed column,
            # not every default route has a "via <gateway>" part
            fields = local[0].split()
            dev = None
            if 'dev' in fields[:-1]:
                dev = fields[fields.index('dev') + 1]
            if dev:
                local_ip = [l for l in lines
                            if dev in l and 'default' not in l and 'src' in l]
                if local_ip:
                    local_ip = [p for p in local_ip[0].split(' ')[1:] if '.' in p]
                    if local_ip:
                        ip = local_ip[0]
                if not ip:
                    stdout = _command_output(['ip', 'addr', 'show'])
                    # NOTE(review): after splitting on a single space no part
                    # can contain "inet a.b.c.d", so this can not match as
                    # written; the separator was possibly altered when this
                    # file was reformatted - confirm against the real file.
                    parts = stdout.split(' ')
                    local_ip = [p for p in parts if dev in p]
                    if local_ip:
                        local_ip = re.compile(r'inet (\d+\.\d+\.\d+\.\d+)').findall(local_ip[0])
                        if local_ip:
                            ip = local_ip[0]
    if not ip:
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(("8.8.8.8", 53))
                return s.getsockname()[0]
        except OSError:
            # no route, ip stays None
            pass
    return ip


def update_dict(root, data):
    '''
    Apply data to the nested structure root.

    Keys of data are dot separated paths ('\\.' stands for a literal dot).
    Missing dicts along the path are created, lists are entered via the
    item whose 'id' equals the path part. A value of None removes an
    existing key. Calls root._save() at the end if root has one.
    '''
    for path in data:
        keys = [part.replace('\0', '.') for part in path.replace('\\.', '\0').split('.')]
        value = data[path]
        p = root
        while len(keys) > 1:
            key = keys.pop(0)
            if isinstance(p, list):
                # NOTE(review): get_position_by_id returns -1 for an unknown
                # id, which selects the last item of the list here
                p = p[get_position_by_id(p, key)]
            else:
                if key not in p:
                    p[key] = {}
                p = p[key]
        if value is None and keys[0] in p:
            del p[keys[0]]
        else:
            p[keys[0]] = value
    if hasattr(root, '_save'):
        root._save()


def remove_empty_folders(prefix, keep_root=False):
    '''
    Remove all empty folders below prefix; folders that only contain a
    .DS_Store file count as empty. prefix itself is kept if keep_root
    is set.
    '''
    empty = []
    for root, folders, files in os.walk(prefix):
        if len(files) == 1 and files[0] == '.DS_Store':
            os.unlink(os.path.join(root, files[0]))
            files = []
        if not folders and not files:
            if root != prefix or not keep_root:
                empty.append(root)
    for folder in empty:
        # with keep_root, stop at prefix; otherwise prefix was removed
        # anyway once all of its subfolders were gone
        remove_empty_tree(folder, prefix if keep_root else None)


def remove_empty_tree(leaf, root=None):
    '''
    Remove leaf if it is an empty folder, then its parent if that is
    empty now, and so on, until a non-empty folder or a file is reached.
    If root is given it is never removed and nothing above it is touched.
    '''
    stop = os.path.normpath(root) if root else None
    while leaf:
        if stop is not None and os.path.normpath(leaf) == stop:
            break
        if not os.path.exists(leaf):
            parent = os.path.dirname(leaf)
            if parent == leaf:
                # can not go up any further (e.g. non-existing drive)
                break
            leaf = parent
        elif os.path.isdir(leaf) and not os.listdir(leaf):
            logger.debug('rmdir %s', leaf)
            os.rmdir(leaf)
        else:
            break


# offset subtracted in datetime2ts: mktime() of the epoch in local time
try:
    utc_0 = int(time.mktime(datetime(1970, 1, 1).timetuple()))
except (OverflowError, OSError, ValueError):
    # mktime can not represent 1970-01-01 local time on this platform
    utc_0 = int(time.mktime(time.gmtime()) - time.mktime(time.localtime()))


def datetime2ts(dt):
    '''
    Return dt as integer timestamp (mktime of its UTC time tuple minus utc_0).
    '''
    return int(time.mktime(dt.utctimetuple())) - utc_0


def ts2datetime(ts):
    '''
    Return timestamp ts as naive datetime in UTC.
    '''
    return datetime.utcfromtimestamp(float(ts))


def run(*cmd):
    '''
    Run cmd, wait for it to finish and return its exit code.
    '''
    p = subprocess.Popen(cmd, close_fds=True)
    p.wait()
    return p.returncode


def get(*cmd):
    '''
    Run cmd and return its decoded stdout; stderr is captured and dropped.
    '''
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, error = p.communicate()
    return stdout.decode()


def makefolder(path):
    '''
    Make sure the folder that contains path exists.
    '''
    dirname = os.path.dirname(path)
    # dirname is '' for a bare filename, nothing to create then
    if dirname and not os.path.exists(dirname):
        # exist_ok: somebody else might create the folder in between
        os.makedirs(dirname, exist_ok=True)


def open_file(path=None):
    '''
    Open path with the default application of the platform.
    '''
    cmd = []
    if sys.platform == 'darwin':
        cmd += ['open', path]
    elif sys.platform.startswith('linux'):
        cmd += ['xdg-open', path]
    elif sys.platform == 'win32':
        path = '\\'.join(path.split('/'))
        os.startfile(path)
        cmd = []
    else:
        logger.debug('unsupported platform %s', sys.platform)
    if cmd:
        subprocess.Popen(cmd, close_fds=True)


def open_folder(folder=None, path=None):
    '''
    Show folder in the file manager of the platform. If only path is
    given its parent folder is used; on darwin and win32 path gets
    revealed/selected inside the folder.
    '''
    cmd = []
    if path and not folder:
        folder = os.path.dirname(path)
    if folder and not path:
        path = folder
    if not path:
        logger.debug('open_folder needs folder or path')
        return
    if sys.platform == 'darwin':
        cmd += ['open', '-R', path]
    elif sys.platform.startswith('linux'):
        cmd += ['xdg-open', folder]
    elif sys.platform == 'win32':
        path = '\\'.join(path.split('/'))
        cmd = 'explorer.exe /select,"%s"' % path
    else:
        logger.debug('unsupported platform %s', sys.platform)
    if cmd:
        subprocess.Popen(cmd, close_fds=True)


def can_connect_dns(host="8.8.8.8", port=53):
    """
    Return True if a TCP connection to host:port can be opened via the
    SOCKS5 proxy on localhost (state.tor.socks_port, or 9150 if state.tor
    is not set), False otherwise.

    host: 8.8.8.8 (google-public-dns-a.google.com)
    port: 53/tcp
    """
    import socks
    import state
    sock = None
    try:
        sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM, 6)
        sock.settimeout(2)
        socks_port = state.tor.socks_port if state.tor else 9150
        sock.set_proxy(socks.SOCKS5, "localhost", socks_port, True)
        sock.connect((host, port))
        return True
    except Exception:
        #logger.debug('failed to connect', exc_info=True)
        return False
    finally:
        if sock is not None:
            sock.close()


def _to_json(python_object):
    '''
    Fallback serializer for json: datetime objects become
    'YYYY-MM-DDTHH:MM:SSZ' strings, anything else raises TypeError.
    '''
    if isinstance(python_object, datetime):
        if python_object.year < 1900:
            # strftime is avoided for years before 1900; format by hand
            # (the format was missing the ':' between minutes and seconds)
            tt = python_object.timetuple()
            return '%d-%02d-%02dT%02d:%02d:%02dZ' % tuple(tt[:6])
        return python_object.strftime('%Y-%m-%dT%H:%M:%SZ')
    raise TypeError(u'%s %s is not JSON serializable' % (repr(python_object), type(python_object)))


def get_ratio(data):
    '''
    Return width / height of the image in data, 1 if data can not be read.
    '''
    try:
        img = Image.open(BytesIO(data))
        return img.size[0]/img.size[1]
    except Exception:
        return 1


def get_meta_hash(data):
    '''
    Return the sha1 hex digest of data serialized as JSON with sorted
    keys, ignoring 'sharemetadata' and all keys with empty values.
    data itself is not modified.
    '''
    data = {
        key: value for key, value in data.items()
        if key != 'sharemetadata' and value
    }
    return hashlib.sha1(json.dumps(data,
        ensure_ascii=False, sort_keys=True).encode()).hexdigest()


def update_static():
    '''
    Write json/js.json (sorted list of files in the static js folder,
    without hidden and oml.* files) and js/oml.min.js (those files
    minified and joined).
    '''
    import settings
    path = os.path.join(settings.static_path, 'js')
    files = sorted([
        file for file in os.listdir(path)
        if not file.startswith('.')
        and not file.startswith('oml.')
    ])
    ox.file.write_json(os.path.join(settings.static_path, 'json', 'js.json'), files, indent=4)
    ox.file.write_file(
        os.path.join(path, 'oml.min.js'),
        '\n'.join([
            ox.js.minify(ox.file.read_file(os.path.join(path, file)).decode('utf-8'))
            for file in files
        ])
    )


def check_pid(pid):
    '''
    Return True if signal 0 can be sent to process pid, False otherwise.
    '''
    try:
        os.kill(pid, 0)
    except Exception:
        return False
    return True


def check_pidfile(pid):
    '''
    Return True if the file pid contains the id of a process that
    check_pid accepts, False otherwise (also if it can not be read).
    '''
    try:
        with open(pid) as fd:
            pid = int(fd.read())
    except (OSError, ValueError):
        return False
    return check_pid(pid)


def ctl(*args):
    '''
    Start the ctl command with args in a new session, without waiting
    for it. On win32 pythonw.exe from platform_win32 runs oml instead,
    with its window hidden.
    '''
    import settings
    if sys.platform == 'win32':
        platform_win32 = os.path.normpath(os.path.join(settings.base_dir, '..', 'platform_win32'))
        python = os.path.join(platform_win32, 'pythonw.exe')
        cmd = [python, 'oml'] + list(args)
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        startupinfo.wShowWindow = subprocess.SW_HIDE
        subprocess.Popen(cmd, cwd=settings.base_dir, start_new_session=True, startupinfo=startupinfo)
    else:
        subprocess.Popen([os.path.join(settings.base_dir, 'ctl')] + list(args),
            close_fds=True, start_new_session=True)


def user_sort_key(u):
    '''
    Return the sort key of user u, built from its 'index' and 'name'.
    '''
    return ox.sort_string(str(u.get('index', '')) + 'Z' + (u.get('name') or ''))


def get_peer(peerid):
    '''
    Return the library.Peer for peerid from state.peers,
    creating it on first use.
    '''
    import state
    import library
    if peerid not in state.peers:
        state.peers[peerid] = library.Peer(peerid)
    return state.peers[peerid]


def send_debug():
    '''
    Upload new lines of debug.log, gzip compressed, to the debug service.

    Lines up to the timestamp stored in settings.server['last_debug'] are
    skipped; after a successful upload the last timestamp seen in the log
    is stored there. Failures are logged, never raised.
    '''
    import settings
    import tor_request
    import gzip
    url = 'http://rnogx24drkbnrxa3.onion/debug'
    headers = {
        'User-Agent': settings.USER_AGENT,
    }
    debug_log = os.path.join(settings.data_path, 'debug.log')
    last_debug = settings.server.get('last_debug')
    # old: still reading lines that have been sent before
    old = last_debug is not None
    # stays None if no line carries a timestamp; it used to be unbound in
    # that case, so a successful upload ended up logged as a failure
    timestamp = None
    try:
        if os.path.exists(debug_log):
            data = []
            with open(debug_log, 'r') as fd:
                for line in fd:
                    t = line.split(':DEBUG')[0]
                    # lines without a date prefix belong to the previous entry
                    if t.count('-') == 2:
                        timestamp = t
                        if old and timestamp > last_debug:
                            old = False
                    if not old:
                        data.append(line)
            data = ''.join(data)
            if data:
                result = gzip.compress(data.encode())
                opener = tor_request.get_opener()
                opener.addheaders = list(headers.items())
                r = opener.open(url, result)
                if r.status != 200:
                    logger.debug('failed to send debug information')
                elif timestamp is not None:
                    settings.server['last_debug'] = timestamp
    except Exception:
        logger.debug('failed to send debug information', exc_info=True)