From cf6d6acc5fb6818fbc37847b154df65af875840b Mon Sep 17 00:00:00 2001 From: j Date: Tue, 12 Jan 2016 13:02:39 +0530 Subject: [PATCH] avoid stale db sessoins in nodes thread --- oml/changelog.py | 2 + oml/item/models.py | 7 ++- oml/nodes.py | 135 +++++++++++++++++++-------------------------- 3 files changed, 64 insertions(+), 80 deletions(-) diff --git a/oml/changelog.py b/oml/changelog.py index c831d83..1ffd4d0 100644 --- a/oml/changelog.py +++ b/oml/changelog.py @@ -116,6 +116,8 @@ class Changelog(db.Model): if trigger: trigger_event('change', {}); return True + else: + logger.debug('could not apply change') else: logger.debug('revsion does not match! got %s expecting %s', revision, next_revision) return False diff --git a/oml/item/models.py b/oml/item/models.py index 3eef7d8..00af0fd 100644 --- a/oml/item/models.py +++ b/oml/item/models.py @@ -348,9 +348,10 @@ class Item(db.Model): load metadata from user_metadata or get via isbn? ''' for key in self.meta_keys: - if key not in self.meta and key in self.info: - self.meta[key] = self.info[key] - del self.info[key] + if key in self.info: + if key not in self.meta: + self.meta[key] = self.info[key] + del self.info[key] #FIXME get from user_meta if state.online: diff --git a/oml/nodes.py b/oml/nodes.py index 3f21c6f..5bec1ff 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -44,6 +44,7 @@ class Node(Thread): def __init__(self, nodes, user): self._nodes = nodes + self.user = user self.user_id = user.id self._opener = get_opener(self.user_id) self._q = Queue() @@ -200,32 +201,9 @@ class Node(Thread): state.update_required = True return None - ''' - sig = r.headers.get('X-Node-Signature') - if sig and self._valid(data, sig): - response = json.loads(data.decode('utf-8')) - else: - logger.debug('invalid signature %s', data) - response = None - ''' response = json.loads(data.decode('utf-8')) return response - def _valid(self, data, sig): - if isinstance(data, str): - data = data.encode() - try: - self.vk.verify(sig, data, encoding=ENCODING) - #except ed25519.BadSignatureError: - except: - return False - return True - - @property - def user(self): - with db.session(): - return user.models.User.get_or_create(self.user_id) - def can_connect(self): self.resolve() url = self.url @@ -244,10 +222,10 @@ class Node(Thread): logger.debug('version does not match local: %s remote %s', settings.NODE_PROTOCOL, version) return False c = r.read() - logger.debug('can connect to: %s (%s)', url, self.user.nickname) + logger.debug('can connect to: %s', url) return True except: - logger.debug('can not connect to: %s (%s)', url, self.user.nickname) + logger.debug('can not connect to: %s', url) pass return False @@ -255,27 +233,28 @@ class Node(Thread): return self.online or self.get_local() != None def _go_online(self): - u = self.user - if u.peered or u.queued: - logger.debug('go_online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname) - try: - self.online = self.can_connect() - if self.online: - logger.debug('connected to %s', self.url) - if u.queued: - logger.debug('queued peering event pending=%s peered=%s', u.pending, u.peered) - if u.pending == 'sent': - self.peering('requestPeering') - elif u.pending == '' and u.peered: - self.peering('acceptPeering') - else: - #fixme, what about cancel/reject peering here? - self.peering('removePeering') - except: - logger.debug('failed to connect to %s', self.user_id) + with db.session(): + u = user.models.User.get_or_create(self.user_id) + if u.peered or u.queued: + logger.debug('go_online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname) + try: + self.online = self.can_connect() + if self.online: + logger.debug('connected to %s', self.url) + if u.queued: + logger.debug('queued peering event pending=%s peered=%s', u.pending, u.peered) + if u.pending == 'sent': + self.peering('requestPeering') + elif u.pending == '' and u.peered: + self.peering('acceptPeering') + else: + #fixme, what about cancel/reject peering here? + self.peering('removePeering') + except: + logger.debug('failed to connect to %s', self.user_id) + self.online = False + else: self.online = False - else: - self.online = False def trigger_status(self): if self.online is not None: @@ -285,21 +264,22 @@ class Node(Thread): }) def pullChanges(self): - if not self.online or not self.user.peered: - return True - last = Changelog.query.filter_by(user_id=self.user_id).order_by('-revision').first() - from_revision = last.revision + 1 if last else 0 - try: - changes = self.request('pullChanges', from_revision) - except: - self.online = False - logger.debug('%s went offline', self.user.name) - return False - if not changes: - return False with db.session(): - r = Changelog.apply_changes(self.user, changes) - return r + u = user.models.User.get_or_create(self.user_id) + if not self.online or not u.peered: + return True + last = Changelog.query.filter_by(user_id=self.user_id).order_by('-revision').first() + from_revision = last.revision + 1 if last else 0 + try: + changes = self.request('pullChanges', from_revision) + except: + self.online = False + logger.debug('%s went offline', u.name) + return False + if not changes: + return False + r = Changelog.apply_changes(u, changes) + return r def pushChanges(self, changes): logger.debug('pushing changes to %s %s', self.user_id, changes) @@ -312,24 +292,25 @@ class Node(Thread): logger.debug('pushedChanges %s %s', r, self.user_id) def peering(self, action): - u = self.user - if action in ('requestPeering', 'acceptPeering'): - r = self.request(action, settings.preferences['username'], u.info.get('message')) - else: - r = self.request(action, u.info.get('message')) - if r != None: - u.queued = False - if 'message' in u.info: - del u.info['message'] - u.save() - else: - logger.debug('peering failed? %s %s', action, r) - if action in ('cancelPeering', 'rejectPeering', 'removePeering'): - self.online = False - else: - self.go_online() - trigger_event('peering.%s'%action.replace('Peering', ''), u.json()) - return True + with db.session(): + u = user.models.User.get_or_create(self.user_id) + if action in ('requestPeering', 'acceptPeering'): + r = self.request(action, settings.preferences['username'], u.info.get('message')) + else: + r = self.request(action, u.info.get('message')) + if r != None: + u.queued = False + if 'message' in u.info: + del u.info['message'] + u.save() + else: + logger.debug('peering failed? %s %s', action, r) + if action in ('cancelPeering', 'rejectPeering', 'removePeering'): + self.online = False + else: + self.go_online() + trigger_event('peering.%s'%action.replace('Peering', ''), u.json()) + return True def download(self, item): from item.models import Transfer