use local nodes for requests
This commit is contained in:
parent
0c18dad1b5
commit
2f821bf6a7
6 changed files with 43 additions and 26 deletions
|
@ -211,8 +211,10 @@ class Changelog(db.Model):
|
||||||
user.info['users'] = {}
|
user.info['users'] = {}
|
||||||
user.info['users'][peerid] = username
|
user.info['users'][peerid] = username
|
||||||
user.save()
|
user.save()
|
||||||
User.get_or_create(peerid)
|
peer = User.get_or_create(peerid)
|
||||||
#fixme, add username to user?
|
if not 'username' in peer.info:
|
||||||
|
peer.info['username'] = username
|
||||||
|
peer.save()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def action_removepeer(self, user, timestamp, peerid):
|
def action_removepeer(self, user, timestamp, peerid):
|
||||||
|
|
|
@ -15,7 +15,7 @@ def parse(data):
|
||||||
for key in ('keys', 'group', 'list', 'range', 'sort', 'query'):
|
for key in ('keys', 'group', 'list', 'range', 'sort', 'query'):
|
||||||
if key in data:
|
if key in data:
|
||||||
query[key] = data[key]
|
query[key] = data[key]
|
||||||
print data
|
#print data
|
||||||
query['qs'] = oxflask.query.Parser(models.Item).find(data)
|
query['qs'] = oxflask.query.Parser(models.Item).find(data)
|
||||||
if 'query' in query and 'conditions' in query['query'] and query['query']['conditions']:
|
if 'query' in query and 'conditions' in query['query'] and query['query']['conditions']:
|
||||||
conditions = query['query']['conditions']
|
conditions = query['query']['conditions']
|
||||||
|
|
|
@ -46,7 +46,7 @@ class NodeHandler(tornado.web.RequestHandler):
|
||||||
content = {}
|
content = {}
|
||||||
if valid(key, data, sig):
|
if valid(key, data, sig):
|
||||||
action, args = json.loads(data)
|
action, args = json.loads(data)
|
||||||
print 'action', action, args
|
print key, 'action', action, args
|
||||||
if action == 'ping':
|
if action == 'ping':
|
||||||
content = {
|
content = {
|
||||||
'ip': request.remote_addr
|
'ip': request.remote_addr
|
||||||
|
|
39
oml/nodes.py
39
oml/nodes.py
|
@ -41,6 +41,8 @@ class Node(object):
|
||||||
if local:
|
if local:
|
||||||
url = 'http://[%s]:%s' % (local['host'], local['port'])
|
url = 'http://[%s]:%s' % (local['host'], local['port'])
|
||||||
print 'using local peer discovery to access node', url
|
print 'using local peer discovery to access node', url
|
||||||
|
elif not self.host:
|
||||||
|
return None
|
||||||
else:
|
else:
|
||||||
if ':' in self.host:
|
if ':' in self.host:
|
||||||
url = 'http://[%s]:%s' % (self.host, self.port)
|
url = 'http://[%s]:%s' % (self.host, self.port)
|
||||||
|
@ -48,7 +50,7 @@ class Node(object):
|
||||||
url = 'http://%s:%s' % (self.host, self.port)
|
url = 'http://%s:%s' % (self.host, self.port)
|
||||||
return url
|
return url
|
||||||
|
|
||||||
def resolve_host(self):
|
def resolve(self):
|
||||||
r = directory.get(self.vk)
|
r = directory.get(self.vk)
|
||||||
if r:
|
if r:
|
||||||
self.host = r['host']
|
self.host = r['host']
|
||||||
|
@ -64,9 +66,13 @@ class Node(object):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def request(self, action, *args):
|
def request(self, action, *args):
|
||||||
if not self.host:
|
url = self.url
|
||||||
self.resolve_host()
|
if not url:
|
||||||
if not self.host:
|
self.resolve()
|
||||||
|
url = self.url
|
||||||
|
if not self.url:
|
||||||
|
print 'unable to find host', self.user_id
|
||||||
|
self.online = False
|
||||||
return None
|
return None
|
||||||
content = json.dumps([action, args])
|
content = json.dumps([action, args])
|
||||||
sig = settings.sk.sign(content, encoding=ENCODING)
|
sig = settings.sk.sign(content, encoding=ENCODING)
|
||||||
|
@ -78,16 +84,18 @@ class Node(object):
|
||||||
'X-Ed25519-Key': settings.USER_ID,
|
'X-Ed25519-Key': settings.USER_ID,
|
||||||
'X-Ed25519-Signature': sig,
|
'X-Ed25519-Signature': sig,
|
||||||
}
|
}
|
||||||
r = requests.post(self.url, data=content, headers=headers)
|
r = requests.post(url, data=content, headers=headers)
|
||||||
if r.status_code == 403:
|
if r.status_code == 403:
|
||||||
print 'REMOTE ENDED PEERING'
|
print 'REMOTE ENDED PEERING'
|
||||||
if self.user.peered:
|
if self.user.peered:
|
||||||
self.user.update_peering(False)
|
self.user.update_peering(False)
|
||||||
|
self.online = False
|
||||||
data = r.content
|
data = r.content
|
||||||
sig = r.headers.get('X-Ed25519-Signature')
|
sig = r.headers.get('X-Ed25519-Signature')
|
||||||
if sig and self._valid(data, sig):
|
if sig and self._valid(data, sig):
|
||||||
response = json.loads(data)
|
response = json.loads(data)
|
||||||
else:
|
else:
|
||||||
|
print 'invalid signature', data
|
||||||
response = None
|
response = None
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
@ -104,7 +112,7 @@ class Node(object):
|
||||||
return user.models.User.get_or_create(self.user_id)
|
return user.models.User.get_or_create(self.user_id)
|
||||||
|
|
||||||
def go_online(self):
|
def go_online(self):
|
||||||
self.resolve_host()
|
self.resolve()
|
||||||
if self.user.peered:
|
if self.user.peered:
|
||||||
try:
|
try:
|
||||||
self.online = False
|
self.online = False
|
||||||
|
@ -166,25 +174,28 @@ class Node(object):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def rejectPeering(self, message):
|
def rejectPeering(self, message):
|
||||||
r = self.request('rejectPeering', message)
|
print 'reject peering!!!', self.user
|
||||||
p = self.user
|
p = self.user
|
||||||
p.update_peering(False)
|
p.update_peering(False)
|
||||||
self.go_online()
|
r = self.request('rejectPeering', message)
|
||||||
|
print 'reject peering!!!', r
|
||||||
|
self.online = False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def removePeering(self, message):
|
def removePeering(self, message):
|
||||||
print 'remove peering!!!', self.user
|
print 'remove peering!!!', self.user
|
||||||
r = self.request('removePeering', message)
|
|
||||||
p = self.user
|
p = self.user
|
||||||
|
if p.peered:
|
||||||
p.update_peering(False)
|
p.update_peering(False)
|
||||||
self.go_online()
|
r = self.request('removePeering', message)
|
||||||
|
self.online = False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def cancelPeering(self, message):
|
def cancelPeering(self, message):
|
||||||
r = self.request('cancelPeering', message)
|
|
||||||
p = self.user
|
p = self.user
|
||||||
p.update_peering(False)
|
p.update_peering(False)
|
||||||
self.go_online()
|
self.online = False
|
||||||
|
r = self.request('cancelPeering', message)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def download(self, item):
|
def download(self, item):
|
||||||
|
@ -248,7 +259,6 @@ class Nodes(Thread):
|
||||||
return id in self._nodes and self._nodes[id].download(item)
|
return id in self._nodes and self._nodes[id].download(item)
|
||||||
|
|
||||||
def _call(self, target, action, *args):
|
def _call(self, target, action, *args):
|
||||||
print 'call', target, action, args
|
|
||||||
if target == 'all':
|
if target == 'all':
|
||||||
nodes = self._nodes.values()
|
nodes = self._nodes.values()
|
||||||
elif target == 'online':
|
elif target == 'online':
|
||||||
|
@ -262,12 +272,14 @@ class Nodes(Thread):
|
||||||
if user_id not in self._nodes:
|
if user_id not in self._nodes:
|
||||||
from user.models import User
|
from user.models import User
|
||||||
self._nodes[user_id] = Node(self, User.get_or_create(user_id))
|
self._nodes[user_id] = Node(self, User.get_or_create(user_id))
|
||||||
|
'''
|
||||||
else:
|
else:
|
||||||
self._nodes[user_id].online = True
|
self._nodes[user_id].online = True
|
||||||
trigger_event('status', {
|
trigger_event('status', {
|
||||||
'id': user_id,
|
'id': user_id,
|
||||||
'status': 'online'
|
'status': 'online'
|
||||||
})
|
})
|
||||||
|
'''
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
with self._app.app_context():
|
with self._app.app_context():
|
||||||
|
@ -277,7 +289,6 @@ class Nodes(Thread):
|
||||||
if args[0] == 'add':
|
if args[0] == 'add':
|
||||||
self._add_node(args[1])
|
self._add_node(args[1])
|
||||||
else:
|
else:
|
||||||
print 'next', args
|
|
||||||
self._call(*args)
|
self._call(*args)
|
||||||
|
|
||||||
def join(self):
|
def join(self):
|
||||||
|
|
|
@ -205,9 +205,9 @@ actions.register(rejectPeering, cache=False)
|
||||||
@returns_json
|
@returns_json
|
||||||
def removePeering(request):
|
def removePeering(request):
|
||||||
data = json.loads(request.form['data']) if 'data' in request.form else {}
|
data = json.loads(request.form['data']) if 'data' in request.form else {}
|
||||||
p = models.User.get_or_create(data['id'])
|
u = models.User.get_or_create(data['id'])
|
||||||
state.nodes.queue('add', p.id)
|
state.nodes.queue('add', u.id)
|
||||||
state.nodes.queue(p.id, 'removePeering', data.get('message', ''))
|
state.nodes.queue(u.id, 'removePeering', data.get('message', ''))
|
||||||
return {}
|
return {}
|
||||||
actions.register(removePeering, cache=False)
|
actions.register(removePeering, cache=False)
|
||||||
|
|
||||||
|
|
|
@ -65,6 +65,7 @@ class User(db.Model):
|
||||||
return [l.json() for l in self.lists.order_by('position')]
|
return [l.json() for l in self.lists.order_by('position')]
|
||||||
|
|
||||||
def update_peering(self, peered, username=None):
|
def update_peering(self, peered, username=None):
|
||||||
|
was_peering = self.peered
|
||||||
if peered:
|
if peered:
|
||||||
self.pending = ''
|
self.pending = ''
|
||||||
self.peered = True
|
self.peered = True
|
||||||
|
@ -72,19 +73,22 @@ class User(db.Model):
|
||||||
self.info['username'] = username
|
self.info['username'] = username
|
||||||
|
|
||||||
self.set_nickname(self.info.get('username', 'anonymous'))
|
self.set_nickname(self.info.get('username', 'anonymous'))
|
||||||
|
if not was_peering:
|
||||||
Changelog.record(state.user(), 'addpeer', self.id, self.nickname)
|
Changelog.record(state.user(), 'addpeer', self.id, self.nickname)
|
||||||
else:
|
else:
|
||||||
|
self.pending = ''
|
||||||
self.peered = False
|
self.peered = False
|
||||||
self.nickname = None
|
self.nickname = None
|
||||||
self.save()
|
List.query.filter_by(user_id=self.id).delete()
|
||||||
for i in self.items:
|
for i in self.items:
|
||||||
i.users.remove(self)
|
i.users.remove(self)
|
||||||
if not i.users:
|
if not i.users:
|
||||||
print 'last user, remove'
|
|
||||||
db.session.delete(i)
|
db.session.delete(i)
|
||||||
else:
|
else:
|
||||||
i.update_lists()
|
i.update_lists()
|
||||||
Changelog.query.filter_by(user_id=self.id).delete()
|
Changelog.query.filter_by(user_id=self.id).delete()
|
||||||
|
self.save()
|
||||||
|
if was_peering:
|
||||||
Changelog.record(state.user(), 'removepeer', self.id)
|
Changelog.record(state.user(), 'removepeer', self.id)
|
||||||
self.save()
|
self.save()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue