2014-05-04 19:26:43 +02:00
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
2014-09-03 00:32:44 +02:00
2014-05-04 19:26:43 +02:00
2014-05-19 01:24:04 +02:00
import os
2014-05-22 16:20:40 +02:00
import sys
2014-09-01 12:38:14 +02:00
from PIL import Image
2014-09-03 00:32:44 +02:00
from io import StringIO, BytesIO
2014-05-04 19:26:43 +02:00
import re
import stdnum.isbn
2014-05-18 00:18:32 +02:00
import socket
2014-09-03 00:32:44 +02:00
import io
2014-05-19 01:24:04 +02:00
import gzip
2014-05-21 02:02:21 +02:00
import time
2015-11-26 01:26:10 +01:00
import hashlib
2014-05-21 02:02:21 +02:00
from datetime import datetime
2014-05-22 16:20:40 +02:00
import subprocess
2015-11-26 01:26:10 +01:00
import base64
2014-05-04 19:26:43 +02:00
import ox
2014-05-18 00:18:32 +02:00
import ed25519
2015-11-26 01:26:10 +01:00
from OpenSSL.crypto import (
load_privatekey, load_certificate,
dump_privatekey, dump_certificate,
X509, X509Extension
from Crypto.PublicKey import RSA
from Crypto.Util.asn1 import DerSequence
2014-05-04 19:26:43 +02:00
2014-05-16 10:06:11 +02:00
from meta.utils import normalize_isbn, find_isbns
2014-05-14 11:57:11 +02:00
2014-05-19 01:24:04 +02:00
import logging
2015-11-29 15:56:38 +01:00
logger = logging.getLogger(__name__)
2014-05-18 00:18:32 +02:00
2014-05-24 12:50:27 +02:00
def cleanup_id(key, value):
if key == 'isbn':
value = normalize_isbn(value)
if key in ('lccn', 'olid', 'oclc'):
value = ''.join([v for v in value if v!='-'])
return value
2014-05-04 19:26:43 +02:00
def valid_olid(id):
return id.startswith('OL') and id.endswith('M')
def get_positions(ids, pos):
>>> get_positions([1,2,3,4], [2,4])
{2: 1, 4: 3}
positions = {}
for i in pos:
positions[i] = ids.index(i)
return positions
def get_by_key(objects, key, value):
2014-09-03 00:32:44 +02:00
obj = [o for o in objects if o.get(key) == value]
2014-05-04 19:26:43 +02:00
return obj and obj[0] or None
def get_by_id(objects, id):
return get_by_key(objects, 'id', id)
def resize_image(data, width=None, size=None):
2014-09-03 00:32:44 +02:00
if isinstance(data, bytes):
data = BytesIO(data)
data = StringIO(data)
source = Image.open(data)
2015-11-19 16:13:13 +01:00
if source.mode not in ('1', 'CMYK', 'L', 'RGB', 'RGBA', 'RGBX', 'YCbCr'):
2014-05-19 23:15:37 +02:00
source = source.convert('RGB')
2014-05-04 19:26:43 +02:00
source_width = source.size[0]
source_height = source.size[1]
if size:
if source_width > source_height:
width = size
height = int(width / (float(source_width) / source_height))
height = height - height % 2
height = size
width = int(height * (float(source_width) / source_height))
width = width - width % 2
height = int(width / (float(source_width) / source_height))
height = height - height % 2
width = max(width, 1)
height = max(height, 1)
if width < source_width:
resize_method = Image.ANTIALIAS
resize_method = Image.BICUBIC
output = source.resize((width, height), resize_method)
2014-09-03 00:32:44 +02:00
o = BytesIO()
2014-05-04 19:26:43 +02:00
output.save(o, format='jpeg')
data = o.getvalue()
return data
def sort_title(title):
2014-09-03 00:32:44 +02:00
title = title.replace('Æ', 'Ae')
2014-05-04 19:26:43 +02:00
if isinstance(title, str):
2014-09-03 00:32:44 +02:00
title = str(title)
2014-05-04 19:26:43 +02:00
title = ox.sort_string(title)
2014-09-03 00:32:44 +02:00
title = re.sub('[\'!¿¡,\.;\-"\:\*\[\]]', '', title)
2014-05-04 19:26:43 +02:00
return title.strip()
def get_position_by_id(list, key):
for i in range(0, len(list)):
if list[i]['id'] == key:
return i
return -1
2015-12-25 19:40:49 +05:30
def get_language(lang):
return ox.iso.codeToLang(lang.split('-')[0]) or lang
2014-05-18 00:18:32 +02:00
def valid(key, value, sig):
validate that value was signed by key
2014-09-09 12:08:04 +02:00
if isinstance(sig, str):
sig = sig.encode()
if isinstance(value, str):
value = value.encode()
if isinstance(key, str):
key = key.encode()
vk = ed25519.VerifyingKey(key, encoding=ENCODING)
2014-05-18 00:18:32 +02:00
2014-09-09 12:08:04 +02:00
vk.verify(sig, value, encoding=ENCODING)
2014-05-18 00:18:32 +02:00
#except ed25519.BadSignatureError:
return False
return True
2015-11-26 01:26:10 +01:00
def get_user_id(private_key, cert_path):
if os.path.exists(private_key):
with open(private_key) as fd:
key = load_privatekey(FILETYPE_PEM, fd.read())
if key.bits() != 1024:
user_id = get_service_id(private_key)
if not os.path.exists(private_key):
if os.path.exists(cert_path):
folder = os.path.dirname(private_key)
if not os.path.exists(folder):
os.chmod(folder, 0o700)
key = PKey()
key.generate_key(TYPE_RSA, 1024)
with open(private_key, 'wb') as fd:
os.chmod(private_key, 0o600)
fd.write(dump_privatekey(FILETYPE_PEM, key))
os.chmod(private_key, 0o400)
user_id = get_service_id(private_key)
if not os.path.exists(cert_path):
ca = X509()
ca.get_subject().CN = user_id
ca.gmtime_adj_notAfter(24 * 60 * 60)
X509Extension(b"basicConstraints", True, b"CA:TRUE, pathlen:0"),
X509Extension(b"nsCertType", True, b"sslCA"),
X509Extension(b"extendedKeyUsage", True,
X509Extension(b"keyUsage", False, b"keyCertSign, cRLSign"),
X509Extension(b"subjectKeyIdentifier", False, b"hash", subject=ca),
ca.sign(key, "sha256")
with open(cert_path, 'wb') as fd:
fd.write(dump_certificate(FILETYPE_PEM, ca))
return user_id
def get_service_id(private_key_file=None, cert=None):
service_id is the first half of the sha1 of the rsa public key encoded in base32
if private_key_file:
with open(private_key_file, 'rb') as fd:
private_key = fd.read()
public_key = RSA.importKey(private_key).publickey().exportKey('DER')[22:]
# compute sha1 of public key and encode first half in base32
service_id = base64.b32encode(hashlib.sha1(public_key).digest()[:10]).lower().decode()
# compute public key from priate key and export in DER format
# ignoring the SPKI header(22 bytes)
key = load_privatekey(FILETYPE_PEM, private_key)
cert = X509()
public_key = dump_privatekey(FILETYPE_ASN1, cert.get_pubkey())[22:]
# compute sha1 of public key and encode first half in base32
service_id = base64.b32encode(hashlib.sha1(public_key).digest()[:10]).lower().decode()
elif cert:
# compute sha1 of public key and encode first half in base32
key = load_certificate(FILETYPE_ASN1, cert).get_pubkey()
pub_der = DerSequence()
pub_der.decode(dump_privatekey(FILETYPE_ASN1, key))
public_key = RSA.construct((pub_der._seq[1], pub_der._seq[2])).exportKey('DER')[22:]
service_id = base64.b32encode(hashlib.sha1(public_key).digest()[:10]).lower().decode()
return service_id
2014-05-18 00:18:32 +02:00
def get_public_ipv6():
2014-05-19 20:12:02 +02:00
host = ('2a01:4f8:120:3201::3', 25519)
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
ip = s.getsockname()[0]
ip = None
2014-05-18 00:18:32 +02:00
return ip
2014-05-22 16:20:40 +02:00
def get_interface():
interface = ''
2014-08-25 19:21:34 +02:00
if sys.platform == 'darwin' or sys.platform.startswith('freebsd'):
2014-05-22 16:20:40 +02:00
#cmd = ['/usr/sbin/netstat', '-rn']
cmd = ['/sbin/route', '-n', 'get', 'default']
2014-08-22 18:49:11 +02:00
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=True)
2014-05-22 16:20:40 +02:00
stdout, stderr = p.communicate()
2015-01-22 16:07:51 +05:30
stdout = stdout.decode('utf-8')
interface = [[p.strip() for p in s.split(':', 1)]
for s in stdout.strip().split('\n') if 'interface' in s]
2014-05-22 16:20:40 +02:00
if interface:
interface = '%%%s' % interface[0][1]
interface = ''
return interface
def get_local_ipv4():
2014-05-23 22:10:02 +02:00
ip = None
2014-08-25 19:21:34 +02:00
if sys.platform == 'darwin' or sys.platform.startswith('freebsd'):
2014-05-23 13:22:30 +02:00
cmd = ['/sbin/route', '-n', 'get', 'default']
2014-08-22 18:49:11 +02:00
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=True)
2014-05-23 13:22:30 +02:00
stdout, stderr = p.communicate()
2014-09-03 00:32:44 +02:00
stdout = stdout.decode('utf-8')
2014-05-23 13:22:30 +02:00
interface = [[p.strip() for p in s.split(':', 1)]
for s in stdout.strip().split('\n') if 'interface' in s]
if interface:
interface = interface[0][1]
cmd = ['ifconfig', interface]
2014-08-22 18:49:11 +02:00
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=True)
2014-05-22 16:40:18 +02:00
stdout, stderr = p.communicate()
2014-09-03 00:32:44 +02:00
stdout = stdout.decode('utf-8')
2014-05-23 13:22:30 +02:00
ips = [l for l in stdout.split('\n') if 'inet ' in l]
if ips:
ip = ips[0].strip().split(' ')[1]
cmd = ['ip', 'route', 'show']
2014-08-22 18:49:11 +02:00
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=True)
2014-05-23 13:22:30 +02:00
stdout, stderr = p.communicate()
2014-09-03 00:32:44 +02:00
stdout = stdout.decode('utf-8')
2014-05-23 13:22:30 +02:00
local = [l for l in stdout.split('\n') if 'default' in l]
if local:
dev = local[0].split(' ')[4]
local_ip = [l for l in stdout.split('\n')
2014-05-25 14:40:57 +02:00
if dev in l and not 'default' in l and 'src' in l]
2014-05-23 13:22:30 +02:00
ip = [p for p in local_ip[0].split(' ')[1:] if '.' in p][0]
2014-05-22 16:20:40 +02:00
return ip
2014-05-19 01:24:04 +02:00
def update_dict(root, data):
for key in data:
2016-01-05 20:41:59 +05:30
keys = [part.replace('\0', '.') for part in key.replace('\\.', '\0').split('.')]
2014-05-19 01:24:04 +02:00
value = data[key]
p = root
while len(keys)>1:
key = keys.pop(0)
if isinstance(p, list):
p = p[get_position_by_id(p, key)]
if key not in p:
p[key] = {}
p = p[key]
if value == None and keys[0] in p:
del p[keys[0]]
p[keys[0]] = value
2016-01-05 20:41:59 +05:30
if hasattr(root, '_save'):
2014-05-19 01:24:04 +02:00
def remove_empty_folders(prefix):
empty = []
for root, folders, files in os.walk(prefix):
if not folders and not files:
for folder in empty:
def remove_empty_tree(leaf):
while leaf:
if not os.path.exists(leaf):
leaf = os.path.dirname(leaf)
elif os.path.isdir(leaf) and not os.listdir(leaf):
logger.debug('rmdir %s', leaf)
2014-05-21 02:02:21 +02:00
2014-09-03 01:09:42 +02:00
utc_0 = int(time.mktime(datetime(1970, 1, 1).timetuple()))
2014-05-21 02:02:21 +02:00
def datetime2ts(dt):
return int(time.mktime(dt.utctimetuple())) - utc_0
def ts2datetime(ts):
return datetime.utcfromtimestamp(float(ts))
2015-03-31 20:24:14 +02:00
def run(*cmd):
p = subprocess.Popen(cmd, close_fds=True)
return p.returncode
def get(*cmd):
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
stdout, error = p.communicate()
return stdout.decode()
def makefolder(path):
dirname = os.path.dirname(path)
if not os.path.exists(dirname):
2015-11-30 17:50:03 +01:00
2015-11-30 18:07:07 +01:00
def open_folder(folder=None, path=None):
cmd = []
if path and not folder:
folder = os.path.dirname(path)
if folder and not path:
path = folder
if sys.platform == 'darwin':
if folder and not path:
path = folder
cmd += ['open', '-R', path]
elif sys.platform.startswith('linux'):
cmd += ['xdg-open', folder]
logger.debug('unsupported platform %s', sys.platform)
subprocess.Popen(cmd, close_fds=True)
2015-12-02 22:05:23 +01:00
def can_connect_dns(host="", port=53):
host: (google-public-dns-a.google.com)
port: 53/tcp
2015-12-24 17:58:11 +05:30
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
2015-12-02 22:05:23 +01:00
return True
return False