avoid stale db sessoins in nodes thread

This commit is contained in:
j 2016-01-12 13:02:39 +05:30
parent cf74e622be
commit cf6d6acc5f
3 changed files with 64 additions and 80 deletions

View file

@ -116,6 +116,8 @@ class Changelog(db.Model):
if trigger: if trigger:
trigger_event('change', {}); trigger_event('change', {});
return True return True
else:
logger.debug('could not apply change')
else: else:
logger.debug('revsion does not match! got %s expecting %s', revision, next_revision) logger.debug('revsion does not match! got %s expecting %s', revision, next_revision)
return False return False

View file

@ -348,9 +348,10 @@ class Item(db.Model):
load metadata from user_metadata or get via isbn? load metadata from user_metadata or get via isbn?
''' '''
for key in self.meta_keys: for key in self.meta_keys:
if key not in self.meta and key in self.info: if key in self.info:
self.meta[key] = self.info[key] if key not in self.meta:
del self.info[key] self.meta[key] = self.info[key]
del self.info[key]
#FIXME get from user_meta #FIXME get from user_meta
if state.online: if state.online:

View file

@ -44,6 +44,7 @@ class Node(Thread):
def __init__(self, nodes, user): def __init__(self, nodes, user):
self._nodes = nodes self._nodes = nodes
self.user = user
self.user_id = user.id self.user_id = user.id
self._opener = get_opener(self.user_id) self._opener = get_opener(self.user_id)
self._q = Queue() self._q = Queue()
@ -200,32 +201,9 @@ class Node(Thread):
state.update_required = True state.update_required = True
return None return None
'''
sig = r.headers.get('X-Node-Signature')
if sig and self._valid(data, sig):
response = json.loads(data.decode('utf-8'))
else:
logger.debug('invalid signature %s', data)
response = None
'''
response = json.loads(data.decode('utf-8')) response = json.loads(data.decode('utf-8'))
return response return response
def _valid(self, data, sig):
if isinstance(data, str):
data = data.encode()
try:
self.vk.verify(sig, data, encoding=ENCODING)
#except ed25519.BadSignatureError:
except:
return False
return True
@property
def user(self):
with db.session():
return user.models.User.get_or_create(self.user_id)
def can_connect(self): def can_connect(self):
self.resolve() self.resolve()
url = self.url url = self.url
@ -244,10 +222,10 @@ class Node(Thread):
logger.debug('version does not match local: %s remote %s', settings.NODE_PROTOCOL, version) logger.debug('version does not match local: %s remote %s', settings.NODE_PROTOCOL, version)
return False return False
c = r.read() c = r.read()
logger.debug('can connect to: %s (%s)', url, self.user.nickname) logger.debug('can connect to: %s', url)
return True return True
except: except:
logger.debug('can not connect to: %s (%s)', url, self.user.nickname) logger.debug('can not connect to: %s', url)
pass pass
return False return False
@ -255,27 +233,28 @@ class Node(Thread):
return self.online or self.get_local() != None return self.online or self.get_local() != None
def _go_online(self): def _go_online(self):
u = self.user with db.session():
if u.peered or u.queued: u = user.models.User.get_or_create(self.user_id)
logger.debug('go_online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname) if u.peered or u.queued:
try: logger.debug('go_online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname)
self.online = self.can_connect() try:
if self.online: self.online = self.can_connect()
logger.debug('connected to %s', self.url) if self.online:
if u.queued: logger.debug('connected to %s', self.url)
logger.debug('queued peering event pending=%s peered=%s', u.pending, u.peered) if u.queued:
if u.pending == 'sent': logger.debug('queued peering event pending=%s peered=%s', u.pending, u.peered)
self.peering('requestPeering') if u.pending == 'sent':
elif u.pending == '' and u.peered: self.peering('requestPeering')
self.peering('acceptPeering') elif u.pending == '' and u.peered:
else: self.peering('acceptPeering')
#fixme, what about cancel/reject peering here? else:
self.peering('removePeering') #fixme, what about cancel/reject peering here?
except: self.peering('removePeering')
logger.debug('failed to connect to %s', self.user_id) except:
logger.debug('failed to connect to %s', self.user_id)
self.online = False
else:
self.online = False self.online = False
else:
self.online = False
def trigger_status(self): def trigger_status(self):
if self.online is not None: if self.online is not None:
@ -285,21 +264,22 @@ class Node(Thread):
}) })
def pullChanges(self): def pullChanges(self):
if not self.online or not self.user.peered:
return True
last = Changelog.query.filter_by(user_id=self.user_id).order_by('-revision').first()
from_revision = last.revision + 1 if last else 0
try:
changes = self.request('pullChanges', from_revision)
except:
self.online = False
logger.debug('%s went offline', self.user.name)
return False
if not changes:
return False
with db.session(): with db.session():
r = Changelog.apply_changes(self.user, changes) u = user.models.User.get_or_create(self.user_id)
return r if not self.online or not u.peered:
return True
last = Changelog.query.filter_by(user_id=self.user_id).order_by('-revision').first()
from_revision = last.revision + 1 if last else 0
try:
changes = self.request('pullChanges', from_revision)
except:
self.online = False
logger.debug('%s went offline', u.name)
return False
if not changes:
return False
r = Changelog.apply_changes(u, changes)
return r
def pushChanges(self, changes): def pushChanges(self, changes):
logger.debug('pushing changes to %s %s', self.user_id, changes) logger.debug('pushing changes to %s %s', self.user_id, changes)
@ -312,24 +292,25 @@ class Node(Thread):
logger.debug('pushedChanges %s %s', r, self.user_id) logger.debug('pushedChanges %s %s', r, self.user_id)
def peering(self, action): def peering(self, action):
u = self.user with db.session():
if action in ('requestPeering', 'acceptPeering'): u = user.models.User.get_or_create(self.user_id)
r = self.request(action, settings.preferences['username'], u.info.get('message')) if action in ('requestPeering', 'acceptPeering'):
else: r = self.request(action, settings.preferences['username'], u.info.get('message'))
r = self.request(action, u.info.get('message')) else:
if r != None: r = self.request(action, u.info.get('message'))
u.queued = False if r != None:
if 'message' in u.info: u.queued = False
del u.info['message'] if 'message' in u.info:
u.save() del u.info['message']
else: u.save()
logger.debug('peering failed? %s %s', action, r) else:
if action in ('cancelPeering', 'rejectPeering', 'removePeering'): logger.debug('peering failed? %s %s', action, r)
self.online = False if action in ('cancelPeering', 'rejectPeering', 'removePeering'):
else: self.online = False
self.go_online() else:
trigger_event('peering.%s'%action.replace('Peering', ''), u.json()) self.go_online()
return True trigger_event('peering.%s'%action.replace('Peering', ''), u.json())
return True
def download(self, item): def download(self, item):
from item.models import Transfer from item.models import Transfer