Compare commits

...

14 Commits

Author SHA1 Message Date
j 103aab3f4a keep listorder in sync 2019-02-02 18:04:06 +05:30
j 5f36b2eab4 get rid of Public lists 2019-02-02 17:53:45 +05:30
j 9d11bbba53 don't record addlistitems for Inbox 2019-02-02 17:36:28 +05:30
j 431dc8e194 ping on action 2019-02-02 17:36:11 +05:30
j 4033958341 remove unique contraint for user.nickname 2019-02-02 15:53:53 +05:30
j bcfc8ba9b2 typo(again) 2019-02-02 14:58:10 +05:30
j d84e950ca8 typo 2019-02-02 14:57:04 +05:30
j de9a172f39 start callbacks on main thread 2019-02-02 14:42:26 +05:30
j c7e316ab22 select for update 2019-02-02 14:29:12 +05:30
j a111aaac46 unlink log on remove 2019-02-02 14:29:05 +05:30
j 611fc2b373 ping nodes to update online status 2019-02-02 14:21:25 +05:30
j 265a4791fe log updated 2019-02-02 14:04:03 +05:30
j ad15b8a25a watch websocket errors 2019-02-02 12:57:46 +05:30
j c95e22869c queue peering requests per node 2019-02-02 12:43:37 +05:30
11 changed files with 168 additions and 74 deletions

View File

@ -538,7 +538,7 @@ class Item(db.Model):
add_record('additem', self.id, f.info) add_record('additem', self.id, f.info)
add_record('edititem', self.id, self.meta) add_record('edititem', self.id, self.meta)
for l in self.lists.filter_by(user_id=settings.USER_ID): for l in self.lists.filter_by(user_id=settings.USER_ID):
if l.name != '': if l.name != '' and l.name != 'Inbox':
add_record('addlistitems', l.name, [self.id]) add_record('addlistitems', l.name, [self.id])
self.update() self.update()
f.move() f.move()

View File

@ -130,16 +130,22 @@ class Peer(object):
self.info['lists'][new['name']] = self.info['lists'].pop(name) self.info['lists'][new['name']] = self.info['lists'].pop(name)
else: else:
self.info['lists'][new['name']] = [] self.info['lists'][new['name']] = []
if name in self.info['listorder']:
self.info['listorder'] = [new['name'] if n == name else n for n in self.info['listorder']]
elif action == 'orderlists': elif action == 'orderlists':
self.info['listorder'] = args[0] self.info['listorder'] = args[0]
elif action == 'removelist': elif action == 'removelist':
name = args[0] name = args[0]
if name in self.info['lists']: if name in self.info['lists']:
del self.info['lists'][name] del self.info['lists'][name]
if name in self.info['listorder']:
self.info['listorder'] = [n for n in self.info['listorder'] if n != name]
elif action == 'addlistitems': elif action == 'addlistitems':
name, ids = args name, ids = args
if name not in self.info['lists']: if name not in self.info['lists']:
self.info['lists'][name] = [] self.info['lists'][name] = []
if name not in self.info['listorder']:
self.info['listorder'].append(name)
self.info['lists'][name] = list(set(self.info['lists'][name]) | set(ids)) self.info['lists'][name] = list(set(self.info['lists'][name]) | set(ids))
elif action == 'removelistitems': elif action == 'removelistitems':
name, ids = args name, ids = args
@ -208,8 +214,9 @@ class Peer(object):
def remove(self): def remove(self):
self.join() self.join()
os.unlink(self._dbpath) for path in (self._dbpath, self._logpath, self._infopath):
os.unlink(self._infopath) if os.path.exists(path):
os.unlink(path)
def sync_db(self): def sync_db(self):
import item.models import item.models
@ -327,6 +334,8 @@ class Peer(object):
if state.shutdown: if state.shutdown:
break break
state.db.session.commit() state.db.session.commit()
if update_items:
logger.debug('updated %s items', len(update_items))
ids = set(self.library.keys()) ids = set(self.library.keys())
changed = False changed = False
for name, l in self.info.get('lists', {}).items(): for name, l in self.info.get('lists', {}).items():

View File

@ -153,5 +153,7 @@ class LocalNodes(dict):
state.tasks.queue('removelocalinfo', id) state.tasks.queue('removelocalinfo', id)
def get(self, user_id): def get(self, user_id):
if user_id in self and can_connect(self[user_id]): data = super().get(user_id)
return self[user_id] if user_id in self else None if data and can_connect(data):
return data
return None

View File

@ -1,16 +1,17 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from io import BytesIO
from queue import Queue from queue import Queue
from threading import Thread from threading import Thread
import json
from io import BytesIO
import gzip import gzip
import urllib.request, urllib.error, urllib.parse import json
import os import os
import time
import socket import socket
import socks import socks
import time
import urllib.error
import urllib.parse
import urllib.request
import ox import ox
from tornado.ioloop import PeriodicCallback from tornado.ioloop import PeriodicCallback
@ -37,12 +38,13 @@ class Node(Thread):
_online = None _online = None
TIMEOUT = 5 TIMEOUT = 5
def __init__(self, nodes, user): def __init__(self, nodes, user_id):
self._nodes = nodes self._nodes = nodes
self.user = user self.user_id = user_id
self.user_id = user.id
self._opener = get_opener(self.user_id) self._opener = get_opener(self.user_id)
self._q = Queue() self._q = Queue()
self._pingcb = PeriodicCallback(self.ping, 10 * settings.server['pull_interval'])
state.main.add_callback(self._pingcb.start)
Thread.__init__(self) Thread.__init__(self)
self.daemon = True self.daemon = True
self.start() self.start()
@ -53,10 +55,22 @@ class Node(Thread):
action = self._q.get() action = self._q.get()
if state.shutdown: if state.shutdown:
break break
if action == 'send_response': if action == 'ping':
self._send_response()
elif action == 'ping':
self.online = self.can_connect() self.online = self.can_connect()
elif action == 'send_response':
if self.online:
self._send_response()
else:
if not self._q.qsize():
time.sleep(5)
self.send_response()
elif isinstance(action, list) and len(action) == 2:
if self.online:
self._call(action[0], *action[1])
else:
if not self._q.qsize():
time.sleep(5)
self.queue(action[0], *action[1])
else: else:
logger.debug('unknown action %s', action) logger.debug('unknown action %s', action)
@ -68,6 +82,14 @@ class Node(Thread):
if state.online or self.get_local(): if state.online or self.get_local():
self._q.put('ping') self._q.put('ping')
def queue(self, action, *args):
logger.debug('queue node action %s->%s%s', self.user_id, action, args)
self._q.put([action, args])
def _call(self, action, *args):
r = getattr(self, action)(*args)
logger.debug('call node api %s->%s%s = %s', self.user_id, action, args, r)
@property @property
def url(self): def url(self):
if self.local: if self.local:
@ -202,34 +224,26 @@ class Node(Thread):
def _send_response(self): def _send_response(self):
with db.session(): with db.session():
u = user.models.User.get(self.user_id) u = user.models.User.get(self.user_id)
send_response = u and u.peered or u.queued
if u: if u:
user_pending = u.pending user_pending = u.pending
user_peered = u.peered user_peered = u.peered
user_queued = u.queued user_queued = u.queued
else:
user_queued = False
if DEBUG_NODES: if DEBUG_NODES:
logger.debug('go online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname) logger.debug('go online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname)
if send_response: if user_queued:
try: if DEBUG_NODES:
self.online = self.can_connect() logger.debug('connected to %s', self.url)
except: logger.debug('queued peering event pending=%s peered=%s', user_pending, user_peered)
if DEBUG_NODES: if user_pending == 'sent':
logger.debug('failed to connect to %s', self.user_id) self.peering('requestPeering')
self.online = False elif user_pending == '' and user_peered:
if self.online: self.peering('acceptPeering')
if DEBUG_NODES: else:
logger.debug('connected to %s', self.url) #fixme, what about cancel/reject peering here?
if user_queued: self.peering('removePeering')
if DEBUG_NODES:
logger.debug('queued peering event pending=%s peered=%s', user_pending, user_peered)
if user_pending == 'sent':
self.peering('requestPeering')
elif user_pending == '' and user_peered:
self.peering('acceptPeering')
else:
#fixme, what about cancel/reject peering here?
self.peering('removePeering')
def trigger_status(self): def trigger_status(self):
if self.online is not None: if self.online is not None:
@ -321,6 +335,7 @@ class Node(Thread):
return False return False
def peering(self, action): def peering(self, action):
pull_changes = False
with db.session(): with db.session():
u = user.models.User.get_or_create(self.user_id) u = user.models.User.get_or_create(self.user_id)
user_info = u.info user_info = u.info
@ -335,6 +350,8 @@ class Node(Thread):
if 'message' in u.info: if 'message' in u.info:
del u.info['message'] del u.info['message']
u.save() u.save()
if action == 'acceptPeering':
pull_changes = True
else: else:
logger.debug('peering failed? %s %s', action, r) logger.debug('peering failed? %s %s', action, r)
if action in ('cancelPeering', 'rejectPeering', 'removePeering'): if action in ('cancelPeering', 'rejectPeering', 'removePeering'):
@ -342,6 +359,8 @@ class Node(Thread):
with db.session(): with db.session():
u = user.models.User.get(self.user_id) u = user.models.User.get(self.user_id)
trigger_event('peering.%s' % action.replace('Peering', ''), u.json()) trigger_event('peering.%s' % action.replace('Peering', ''), u.json())
if pull_changes:
self.pullChanges()
return True return True
headers = { headers = {
@ -493,7 +512,7 @@ class Nodes(Thread):
self.queue('add', u.id, True) self.queue('add', u.id, True)
self.local = LocalNodes() self.local = LocalNodes()
self._pullcb = PeriodicCallback(self.pull, settings.server['pull_interval']) self._pullcb = PeriodicCallback(self.pull, settings.server['pull_interval'])
self._pullcb.start() state.main.add_callback(self._pullcb.start)
Thread.__init__(self) Thread.__init__(self)
self.daemon = True self.daemon = True
self.start() self.start()
@ -514,9 +533,16 @@ class Nodes(Thread):
def queue(self, *args): def queue(self, *args):
if args: if args:
logger.debug('queue "%s", %s entries in queue', args[0], self._q.qsize()) logger.debug('queue "%s", %s entries in queue', args, self._q.qsize())
self._q.put(list(args)) self._q.put(list(args))
def peer_queue(self, peer, action, *args):
if peer not in self._nodes:
self._add(peer)
elif not self._nodes[peer].is_online():
self._nodes[peer].ping()
self._nodes[peer].queue(action, *args)
def is_online(self, id): def is_online(self, id):
return id in self._nodes and self._nodes[id].is_online() return id in self._nodes and self._nodes[id].is_online()
@ -532,22 +558,27 @@ class Nodes(Thread):
if target == 'all': if target == 'all':
nodes = list(self._nodes.values()) nodes = list(self._nodes.values())
elif target == 'peered': elif target == 'peered':
nodes = [n for n in list(self._nodes.values()) if n.user.peered] ids = []
with db.session():
from user.models import User
for u in User.query.filter(User.id != settings.USER_ID).filter_by(peered=True).all():
ids.append(u.id)
nodes = [n for n in list(self._nodes.values()) if n.user_id in ids]
elif target == 'online': elif target == 'online':
nodes = [n for n in list(self._nodes.values()) if n.online] nodes = [n for n in list(self._nodes.values()) if n.online]
else: else:
if not target in self._nodes: if target not in self._nodes:
self._add(target) self._add(target)
nodes = [self._nodes[target]] nodes = [self._nodes[target]]
for node in nodes: for node in nodes:
r = getattr(node, action)(*args) node._call(action, *args)
logger.debug('call node api %s->%s%s = %s', node.user_id, action, args, r)
def _add(self, user_id, send_response=False): def _add(self, user_id, send_response=False):
if user_id not in self._nodes: if user_id not in self._nodes:
from user.models import User from user.models import User
with db.session(): with db.session():
self._nodes[user_id] = Node(self, User.get_or_create(user_id)) User.get_or_create(user_id)
self._nodes[user_id] = Node(self, user_id)
else: else:
self._nodes[user_id].ping() self._nodes[user_id].ping()
if send_response: if send_response:
@ -575,7 +606,7 @@ class Nodes(Thread):
if state.shutdown: if state.shutdown:
break break
node = self._nodes.get(u['id']) node = self._nodes.get(u['id'])
if node: if node and node.is_online():
node.pullChanges() node.pullChanges()
self._pulling = False self._pulling = False
@ -589,8 +620,6 @@ class Nodes(Thread):
def publish_node(): def publish_node():
update_online() update_online()
state.check_nodes = PeriodicCallback(check_nodes, 120000)
state.check_nodes.start()
state._online = PeriodicCallback(update_online, 60000) state._online = PeriodicCallback(update_online, 60000)
state._online.start() state._online.start()
@ -605,11 +634,3 @@ def update_online():
if state.online: if state.online:
for node in list(state.nodes._nodes.values()): for node in list(state.nodes._nodes.values()):
node.trigger_status() node.trigger_status()
def check_nodes():
if state.online:
with db.session():
for u in user.models.User.query.filter_by(queued=True):
if not state.nodes.is_online(u.id):
logger.debug('queued peering message for %s trying to connect...', u.id)
state.nodes.queue('add', u.id, True)

View File

@ -128,7 +128,11 @@ class Parser(object):
elif k == 'list': elif k == 'list':
nickname, name = v.split(':', 1) nickname, name = v.split(':', 1)
if nickname: if nickname:
u = self._user.query.filter_by(nickname=nickname).one() try:
u = self._user.query.filter_by(nickname=nickname, peered=True).one()
except sqlalchemy.orm.exc.NoResultFound:
ids = []
return self.in_ids(ids, exclude)
else: else:
u = self._user.query.filter_by(id=settings.USER_ID).one() u = self._user.query.filter_by(id=settings.USER_ID).one()
if not name: if not name:

View File

@ -95,4 +95,4 @@ FULLTEXT_SUPPORT = fulltext.platform_supported()
if not FULLTEXT_SUPPORT: if not FULLTEXT_SUPPORT:
config['itemKeys'] = [k for k in config['itemKeys'] if k['id'] != 'fulltext'] config['itemKeys'] = [k for k in config['itemKeys'] if k['id'] != 'fulltext']
DB_VERSION = 15 DB_VERSION = 17

View File

@ -43,11 +43,11 @@ CREATE TABLE user (
peered BOOLEAN, peered BOOLEAN,
online BOOLEAN, online BOOLEAN,
PRIMARY KEY (id), PRIMARY KEY (id),
UNIQUE (nickname),
CHECK (queued IN (0, 1)), CHECK (queued IN (0, 1)),
CHECK (peered IN (0, 1)), CHECK (peered IN (0, 1)),
CHECK (online IN (0, 1)) CHECK (online IN (0, 1))
); );
CREATE INDEX ix_user_nichname ON user (nichname);
CREATE TABLE metadata ( CREATE TABLE metadata (
created DATETIME, created DATETIME,
modified DATETIME, modified DATETIME,

View File

@ -373,6 +373,10 @@ class Update(Thread):
db_version = migrate_13() db_version = migrate_13()
if db_version < 15: if db_version < 15:
db_version = migrate_15() db_version = migrate_15()
if db_version < 16:
db_version = migrate_16()
if db_version < 17:
db_version = migrate_17()
settings.server['db_version'] = db_version settings.server['db_version'] = db_version
def run(self): def run(self):
@ -437,7 +441,7 @@ def migrate_5():
'DROP INDEX IF EXISTS user_metadata_index', 'DROP INDEX IF EXISTS user_metadata_index',
'CREATE UNIQUE INDEX user_metadata_index ON user_metadata(item_id, user_id)', 'CREATE UNIQUE INDEX user_metadata_index ON user_metadata(item_id, user_id)',
'UPDATE sort SET sharemetadata = 0', 'UPDATE sort SET sharemetadata = 0',
]), ])
with db.session() as session: with db.session() as session:
import user.models import user.models
for m in user.models.Metadata.query: for m in user.models.Metadata.query:
@ -631,3 +635,42 @@ def migrate_15():
del u.info['local'] del u.info['local']
u.save() u.save()
return 15 return 15
def migrate_16():
db.run_sql([
'''CREATE TABLE user2 (
created DATETIME,
modified DATETIME,
id VARCHAR(43) NOT NULL,
info BLOB,
nickname VARCHAR(256),
pending VARCHAR(64),
queued BOOLEAN,
peered BOOLEAN,
online BOOLEAN,
PRIMARY KEY (id),
CHECK (queued IN (0, 1)),
CHECK (peered IN (0, 1)),
CHECK (online IN (0, 1))
)''',
'''INSERT INTO user2 (created, modified, id, info, nickname, pending, queued, peered, online)
SELECT created, modified, id, info, nickname, pending, queued, peered, online FROM user''',
'DROP TABLE user',
'ALTER TABLE user2 RENAME TO user',
'CREATE INDEX IF NOT EXISTS ix_user_nickname ON user (nickname)'
])
return 16
def migrate_17():
from user.models import List, User
from changelog import add_record
with db.session():
l = List.get(':Public')
if not l:
add_record('removelist', 'Public')
lists = []
for l in List.query.filter_by(user_id=settings.USER_ID).order_by('index_'):
if l.type == 'static' and l.name not in ('', 'Inbox'):
lists.append(l.name)
add_record('orderlists', lists)
return 17

View File

@ -422,7 +422,7 @@ def requestPeering(data):
u.info['nickname'] = data.get('nickname', '') u.info['nickname'] = data.get('nickname', '')
u.update_name() u.update_name()
u.save() u.save()
state.nodes.queue(u.id, 'peering', 'requestPeering') state.nodes.peer_queue(u.id, 'peering', 'requestPeering')
return {} return {}
actions.register(requestPeering, cache=False) actions.register(requestPeering, cache=False)
@ -441,7 +441,7 @@ def acceptPeering(data):
u = models.User.get_or_create(data['id']) u = models.User.get_or_create(data['id'])
u.info['message'] = data.get('message', '') u.info['message'] = data.get('message', '')
u.update_peering(True) u.update_peering(True)
state.nodes.queue(u.id, 'peering', 'acceptPeering') state.nodes.peer_queue(u.id, 'peering', 'acceptPeering')
return {} return {}
actions.register(acceptPeering, cache=False) actions.register(acceptPeering, cache=False)
@ -459,7 +459,7 @@ def rejectPeering(data):
u = models.User.get_or_create(data['id']) u = models.User.get_or_create(data['id'])
u.info['message'] = data.get('message', '') u.info['message'] = data.get('message', '')
u.update_peering(False) u.update_peering(False)
state.nodes.queue(u.id, 'peering', 'rejectPeering') state.nodes.peer_queue(u.id, 'peering', 'rejectPeering')
return {} return {}
actions.register(rejectPeering, cache=False) actions.register(rejectPeering, cache=False)
@ -474,10 +474,11 @@ def removePeering(data):
if len(data.get('id', '')) not in (16, 43): if len(data.get('id', '')) not in (16, 43):
logger.debug('invalid user id') logger.debug('invalid user id')
return {} return {}
u = models.User.get_or_create(data['id']) u = models.User.get(data['id'], for_udpate=True)
u.info['message'] = data.get('message', '') if u:
u.update_peering(False) u.info['message'] = data.get('message', '')
state.nodes.queue(u.id, 'peering', 'removePeering') u.update_peering(False)
state.nodes.peer_queue(u.id, 'peering', 'removePeering')
return {} return {}
actions.register(removePeering, cache=False) actions.register(removePeering, cache=False)
@ -493,7 +494,7 @@ def cancelPeering(data):
u = models.User.get_or_create(data['id']) u = models.User.get_or_create(data['id'])
u.info['message'] = data.get('message', '') u.info['message'] = data.get('message', '')
u.update_peering(False) u.update_peering(False)
state.nodes.queue(u.id, 'peering', 'cancelPeering') state.nodes.peer_queue(u.id, 'peering', 'cancelPeering')
return {} return {}
actions.register(cancelPeering, cache=False) actions.register(cancelPeering, cache=False)

View File

@ -30,7 +30,7 @@ class User(db.Model):
id = sa.Column(sa.String(43), primary_key=True) id = sa.Column(sa.String(43), primary_key=True)
info = sa.Column(MutableDict.as_mutable(sa.PickleType(pickler=json_pickler))) info = sa.Column(MutableDict.as_mutable(sa.PickleType(pickler=json_pickler)))
nickname = sa.Column(sa.String(256), unique=True) nickname = sa.Column(sa.String(256), index=True)
pending = sa.Column(sa.String(64)) # sent|received pending = sa.Column(sa.String(64)) # sent|received
queued = sa.Column(sa.Boolean()) queued = sa.Column(sa.Boolean())
@ -339,7 +339,7 @@ class List(db.Model):
state.db.session.add(self) state.db.session.add(self)
if commit: if commit:
state.db.session.commit() state.db.session.commit()
if self.user_id == settings.USER_ID and self.name != '' and available_items: if self.user_id == settings.USER_ID and self.name not in ('', 'Inbox') and available_items:
add_record('addlistitems', self.name, available_items) add_record('addlistitems', self.name, available_items)
def get_items(self): def get_items(self):
@ -368,7 +368,7 @@ class List(db.Model):
q = list_items.delete().where(list_items.columns['list_id'].is_(self.id)) q = list_items.delete().where(list_items.columns['list_id'].is_(self.id))
state.db.session.execute(q) state.db.session.execute(q)
if not self._query: if not self._query:
if self.user_id == settings.USER_ID and self.name != '': if self.user_id == settings.USER_ID and self.name not in ('', 'Inbox'):
add_record('removelist', self.name) add_record('removelist', self.name)
state.db.session.delete(self) state.db.session.delete(self)
if commit: if commit:

View File

@ -1,8 +1,8 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from tornado.websocket import WebSocketHandler, WebSocketClosedError
from tornado.websocket import WebSocketHandler
from tornado.ioloop import IOLoop from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError
import json import json
from oxtornado import json_dumps from oxtornado import json_dumps
@ -49,7 +49,11 @@ class Handler(WebSocketHandler):
#websocket calls #websocket calls
def on_message(self, message): def on_message(self, message):
action, data = json.loads(message) try:
action, data = json.loads(message)
except json.decoder.JSONDecodeError:
logger.debug('invalid websocket message: %s', message)
return
if state.tasks: if state.tasks:
state.tasks.queue(action, data) state.tasks.queue(action, data)
@ -62,7 +66,17 @@ class Handler(WebSocketHandler):
if self.ws_connection is None: if self.ws_connection is None:
self.on_close() self.on_close()
else: else:
state.main.add_callback(lambda: self.write_message(message)) state.main.add_callback(lambda: self._write_message(message))
async def _write_message(self, message):
try:
task = self.write_message(message)
await task
except StreamClosedError as e:
self.on_close()
except WebSocketClosedError as e:
self.on_close()
def trigger_event(event, data): def trigger_event(event, data):
#if len(state.websockets): #if len(state.websockets):