openmedialibrary/oml/meta/openlibrary.py

203 lines
6.4 KiB
Python

# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
from datetime import datetime
from urllib.parse import urlencode
import json
from ox.cache import read_url
from .dewey import get_classification
from .marc_countries import COUNTRIES
from .utils import normalize_isbn
import logging
logger = logging.getLogger(__name__)
KEYS = {
'authors': 'author',
'covers': 'cover',
'dewey_decimal_class': 'classification',
'isbn_10': 'isbn',
'isbn_13': 'isbn',
'lccn': 'lccn',
'number_of_pages': 'pages',
'languages': 'language',
'oclc_numbers': 'oclc',
'publish_country': 'country',
'publish_date': 'date',
'publishers': 'publisher',
'publish_places': 'place',
'series': 'series',
'title': 'title',
}
def find(query):
query = query.strip()
logger.debug('find %s', query)
r = api.search(query)
results = []
ids = [b for b in r.get('result', []) if b.startswith('/books')]
books = api.get_many(ids).get('result', [])
for olid, value in books.items():
olid = olid.split('/')[-1]
book = format(value)
book['olid'] = [olid]
book['primaryid'] = ['olid', olid]
results.append(book)
return results
def get_ids(key, value):
ids = []
if key == 'olid':
data = lookup(value)
for id in ('isbn', 'lccn', 'oclc'):
if id in data:
for v in data[id]:
if (id, v) not in ids:
ids.append((id, v))
elif key in ('isbn', 'oclc', 'lccn'):
logger.debug('get_ids %s %s', key, value)
if key == 'isbn':
key = 'isbn_%s'%len(value)
r = api.things({'type': '/type/edition', key: value})
for b in r.get('result', []):
if b.startswith('/books'):
olid = b.split('/')[-1]
for kv in [('olid', olid)] + get_ids('olid', olid):
if kv not in ids:
ids.append(kv)
if ids:
logger.debug('get_ids %s %s => %s', key, value, ids)
return ids
def lookup(id, return_all=False):
logger.debug('lookup %s', id)
info = api.get('/books/' + id).get('result', {})
#url = 'https://openlibrary.org/books/%s.json' % id
#info = json.loads(read_url(url).decode('utf-8'))
data = format(info, return_all)
if 'olid' not in data:
data['olid'] = []
if id not in data['olid']:
data['olid'] = [id]
logger.debug('lookup %s => %s', id, list(data.keys()))
return data
def get_type(obj):
type_ = obj.get('type')
if isinstance(type_, dict):
type_ = type_['key']
return type_
def parse_date(s):
#"January 1, 1998"
for pattern, fmt in (('%B %d, %Y', '%Y-%m-%d'), ('%B %Y', '%Y-%m')):
try:
d = datetime.strptime(s, pattern)
s = d.strftime(fmt)
return s
except:
pass
return s
def format(info, return_all=False):
data = {}
if 'works' in info:
work = api.get(info['works'][0]['key'])['result']
else:
work = None
for key in KEYS:
if key in info:
value = info[key]
if key == 'authors':
if work:
value = resolve_names([r['author']
for r in work.get('authors', []) if get_type(r) == '/type/author_role'])
else:
value = resolve_names(value)
elif key == 'publish_country':
value = value.strip()
value = COUNTRIES.get(value, value)
elif key == 'covers':
value = 'https://covers.openlibrary.org/b/id/%s.jpg' % value[0]
elif key == 'languages':
value = resolve_names(value)
elif key in ('isbn_10', 'isbn_13'):
if not isinstance(value, list):
value = [value]
value = list(map(normalize_isbn, value))
if KEYS[key] in data:
value = data[KEYS[key]] + value
elif isinstance(value, list) and key not in ('publish_places', 'lccn', 'oclc_numbers'):
value = value[0]
if key == 'publish_date':
value = parse_date(value)
data[KEYS[key]] = value
if 'subtitle' in info:
data['title'] += ' ' + info['subtitle']
if 'classification' in data:
value = data['classification']
if isinstance(value, list):
value = value[0]
data['classification'] = get_classification(value.split('/')[0])
return data
def resolve_names(objects, key='name'):
r = []
data = api.get_many([k['key'] for k in objects]).get('result', {})
for k, value in data.items():
if 'location' in value and value.get('type', {}).get('key') == '/type/redirect':
value = api.get(value['location']).get('result', {})
r.append(value[key])
return r
class API(object):
base = 'https://openlibrary.org/api'
def _request(self, action, data, timeout=None):
for key in data:
if not isinstance(data[key], str):
data[key] = json.dumps(data[key])
url = self.base + '/' + action + '?' + urlencode(data)
if timeout is None:
r = read_url(url).decode('utf-8')
if '504 Gateway Time-out' in r:
r = read_url(url, timeout=-1).decode('utf-8')
result = json.loads(r)
else:
r = read_url(url, timeout).decode('utf-8')
if '504 Gateway Time-out' in r:
r = read_url(url, timeout=-1).decode('utf-8')
result = json.loads(r)
if 'status' in result and result['status'] == 'error' or 'error' in result:
logger.info('FAILED %s %s', action, data)
logger.info('URL %s', url)
return result
def get(self, key):
data = self._request('get', {'key': key})
return data
def get_many(self, keys):
data = self._request('get_many', {'keys': keys})
return data
def search(self, query):
if isinstance(query, str):
query = {
'query': query
}
data = self._request('search', {'q': query})
if 'status' in data and data['status'] == 'error':
logger.info('FAILED %s', query)
return data
def things(self, query):
data = self._request('things', {'query': query})
return data
api = API()