diff --git a/oml/item/models.py b/oml/item/models.py index 1d5c34d..5a5d054 100644 --- a/oml/item/models.py +++ b/oml/item/models.py @@ -3,7 +3,6 @@ from datetime import datetime import base64 import hashlib -import json import os import re import shutil @@ -98,6 +97,13 @@ class Item(db.Model): def find(cls, data): return Parser(cls, user_items, Find, Sort).find(data) + @classmethod + def remove_many(cls, ids): + Find.query.filter(Find.item_id.in_(ids)).delete(synchronize_session=False) + Sort.query.filter(Sort.item_id.in_(ids)).delete(synchronize_session=False) + cls.query.filter(cls.id.in_(ids)).delete(synchronize_session=False) + state.db.session.expire_all() + def add_user(self, user): if not user in self.users: self.users.append(user) @@ -336,7 +342,7 @@ class Item(db.Model): f.move() def get_hash(self): - return utils.get_meta_hash(self.meta.copy()) + return utils.get_meta_hash(self.meta) def get_sorttitle(self): title = self.meta.get('sorttitle') @@ -346,7 +352,6 @@ class Item(db.Model): return title def sync_metadata(self): - from user.models import Metadata if self.meta.get('sharemetadata'): return peers = [u for u in self.users if u.id != settings.USER_ID] @@ -355,34 +360,44 @@ class Item(db.Model): first_peer = None # get first peer with sharemetadata set for u in peers: - m = Metadata.get(u.id, self.id) + peer = utils.get_peer(u.id) + if self.id in peer.library: + m = peer.library[self.id].get('meta') + else: + m = None if m: - if m.data.get('sharemetadata'): - sync_from = m + if m.get('sharemetadata'): + sync_from = u.id break if not first_peer: - first_peer = m + first_peer = u.id # of fall back to first peer that has this item # in case its not available locally if not sync_from and self.info.get('mediastate') != 'available' and first_peer: #logger.debug('syncing from first peer that has item %s', first_peer) sync_from = first_peer if sync_from: - if self.get_hash() != sync_from.data_hash: - logger.debug('update %s with metadata from %s', self, sync_from.user_id) + peer = utils.get_peer(sync_from) + data_hash = peer.get_metahash(self.id) + item = peer.library[self.id] + sync_meta = item['meta'] + sync_modified = item.get('modified') + if self.get_hash() != data_hash: + logger.debug('update %s with metadata from %s', self, sync_from) record = {} - for key in sync_from.data: - if key != 'sharemetadata' and self.meta.get(key) != sync_from.data[key]: - record[key] = self.meta[key] = sync_from.data[key] - for key in set(self.meta)-set(sync_from.data): + for key in sync_meta: + if key != 'sharemetadata' and self.meta.get(key) != sync_meta[key]: + record[key] = self.meta[key] = sync_meta[key] + for key in set(self.meta)-set(sync_meta): record[key] = self.meta[key] = [] if key in self.array_keys else '' - self.update(sync_from.modified) + self.update(sync_modified) self.save() user = state.user() if record and user in self.users: Changelog.record(user, 'edititem', self.id, record, _ts=self.modified) if 'cover' in record: - self.update_cover() + if state.tasks: + state.tasks.queue('getcover', self.id) def extract_preview(self): path = self.get_path() @@ -450,7 +465,7 @@ class Item(db.Model): def update_icons(self): if state.online: self.update_cover() - else: + elif state.tasks: state.tasks.queue('getcover', self.id) self.update_preview() diff --git a/oml/library.py b/oml/library.py new file mode 100644 index 0000000..3b6832f --- /dev/null +++ b/oml/library.py @@ -0,0 +1,343 @@ +# -*- coding: utf-8 -*- +# vi:si:et:sw=4:sts=4:ts=4 +import json +import os +import time + +import ox +from sqlitedict import SqliteDict + +import db +import settings +import state +import utils +from websocket import trigger_event + +import logging +logger = logging.getLogger(__name__) + +COMMIT_TIMEOUT = 20 + +def maybe_commit(t0): + if time.time() - t0 > COMMIT_TIMEOUT: + state.db.session.commit() + t0 = time.time() + return t0 + +class Peer(object): + def __init__(self, id): + base = os.path.join(settings.data_path, 'peers') + ox.makedirs(base) + self._dbpath = os.path.join(base, '%s.db' % id) + self._infopath = os.path.join(base, '%s.json' % id) + + self.id = id + self.library = SqliteDict(self._dbpath, tablename='library', autocommit=False) + + if os.path.exists(self._infopath): + with open(self._infopath) as f: + self.info = json.load(f) + else: + self.info = {} + if not 'peers' in self.info: + self.info['peers'] = {} + if not 'lists' in self.info: + self.info['lists'] = {} + + def apply_changes(self, changes): + r = True + for change in changes: + if state.shutdown: + r = False + break + if not self.apply_change(change): + logger.debug('FAIL %s', change) + r = False + break + self.library.commit() + self.sync_info() + self.sync_db() + trigger_event('change', {}) + return r + + def apply_change(self, change): + revision, timestamp, data = change + action = data[0] + args = data[1:] + #logger.debug('apply change: %s(%s)', action, args) + if action == 'additem': + itemid, info = args + item = self.library.get(itemid, {}) + item['info'] = info + item['modified'] = utils.ts2datetime(timestamp) + self.library[itemid] = item + # trigger additem + elif action == 'edititem': + itemid, meta = args + item = self.library.get(itemid, {}) + if not 'meta' in item: + item['meta'] = meta + else: + item['meta'].update(meta) + item['meta_hash'] = utils.get_meta_hash(item['meta']) + item['modified'] = utils.ts2datetime(timestamp) + self.library[itemid] = item + if state.tasks: + state.tasks.queue('syncmetadata', [itemid]) + state.tasks.queue('getpreview', itemid) + elif action == 'removeitem': + itemid = args[0] + if itemid in self.library: + del self.library[itemid] + elif action == 'addlist': + name = args[0] + if len(args) > 1: + query = args[1] + if not name in self.info['lists']: + self.info['lists'][name] = [] + elif action == 'editlist': + name, new = args + if name in self.info['lists']: + self.info['lists'][new] = self.info['lists'].pop(name) + else: + self.info['lists'][new] = [] + elif action == 'orderlists': + self.info['listorder'] = args[0] + elif action == 'removelist': + name = args[0] + if name in self.info['lists']: + del self.info['lists'][name] + elif action == 'addlistitems': + name, ids = args + if not name in self.info['lists']: + self.info['lists'][name] = [] + self.info['lists'][name] = list(set(self.info['lists'][name]) | set(ids)) + elif action == 'removelistitems': + name, ids = args + if name in self.info['lists']: + self.info['lists'][name] = list(set(self.info['lists'][name]) - set(ids)) + elif action == 'addpeer': + peerid, username = args + if len(peerid) == 16: + self.info['peers'][peerid] = {'username': username} + # fixme, just trigger peer update here + from user.models import User + peer = User.get_or_create(peerid) + if not 'username' in peer.info: + peer.info['username'] = username + peer.update_name() + peer.save() + elif action == 'editpeer': + peerid, data = args + if len(peerid) == 16: + if not peerid in self.info['peers']: + self.info['peers'][peerid] = {} + for key in ('username', 'contact'): + if key in data: + self.info['peers'][peerid][key] = data[key] + # fixme, just trigger peer update here + from user.models import User + peer = User.get_or_create(peerid) + update = False + if not peer.peered: + for key in ('username', 'contact'): + if key in data and peer.info.get(key) != data[key]: + peer.info[key] = data[key] + update = True + if update: + peer.save() + elif action == 'removepeer': + peerid = args[0] + if peerid in self.peers['peers']: + del self.info['peers'] + # trigger peer update + elif action == 'editusername': + self.info['username'] = args[0] + elif action == 'editcontact': + self.info['contact'] = args[0] + else: + logger.debug('UNKNOWN ACTION:', action) + self.info['revision'] = revision + return True + + def get_metahash(self, itemid): + item = self.library[itemid] + if 'meta_hash' not in item: + item['meta_hash'] = utils.get_meta_hash(item['meta']) + self.library[itemid] = item + return item['meta_hash'] + + def sync_info(self): + with open(self._infopath, 'w') as fd: + json.dump(self.info, fd, indent=4, sort_keys=True) + + def join(self): + #self.library.join() + self.library.close() + self.sync_info() + + def remove(self): + self.join() + os.unlink(self._dbpath) + os.unlink(self._infopath) + + def sync_db(self): + import item.models + import user.models + c_user_id = item.models.user_items.columns['user_id'] + c_item_id = item.models.user_items.columns['item_id'] + l_list_id = user.models.list_items.columns['list_id'] + l_item_id = user.models.list_items.columns['item_id'] + q = item.models.user_items.select().where(c_user_id.is_(self.id)) + current = set([r[1] for r in state.db.session.execute(q)]) + library = set(self.library.keys()) + remove = list(current - library) + logger.debug('remove %s', len(remove)) + u = user.models.User.get(self.id) + listid = u.library.id + lists = [l.id for l in u.lists.all()] + getpreview = [] + t0 = time.time() + if remove: + q = item.models.user_items.delete().where(c_user_id.is_(self.id)).where(c_item_id.in_(remove)) + state.db.session.execute(q) + q = user.models.list_items.delete().where(l_list_id.in_(lists)).where(l_item_id.in_(remove)) + state.db.session.execute(q) + add = list(library - current) + logger.debug('add %s', len(add)) + listitems = {} + q = user.models.list_items.select().where(l_list_id.in_(lists)) + for row in state.db.session.execute(q): + if not row['list_id'] in listitems: + listitems[row['list_id']] = set() + listitems[row['list_id']].add(row['item_id']) + t0 = maybe_commit(t0) + if add: + t0 = time.time() + q = item.models.user_items.select().where(c_user_id.is_(self.id)) + useritems = {r['item_id'] for r in state.db.session.execute(q)} + for itemid in add: + i = item.models.Item.get(itemid) + if not i: + i = item.models.Item(id=itemid) + m = self.library.get(itemid, {}) + i.info.update(m.get('info', {})) + i.meta = m.get('meta', {}) + state.db.session.add(i) + getpreview.append(itemid) + if itemid not in useritems: + q = item.models.user_items.insert({'item_id': itemid, 'user_id': self.id}) + state.db.session.execute(q) + if itemid not in listitems.get(listid, []): + q = user.models.list_items.insert({'item_id': itemid, 'list_id': listid}) + state.db.session.execute(q) + if state.shutdown: + break + t0 = maybe_commit(t0) + state.db.session.commit() + if remove: + q = item.models.user_items.select() + user_items = {i['item_id'] for i in state.db.session.execute(q)} + removed_items = set(remove)-user_items + if removed_items: + item.models.Item.remove_many(removed_items) + state.db.session.commit() + if state.shutdown: + return + for itemid in getpreview: + state.tasks.queue('getpreview', itemid) + update_items = remove + add + + current_lists = set(l.name for l in u.lists.all() if l.name) + add = list(set(self.info['lists']) - current_lists) + remove = list(current_lists - set(self.info['lists'])) + t0 = time.time() + for l in u.lists.all(): + if l.name: + logger.debug('update list %s', l.name) + if l.name in remove: + l.remove(commit=False) + else: + if l.id in listitems: + ladd = list(set(self.info['lists'][l.name]) - set(listitems[l.id])) + lremove = list(set(listitems[l.id]) - set(self.info['lists'][l.name])) + if ladd: + l.add_items(ladd, commit=False) + update_items = list(set(update_items) - set(ladd)) + if lremove: + l.remove_items(lremove, commit=False) + else: + l.add_items(self.info['lists'][l.name], commit=False) + update_items = list(set(update_items) - set(self.info['lists'][l.name])) + if state.shutdown: + break + t0 = maybe_commit(t0) + state.db.session.commit() + if state.shutdown: + return + if add: + for name in add: + logger.debug('add list %s', name) + l = user.models.List.get_or_create(self.id, name) + l.add_items(self.info['lists'][name], commit=False) + trigger_event('addlist', {'id': l.public_id, 'user': self.id}) + update_items = list(set(update_items) - set(self.info['lists'][name])) + if state.shutdown: + break + logger.debug('update %s', len(update_items)) + t0 = time.time() + if update_items: + for i in item.models.Item.query.filter(item.models.Item.id.in_(update_items)): + i.update(commit=False) + t0 = maybe_commit(t0) + if state.shutdown: + break + state.db.session.commit() + +def sync_db(): + from sqlalchemy.orm import load_only + import item.models + with db.session(): + sort_ids = {i.item_id for i in item.models.Sort.query.options(load_only('item_id'))} + if sort_ids: + t0 = time.time() + commit = False + for i in item.models.Item.query.filter(item.models.Item.id.notin_(sort_ids)): + i.update(commit=False) + if i.info['mediastate'] == 'unavailable' and state.tasks: + state.tasks.queue('getpreview', i.id) + commit = True + #logger.debug('sync:%s', i) + t0 = maybe_commit(t0) + if state.shutdown: + break + if commit: + state.db.session.commit() + cleanup_lists() + +def cleanup_lists(): + from sqlalchemy.orm import load_only + import item.models + import user.models + with db.session(): + for l in user.models.List.query.all(): + if not l.user.peered and not l.user.id == settings.USER_ID: + l.remove() + + peers = [u.id for u in user.models.User.query.filter_by(peered=True)] + [settings.USER_ID] + q = item.models.user_items.delete().where(item.models.user_items.columns['user_id'].notin_(peers)) + state.db.session.execute(q) + + lists = [l.id for l in user.models.List.query.all()] + q = user.models.list_items.delete().where(user.models.list_items.columns['list_id'].notin_(lists)) + state.db.session.execute(q) + + state.db.session.commit() + + q = item.models.user_items.select() + user_items = {i['item_id'] for i in state.db.session.execute(q)} + ids = {i.id for i in item.models.Item.query.options(load_only('id'))} + remove = ids - user_items + if remove: + item.models.Item.remove_many(remove) + state.db.session.commit() diff --git a/oml/node/nodeapi.py b/oml/node/nodeapi.py index db11b90..3d13fdc 100644 --- a/oml/node/nodeapi.py +++ b/oml/node/nodeapi.py @@ -49,17 +49,18 @@ def api_requestPeering(user_id, username, message): def api_acceptPeering(user_id, username, message): user = User.get(user_id) - logger.debug('incoming acceptPeering event: pending: %s', user.pending) - if user and user.pending == 'sent': - user.info['username'] = username - user.info['message'] = message - user.update_name() - user.update_peering(True, username) - state.nodes.queue('add', user.id, True) - trigger_event('peering.accept', user.json()) - return True - elif user and user.peered: - return True + if user: + logger.debug('incoming acceptPeering event: pending: %s', user.pending) + if user.pending == 'sent': + user.info['username'] = username + user.info['message'] = message + user.update_name() + user.update_peering(True, username) + state.nodes.queue('add', user.id, True) + trigger_event('peering.accept', user.json()) + return True + elif user.peered: + return True return False def api_rejectPeering(user_id, message): diff --git a/oml/nodes.py b/oml/nodes.py index 3474e0b..f7a575f 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -22,9 +22,10 @@ from changelog import Changelog from websocket import trigger_event from localnodes import LocalNodes from tor_request import get_opener -from utils import user_sort_key +from utils import user_sort_key, get_peer import state import db +import library import logging logger = logging.getLogger(__name__) @@ -247,24 +248,21 @@ class Node(Thread): def pullChanges(self): with db.session(): u = user.models.User.get_or_create(self.user_id) - if not self.online or not u.peered: + if not u or not self.online or not u.peered: return True - last = Changelog.query.filter_by(user_id=self.user_id).order_by('-revision').first() - from_revision = last.revision + 1 if last else 0 - try: - changes = self.request('pullChanges', from_revision) - except: - self.online = False - logger.debug('%s went offline', u.name) - return False - if not changes: - return False - try: - r = Changelog.apply_changes(u, changes, first=from_revision == 0) - except: - logger.debug('apply_changes failed', exc_info=True) - r = False - return r + peer = get_peer(self.user_id) + from_revision = peer.info.get('revision', -1) + 1 + try: + changes = self.request('pullChanges', from_revision) + except: + self.online = False + logger.debug('%s went offline', u.name, ext_info=True) + return False + if not changes: + return False + #with open('/tmp/changelog_%s_%s.json' % (self.user_id, from_revision), 'w') as f: + # json.dump(changes, f, ensure_ascii=False, indent=4) + return peer.apply_changes(changes) def peering(self, action): with db.session(): @@ -411,13 +409,14 @@ class Nodes(Thread): del u.info['local'] u.save() self.queue('add', u.id) + state.peers[u.id] = library.Peer(u.id) for u in user.models.User.query.filter_by(queued=True): logger.debug('adding queued node... %s', u.id) self.queue('add', u.id, True) self._local = LocalNodes() self._cleanup = PeriodicCallback(lambda: self.queue('cleanup'), 120000) self._cleanup.start() - self._pullcb = PeriodicCallback(self.pull, 60000) + self._pullcb = PeriodicCallback(self.pull, settings.server['pull_interval']) self._pullcb.start() Thread.__init__(self) self.daemon = True @@ -475,6 +474,7 @@ class Nodes(Thread): if state.activity and state.activity.get('activity') == 'import': return self._pulling = True + library.sync_db() users = [] with db.session(): from user.models import User @@ -493,6 +493,7 @@ class Nodes(Thread): self._pulling = False def run(self): + self.queue('pull') while not state.shutdown: args = self._q.get() if args: @@ -519,7 +520,6 @@ def publish_node(): state.check_nodes.start() state._online = PeriodicCallback(update_online, 60000) state._online.start() - state.nodes.pull() def update_online(): online = state.tor and state.tor.is_online() diff --git a/oml/server.py b/oml/server.py index 0bb7421..7c274cd 100644 --- a/oml/server.py +++ b/oml/server.py @@ -74,11 +74,16 @@ def shutdown(): if state.tasks: logger.debug('shutdown tasks') state.tasks.join() + if state.update: + logger.debug('shutdown updates') + state.update.join() if state.node: state.node.stop() if state.tor: logger.debug('shutdown tor') state.tor.shutdown() + for peer in state.peers: + state.peers[peer].join() if state.PID and os.path.exists(state.PID): logger.debug('remove %s', state.PID) os.unlink(state.PID) @@ -163,7 +168,7 @@ def run(): state.tasks = tasks.Tasks() state.main.add_callback(start_node) else: - state.tasks = update.Update() + state.update = update.Update() if ':' in settings.server['address']: host = '[%s]' % settings.server['address'] diff --git a/oml/settings.py b/oml/settings.py index 80a2e16..eed31f5 100644 --- a/oml/settings.py +++ b/oml/settings.py @@ -48,6 +48,7 @@ server_defaults = { 'extract_text': True, 'localnode_discovery': True, 'release_url': 'http://downloads.openmedialibrary.com/release.json', + 'pull_interval': 60000 } for key in server_defaults: @@ -99,4 +100,4 @@ USER_AGENT = 'OpenMediaLibrary/%s' % VERSION DEBUG_HTTP = server.get('debug_http', False) -DB_VERSION = 10 +DB_VERSION = 11 diff --git a/oml/state.py b/oml/state.py index 900fc99..2f296e6 100644 --- a/oml/state.py +++ b/oml/state.py @@ -10,6 +10,7 @@ tor = False update = False shutdown = False websockets = [] +peers = {} activity = {} removepeer = {} diff --git a/oml/update.py b/oml/update.py index e036241..25318ea 100644 --- a/oml/update.py +++ b/oml/update.py @@ -342,6 +342,8 @@ class Update(Thread): db_version = migrate_8() if db_version < 10: db_version = migrate_10() + if db_version < 11: + db_version = migrate_11() settings.server['db_version'] = settings.DB_VERSION def run(self): @@ -513,3 +515,27 @@ def migrate_10(): Find.query.filter_by(key=key, value=value).update({'sortvalue': updates[key][value]}) session.commit() return 10 + +def migrate_11(): + with db.session() as session: + from user.models import User, Metadata + from changelog import Changelog + import utils + for u in User.query.filter_by(peered=True): + peer = utils.get_peer(u.id) + last = Changelog.query.filter_by(user_id=u.id).order_by('-revision').first() + if last: + peer.info['revision'] = last.revision + for m in Metadata.query.filter_by(user_id=u.id): + peer.library[m.item_id] = { + 'meta': dict(m.data), + 'meta_hash': m.data_hash, + 'modified': m.modified, + } + peer.library.commit() + peer.sync_info() + peer.sync_db() + Changelog.query.filter_by(user_id=u.id).delete() + Metadata.query.filter_by(user_id=u.id).delete() + session.commit() + return 11 diff --git a/oml/user/models.py b/oml/user/models.py index f7be0f4..89768c2 100644 --- a/oml/user/models.py +++ b/oml/user/models.py @@ -93,6 +93,11 @@ class User(db.Model): if self.id == settings.USER_ID: j['username'] = settings.preferences['username'] j['contact'] = settings.preferences['contact'] + elif self.id in state.peers: + peer = state.peers[self.id] + for key in ('username', 'contact'): + if key in peer.info: + j[key] = peer.info[key] if keys: for k in set(j) - set(keys): del j[k] @@ -161,6 +166,9 @@ class User(db.Model): Changelog.query.filter_by(user_id=self.id).delete() Metadata.query.filter_by(user_id=self.id).delete() self.save() + if self.id in state.peers: + state.peers[self.id].remove() + del state.peers[self.id] def update_name(self): if self.id == settings.USER_ID: @@ -297,7 +305,7 @@ class List(db.Model): if update_conditions(l._query.get('conditions', [])): l.save() - def add_items(self, items): + def add_items(self, items, commit=True): from item.models import Item for item_id in items: i = Item.get(item_id) @@ -305,9 +313,10 @@ class List(db.Model): self.items.append(i) if self.user_id == settings.USER_ID: i.queue_download() - i.update() + i.update(commit=commit) state.db.session.add(self) - state.db.session.commit() + if commit: + state.db.session.commit() if self.user_id == settings.USER_ID and self.name != '': Changelog.record(self.user, 'addlistitems', self.name, items) @@ -318,28 +327,30 @@ class List(db.Model): else: return self.user.items.join(Item.lists, aliased=True).filter(List.id == self.id) - def remove_items(self, items): + def remove_items(self, items, commit=True): from item.models import Item for item_id in items: i = Item.get(item_id) if i: if i in self.items: self.items.remove(i) - i.update() + i.update(commit=commit) state.db.session.add(self) - state.db.session.commit() + if commit: + state.db.session.commit() if self.user_id == settings.USER_ID and self.name != '': Changelog.record(self.user, 'removelistitems', self.name, items) - def remove(self): + def remove(self, commit=True): if not self._query: - for i in self.items: - self.items.remove(i) + q = list_items.delete().where(list_items.columns['list_id'].is_(self.id)) + state.db.session.execute(q) if not self._query: if self.user_id == settings.USER_ID and self.name != '': Changelog.record(self.user, 'removelist', self.name) state.db.session.delete(self) - state.db.session.commit() + if commit: + state.db.session.commit() @property def public_id(self): @@ -445,7 +456,7 @@ class Metadata(db.Model): return m def get_hash(self): - return utils.get_meta_hash(self.data.copy()) + return utils.get_meta_hash(self.data) def save(self, commit=True, modified=None): if modified is None: diff --git a/oml/utils.py b/oml/utils.py index d4d134d..1f37d9a 100644 --- a/oml/utils.py +++ b/oml/utils.py @@ -409,6 +409,7 @@ def get_ratio(data): def get_meta_hash(data): + data = data.copy() if 'sharemetadata' in data: del data['sharemetadata'] for key in list(data): @@ -469,3 +470,10 @@ def ctl(*args): def user_sort_key(u): return ox.sort_string(str(u.get('index', '')) + 'Z' + (u.get('name') or '')) + +def get_peer(peerid): + import state + import library + if peerid not in state.peers: + state.peers[peerid] = library.Peer(peerid) + return state.peers[peerid] diff --git a/requirements-shared.txt b/requirements-shared.txt index a8fe629..8686903 100644 --- a/requirements-shared.txt +++ b/requirements-shared.txt @@ -8,3 +8,4 @@ python-stdnum==0.9 PyPDF2==1.23 pysocks stem +sqlitedict==1.4.0