diff --git a/oml/changelog.py b/oml/changelog.py index abc87b3..03900a5 100644 --- a/oml/changelog.py +++ b/oml/changelog.py @@ -77,10 +77,48 @@ class Changelog(db.Model): logger.debug('record change: %s', c.json()) @classmethod - def apply_changes(cls, user, changes): + def apply_changes(cls, user, changes, first=False): trigger = changes if trigger: - trigger_event('change', {}); + trigger_event('change', {}) + if first: + logger.debug('remove left over items') + items = set() + lists = {} + peers = set() + for change in changes: + if change[2][0] == 'additem': + items.add(change[2][1]) + if change[2][0] == 'addlist': + lists[change[2][1]] = set() + if change[2][0] == 'addlistitems': + if not change[2][1] in lists: + lists[change[2][1]] = set() + for i in change[2][2]: + lists[change[2][1]].add(i) + if change[2][0] == 'addpeer': + peers.add(change[2][1]) + for i in user.library.items: + if i.id not in items and user in i.users: + i.users.remove(user) + if i.users: + i.update() + else: + i.delete() + for name in lists: + qs = user.lists.filter_by(name=name) + if qs.count(): + l = qs[0] + remove = [] + for i in l.get_items(): + if i.id not in lists[name]: + remove.append(i.id) + l.items.remove(i.id) + for peer in user.models.Users.query: + if user.id in peer.info.get('users', {}) and peer.id not in peers: + del peer.info['users'][user.id] + peer.save() + for change in changes: if user.id in state.removepeer: del state.removepeer[user.id] @@ -105,10 +143,9 @@ class Changelog(db.Model): c.timestamp = timestamp c.user_id = user.id c.revision = revision - c.data = data - args = json.loads(data) - logger.debug('apply change from %s: %s(%s)', user.name, args[0], args[1:]) - if getattr(c, 'action_' + args[0])(user, timestamp, *args[1:]): + c.data = json.dumps(data) + logger.debug('apply change from %s: %s(%s)', user.name, data[0], data[1:]) + if getattr(c, 'action_' + data[0])(user, timestamp, *data[1:]): logger.debug('change applied') state.db.session.add(c) state.db.session.commit() @@ -126,26 +163,8 @@ class Changelog(db.Model): def json(self): timestamp = self.timestamp or datetime2ts(self.created) - return [self.revision, timestamp, self.data] + return [self.revision, timestamp, json.loads(self.data)] - @classmethod - def restore(cls, user_id, path=None): - from user.models import User - user = User.get_or_create(user_id) - if not path: - path = '/tmp/oml_changelog_%s.json' % user_id - with open(path, 'r') as fd: - for change in fd: - change = json.loads(change) - cls.apply_change(user, change, user_id == settings.USER_ID) - - @classmethod - def export(cls, user_id, path=None): - if not path: - path = '/tmp/oml_changelog_%s.json' % user_id - with open(path, 'w') as fd: - for c in cls.query.filter_by(user_id=user_id).order_by('revision'): - fd.write(json.dumps(c.json(), ensure_ascii=False) + '\n') def action_additem(self, user, timestamp, itemid, info): from item.models import Item @@ -435,17 +454,17 @@ class Changelog(db.Model): rv = data[0] ts = data[1] data = [op, id] + data[2:] - _changes.append([rv, ts, json.dumps(data)]) + _changes.append([rv, ts, data]) _changes.sort(key=lambda change: (change[0], change[1])) if orderlists: ids = [l.name for l in List.query.filter_by(user_id=user_id,type='static').order_by('index_') if l.name] if len(ids) > 1: - _changes.append([-1, timestamp, json.dumps(['orderlists', ids])]) + _changes.append([-1, timestamp, ['orderlists', ids]]) userinfo = state.user().json() if editusername: - _changes.append([-1, timestamp, json.dumps(['editusername', userinfo['username']])]) + _changes.append([-1, timestamp, ['editusername', userinfo['username']]]) if editcontact: - _changes.append([-1, timestamp, json.dumps(['editcontact', userinfo['contact']])]) + _changes.append([-1, timestamp, ['editcontact', userinfo['contact']]]) if _changes: r = revision for c in reversed(_changes): diff --git a/oml/directory.py b/oml/directory.py index 63ae456..8a5dea0 100644 --- a/oml/directory.py +++ b/oml/directory.py @@ -13,7 +13,6 @@ import settings logger = logging.getLogger(__name__) -base = settings.server['directory_service'] base = 'http://hpjats6xixrleoqg.onion:25519' def get(vk): diff --git a/oml/localnodes.py b/oml/localnodes.py index 36140f9..0f69df8 100644 --- a/oml/localnodes.py +++ b/oml/localnodes.py @@ -38,7 +38,7 @@ def can_connect(data): r = opener.open(url) version = r.headers.get('X-Node-Protocol', None) if version != settings.NODE_PROTOCOL: - logger.debug('version does not match local: %s remote %s', settings.NODE_PROTOCOL, version) + logger.debug('version does not match local: %s remote %s (%s)', settings.NODE_PROTOCOL, version, data['id']) return False c = r.read() return True diff --git a/oml/meta/__init__.py b/oml/meta/__init__.py index cd7f28d..e7b91a8 100644 --- a/oml/meta/__init__.py +++ b/oml/meta/__init__.py @@ -53,11 +53,6 @@ def lookup_provider(arg): def lookup(key, value): if not isvalid_id(key, value): return {} - ''' - if not settings.server.get('local_lookup'): - import oml.metaremote - return oml.metaremote.lookup(key, value) - ''' if key == 'isbn': try: data = google.info(key, value) diff --git a/oml/nodes.py b/oml/nodes.py index 9a2b975..2b49ef0 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -33,7 +33,6 @@ ENCODING='base64' class Node(Thread): _running = True - _pulling = False host = None local = None _online = None @@ -49,10 +48,7 @@ class Node(Thread): Thread.__init__(self) self.daemon = True self.start() - self._pull = PeriodicCallback(self.pull, 60000) - self._pull.start() self.ping() - self.pull() def run(self): while self._running: @@ -63,12 +59,6 @@ class Node(Thread): self._send_response() elif action == 'ping': self.online = self.can_connect() - elif action == 'pull': - self._pulling = True - self.online = self.can_connect() - if self.online: - self.pullChanges() - self._pulling = False else: logger.debug('unknown action %s', action) @@ -77,9 +67,6 @@ class Node(Thread): self._q.put('') #return Thread.join(self) - def pull(self): - if state.online and not self._pulling: - self._q.put('pull') def ping(self): if state.online: @@ -262,8 +249,6 @@ class Node(Thread): }) def pullChanges(self): - if state.activity and state.activity.get('activity') == 'import': - return with db.session(): u = user.models.User.get_or_create(self.user_id) if not self.online or not u.peered: @@ -278,7 +263,7 @@ class Node(Thread): return False if not changes: return False - r = Changelog.apply_changes(u, changes) + r = Changelog.apply_changes(u, changes, first=from_revision == 0) return r def peering(self, action): @@ -416,6 +401,7 @@ class Node(Thread): class Nodes(Thread): _nodes = {} _local = None + _pulling = False def __init__(self): self._q = Queue() @@ -432,14 +418,21 @@ class Nodes(Thread): self._local = LocalNodes() self._cleanup = PeriodicCallback(lambda: self.queue('cleanup'), 120000) self._cleanup.start() + self._pullcb = PeriodicCallback(self.pull, 60000) + self._pullcb.start() Thread.__init__(self) self.daemon = True self.start() + self.pull() def cleanup(self): if self._running and self._local: self._local.cleanup() + def pull(self): + if state.online and not self._pulling: + self.queue('pull') + def queue(self, *args): self._q.put(list(args)) @@ -480,6 +473,16 @@ class Nodes(Thread): if send_response: self._nodes[user_id].send_response() + def _pull(self): + if state.activity and state.activity.get('activity') == 'import': + return + self._pulling = True + for node in list(self._nodes.values()): + node.online = node.can_connect() + if node.online: + node.pullChanges() + self._pulling = False + def run(self): while self._running: args = self._q.get() @@ -488,6 +491,8 @@ class Nodes(Thread): self.cleanup() elif args[0] == 'add': self._add(*args[1:]) + elif args[0] == 'pull': + self._pull() else: self._call(*args) diff --git a/oml/settings.py b/oml/settings.py index ce3355a..033b9be 100644 --- a/oml/settings.py +++ b/oml/settings.py @@ -47,8 +47,6 @@ server_defaults = { 'node_address': '', 'extract_text': True, 'localnode_discovery': True, - 'directory_service': 'http://[2a01:4f8:120:3201::3]:25519', - 'meta_service': 'http://meta.openmedialibrary.com/api/', 'release_url': 'http://downloads.openmedialibrary.com/release.json', } @@ -94,11 +92,11 @@ if 'modules' in release and 'openmedialibrary' in release['modules']: else: MINOR_VERSION = 'git' -NODE_PROTOCOL="0.6" +NODE_PROTOCOL="0.7" VERSION="%s.%s" % (NODE_PROTOCOL, MINOR_VERSION) USER_AGENT = 'OpenMediaLibrary/%s' % VERSION DEBUG_HTTP = server.get('debug_http', False) -DB_VERSION = 7 +DB_VERSION = 8 diff --git a/oml/tasks.py b/oml/tasks.py index ca84b60..39ddbbc 100644 --- a/oml/tasks.py +++ b/oml/tasks.py @@ -29,7 +29,7 @@ class Tasks(Thread): if m: try: action, data = m - logger.debug('run task %s', action) + logger.debug('%s start', action) if action == 'ping': trigger_event('pong', data) elif action == 'import': @@ -48,7 +48,7 @@ class Tasks(Thread): sync_metadata(*data) else: trigger_event('error', {'error': 'unknown action'}) - logger.debug('finished task %s', action) + logger.debug('%s done', action) except: logger.debug('task failed', exc_info=1) self.q.task_done() diff --git a/oml/update.py b/oml/update.py index d10e5b5..2aea108 100644 --- a/oml/update.py +++ b/oml/update.py @@ -420,3 +420,31 @@ def migrate_7(): db.run_sql('DROP TABLE IF EXISTS scrape') db.run_sql('VACUUM') return 7 + +def migrate_8(): + for key in ('directory_service', 'meta_service', 'local_lookup', 'cert'): + if key in settings.server: + del settings.server[key] + list_cache = os.path.join(settings.data_path, 'list_cache.json') + if os.path.exists(list_cache): + os.unlink(list_cache) + with db.session() as session: + import item.models + for i in item.models.Item.query: + delta = set(i.meta)-set(i.meta_keys) + if delta: + for key in delta: + del i.meta[key] + session.add(i) + session.commit() + import changelog + import user.models + changelog.Changelog.query.delete() + u = user.models.User.get(settings.USER_ID) + u.rebuild_changelog() + for peer in user.models.User.query: + if peer.id != u.id: + if len(peer.id) != 16: + session.delete(peer) + session.commit() + return 8 diff --git a/oml/user/models.py b/oml/user/models.py index 7425593..8d9fe0e 100644 --- a/oml/user/models.py +++ b/oml/user/models.py @@ -7,6 +7,7 @@ import os import shutil import ox +from sqlalchemy.orm import load_only import sqlalchemy as sa from changelog import Changelog @@ -182,6 +183,32 @@ class User(db.Model): session.connection().execute(sql.format(oid=self.id, nid=service_id)) session.commit() + def rebuild_changelog(self): + Changelog.query.filter_by(user_id=self.id).delete() + for item in self.library.get_items().order_by('created'): + Changelog.record(self, 'additem', item.id, item.info) + Changelog.record(self, 'edititem', item.id, item.meta) + lists = [] + for l in List.query.filter_by(user_id=self.id, type='static').order_by('index_'): + if l.name: + lists.append(l.name) + Changelog.record(self, 'addlist', l.name) + items = [i.id for i in l.get_items().options(load_only('id'))] + if items: + Changelog.record(self, 'addlistitems', l.name, items) + if len(lists) > 1: + Changelog.record(self, 'orderlists', lists) + + for peer in User.query.filter_by(peered=True): + Changelog.record(self, 'addpeer', peer.id, self.nickname) + if peer.info.get('contact'): + Changelog.record(self, 'editpeer', peer.id, { + 'contact': peer.info.get('contact') + }) + + if settings.preferences.get('contact'): + Changelog.record(self, 'editcontact', settings.preferences.get('contact')) + list_items = sa.Table('listitem', db.metadata, sa.Column('list_id', sa.Integer(), sa.ForeignKey('list.id')), sa.Column('item_id', sa.String(32), sa.ForeignKey('item.id'))