Compare commits

...

14 Commits

Author SHA1 Message Date
j 103aab3f4a keep listorder in sync 2019-02-02 18:04:06 +05:30
j 5f36b2eab4 get rid of Public lists 2019-02-02 17:53:45 +05:30
j 9d11bbba53 don't record addlistitems for Inbox 2019-02-02 17:36:28 +05:30
j 431dc8e194 ping on action 2019-02-02 17:36:11 +05:30
j 4033958341 remove unique contraint for user.nickname 2019-02-02 15:53:53 +05:30
j bcfc8ba9b2 typo(again) 2019-02-02 14:58:10 +05:30
j d84e950ca8 typo 2019-02-02 14:57:04 +05:30
j de9a172f39 start callbacks on main thread 2019-02-02 14:42:26 +05:30
j c7e316ab22 select for update 2019-02-02 14:29:12 +05:30
j a111aaac46 unlink log on remove 2019-02-02 14:29:05 +05:30
j 611fc2b373 ping nodes to update online status 2019-02-02 14:21:25 +05:30
j 265a4791fe log updated 2019-02-02 14:04:03 +05:30
j ad15b8a25a watch websocket errors 2019-02-02 12:57:46 +05:30
j c95e22869c queue peering requests per node 2019-02-02 12:43:37 +05:30
11 changed files with 168 additions and 74 deletions

View File

@ -538,7 +538,7 @@ class Item(db.Model):
add_record('additem', self.id, f.info)
add_record('edititem', self.id, self.meta)
for l in self.lists.filter_by(user_id=settings.USER_ID):
if l.name != '':
if l.name != '' and l.name != 'Inbox':
add_record('addlistitems', l.name, [self.id])
self.update()
f.move()

View File

@ -130,16 +130,22 @@ class Peer(object):
self.info['lists'][new['name']] = self.info['lists'].pop(name)
else:
self.info['lists'][new['name']] = []
if name in self.info['listorder']:
self.info['listorder'] = [new['name'] if n == name else n for n in self.info['listorder']]
elif action == 'orderlists':
self.info['listorder'] = args[0]
elif action == 'removelist':
name = args[0]
if name in self.info['lists']:
del self.info['lists'][name]
if name in self.info['listorder']:
self.info['listorder'] = [n for n in self.info['listorder'] if n != name]
elif action == 'addlistitems':
name, ids = args
if name not in self.info['lists']:
self.info['lists'][name] = []
if name not in self.info['listorder']:
self.info['listorder'].append(name)
self.info['lists'][name] = list(set(self.info['lists'][name]) | set(ids))
elif action == 'removelistitems':
name, ids = args
@ -208,8 +214,9 @@ class Peer(object):
def remove(self):
self.join()
os.unlink(self._dbpath)
os.unlink(self._infopath)
for path in (self._dbpath, self._logpath, self._infopath):
if os.path.exists(path):
os.unlink(path)
def sync_db(self):
import item.models
@ -327,6 +334,8 @@ class Peer(object):
if state.shutdown:
break
state.db.session.commit()
if update_items:
logger.debug('updated %s items', len(update_items))
ids = set(self.library.keys())
changed = False
for name, l in self.info.get('lists', {}).items():

View File

@ -153,5 +153,7 @@ class LocalNodes(dict):
state.tasks.queue('removelocalinfo', id)
def get(self, user_id):
if user_id in self and can_connect(self[user_id]):
return self[user_id] if user_id in self else None
data = super().get(user_id)
if data and can_connect(data):
return data
return None

View File

@ -1,16 +1,17 @@
# -*- coding: utf-8 -*-
from io import BytesIO
from queue import Queue
from threading import Thread
import json
from io import BytesIO
import gzip
import urllib.request, urllib.error, urllib.parse
import json
import os
import time
import socket
import socks
import time
import urllib.error
import urllib.parse
import urllib.request
import ox
from tornado.ioloop import PeriodicCallback
@ -37,12 +38,13 @@ class Node(Thread):
_online = None
TIMEOUT = 5
def __init__(self, nodes, user):
def __init__(self, nodes, user_id):
self._nodes = nodes
self.user = user
self.user_id = user.id
self.user_id = user_id
self._opener = get_opener(self.user_id)
self._q = Queue()
self._pingcb = PeriodicCallback(self.ping, 10 * settings.server['pull_interval'])
state.main.add_callback(self._pingcb.start)
Thread.__init__(self)
self.daemon = True
self.start()
@ -53,10 +55,22 @@ class Node(Thread):
action = self._q.get()
if state.shutdown:
break
if action == 'send_response':
self._send_response()
elif action == 'ping':
if action == 'ping':
self.online = self.can_connect()
elif action == 'send_response':
if self.online:
self._send_response()
else:
if not self._q.qsize():
time.sleep(5)
self.send_response()
elif isinstance(action, list) and len(action) == 2:
if self.online:
self._call(action[0], *action[1])
else:
if not self._q.qsize():
time.sleep(5)
self.queue(action[0], *action[1])
else:
logger.debug('unknown action %s', action)
@ -68,6 +82,14 @@ class Node(Thread):
if state.online or self.get_local():
self._q.put('ping')
def queue(self, action, *args):
logger.debug('queue node action %s->%s%s', self.user_id, action, args)
self._q.put([action, args])
def _call(self, action, *args):
r = getattr(self, action)(*args)
logger.debug('call node api %s->%s%s = %s', self.user_id, action, args, r)
@property
def url(self):
if self.local:
@ -202,34 +224,26 @@ class Node(Thread):
def _send_response(self):
with db.session():
u = user.models.User.get(self.user_id)
send_response = u and u.peered or u.queued
if u:
user_pending = u.pending
user_peered = u.peered
user_queued = u.queued
else:
user_queued = False
if DEBUG_NODES:
logger.debug('go online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname)
if send_response:
try:
self.online = self.can_connect()
except:
if DEBUG_NODES:
logger.debug('failed to connect to %s', self.user_id)
self.online = False
if self.online:
if DEBUG_NODES:
logger.debug('connected to %s', self.url)
if user_queued:
if DEBUG_NODES:
logger.debug('queued peering event pending=%s peered=%s', user_pending, user_peered)
if user_pending == 'sent':
self.peering('requestPeering')
elif user_pending == '' and user_peered:
self.peering('acceptPeering')
else:
#fixme, what about cancel/reject peering here?
self.peering('removePeering')
if user_queued:
if DEBUG_NODES:
logger.debug('connected to %s', self.url)
logger.debug('queued peering event pending=%s peered=%s', user_pending, user_peered)
if user_pending == 'sent':
self.peering('requestPeering')
elif user_pending == '' and user_peered:
self.peering('acceptPeering')
else:
#fixme, what about cancel/reject peering here?
self.peering('removePeering')
def trigger_status(self):
if self.online is not None:
@ -321,6 +335,7 @@ class Node(Thread):
return False
def peering(self, action):
pull_changes = False
with db.session():
u = user.models.User.get_or_create(self.user_id)
user_info = u.info
@ -335,6 +350,8 @@ class Node(Thread):
if 'message' in u.info:
del u.info['message']
u.save()
if action == 'acceptPeering':
pull_changes = True
else:
logger.debug('peering failed? %s %s', action, r)
if action in ('cancelPeering', 'rejectPeering', 'removePeering'):
@ -342,6 +359,8 @@ class Node(Thread):
with db.session():
u = user.models.User.get(self.user_id)
trigger_event('peering.%s' % action.replace('Peering', ''), u.json())
if pull_changes:
self.pullChanges()
return True
headers = {
@ -493,7 +512,7 @@ class Nodes(Thread):
self.queue('add', u.id, True)
self.local = LocalNodes()
self._pullcb = PeriodicCallback(self.pull, settings.server['pull_interval'])
self._pullcb.start()
state.main.add_callback(self._pullcb.start)
Thread.__init__(self)
self.daemon = True
self.start()
@ -514,9 +533,16 @@ class Nodes(Thread):
def queue(self, *args):
if args:
logger.debug('queue "%s", %s entries in queue', args[0], self._q.qsize())
logger.debug('queue "%s", %s entries in queue', args, self._q.qsize())
self._q.put(list(args))
def peer_queue(self, peer, action, *args):
if peer not in self._nodes:
self._add(peer)
elif not self._nodes[peer].is_online():
self._nodes[peer].ping()
self._nodes[peer].queue(action, *args)
def is_online(self, id):
return id in self._nodes and self._nodes[id].is_online()
@ -532,22 +558,27 @@ class Nodes(Thread):
if target == 'all':
nodes = list(self._nodes.values())
elif target == 'peered':
nodes = [n for n in list(self._nodes.values()) if n.user.peered]
ids = []
with db.session():
from user.models import User
for u in User.query.filter(User.id != settings.USER_ID).filter_by(peered=True).all():
ids.append(u.id)
nodes = [n for n in list(self._nodes.values()) if n.user_id in ids]
elif target == 'online':
nodes = [n for n in list(self._nodes.values()) if n.online]
else:
if not target in self._nodes:
if target not in self._nodes:
self._add(target)
nodes = [self._nodes[target]]
for node in nodes:
r = getattr(node, action)(*args)
logger.debug('call node api %s->%s%s = %s', node.user_id, action, args, r)
node._call(action, *args)
def _add(self, user_id, send_response=False):
if user_id not in self._nodes:
from user.models import User
with db.session():
self._nodes[user_id] = Node(self, User.get_or_create(user_id))
User.get_or_create(user_id)
self._nodes[user_id] = Node(self, user_id)
else:
self._nodes[user_id].ping()
if send_response:
@ -575,7 +606,7 @@ class Nodes(Thread):
if state.shutdown:
break
node = self._nodes.get(u['id'])
if node:
if node and node.is_online():
node.pullChanges()
self._pulling = False
@ -589,8 +620,6 @@ class Nodes(Thread):
def publish_node():
update_online()
state.check_nodes = PeriodicCallback(check_nodes, 120000)
state.check_nodes.start()
state._online = PeriodicCallback(update_online, 60000)
state._online.start()
@ -605,11 +634,3 @@ def update_online():
if state.online:
for node in list(state.nodes._nodes.values()):
node.trigger_status()
def check_nodes():
if state.online:
with db.session():
for u in user.models.User.query.filter_by(queued=True):
if not state.nodes.is_online(u.id):
logger.debug('queued peering message for %s trying to connect...', u.id)
state.nodes.queue('add', u.id, True)

View File

@ -128,7 +128,11 @@ class Parser(object):
elif k == 'list':
nickname, name = v.split(':', 1)
if nickname:
u = self._user.query.filter_by(nickname=nickname).one()
try:
u = self._user.query.filter_by(nickname=nickname, peered=True).one()
except sqlalchemy.orm.exc.NoResultFound:
ids = []
return self.in_ids(ids, exclude)
else:
u = self._user.query.filter_by(id=settings.USER_ID).one()
if not name:

View File

@ -95,4 +95,4 @@ FULLTEXT_SUPPORT = fulltext.platform_supported()
if not FULLTEXT_SUPPORT:
config['itemKeys'] = [k for k in config['itemKeys'] if k['id'] != 'fulltext']
DB_VERSION = 15
DB_VERSION = 17

View File

@ -43,11 +43,11 @@ CREATE TABLE user (
peered BOOLEAN,
online BOOLEAN,
PRIMARY KEY (id),
UNIQUE (nickname),
CHECK (queued IN (0, 1)),
CHECK (peered IN (0, 1)),
CHECK (online IN (0, 1))
);
CREATE INDEX ix_user_nichname ON user (nichname);
CREATE TABLE metadata (
created DATETIME,
modified DATETIME,

View File

@ -373,6 +373,10 @@ class Update(Thread):
db_version = migrate_13()
if db_version < 15:
db_version = migrate_15()
if db_version < 16:
db_version = migrate_16()
if db_version < 17:
db_version = migrate_17()
settings.server['db_version'] = db_version
def run(self):
@ -437,7 +441,7 @@ def migrate_5():
'DROP INDEX IF EXISTS user_metadata_index',
'CREATE UNIQUE INDEX user_metadata_index ON user_metadata(item_id, user_id)',
'UPDATE sort SET sharemetadata = 0',
]),
])
with db.session() as session:
import user.models
for m in user.models.Metadata.query:
@ -631,3 +635,42 @@ def migrate_15():
del u.info['local']
u.save()
return 15
def migrate_16():
db.run_sql([
'''CREATE TABLE user2 (
created DATETIME,
modified DATETIME,
id VARCHAR(43) NOT NULL,
info BLOB,
nickname VARCHAR(256),
pending VARCHAR(64),
queued BOOLEAN,
peered BOOLEAN,
online BOOLEAN,
PRIMARY KEY (id),
CHECK (queued IN (0, 1)),
CHECK (peered IN (0, 1)),
CHECK (online IN (0, 1))
)''',
'''INSERT INTO user2 (created, modified, id, info, nickname, pending, queued, peered, online)
SELECT created, modified, id, info, nickname, pending, queued, peered, online FROM user''',
'DROP TABLE user',
'ALTER TABLE user2 RENAME TO user',
'CREATE INDEX IF NOT EXISTS ix_user_nickname ON user (nickname)'
])
return 16
def migrate_17():
from user.models import List, User
from changelog import add_record
with db.session():
l = List.get(':Public')
if not l:
add_record('removelist', 'Public')
lists = []
for l in List.query.filter_by(user_id=settings.USER_ID).order_by('index_'):
if l.type == 'static' and l.name not in ('', 'Inbox'):
lists.append(l.name)
add_record('orderlists', lists)
return 17

View File

@ -422,7 +422,7 @@ def requestPeering(data):
u.info['nickname'] = data.get('nickname', '')
u.update_name()
u.save()
state.nodes.queue(u.id, 'peering', 'requestPeering')
state.nodes.peer_queue(u.id, 'peering', 'requestPeering')
return {}
actions.register(requestPeering, cache=False)
@ -441,7 +441,7 @@ def acceptPeering(data):
u = models.User.get_or_create(data['id'])
u.info['message'] = data.get('message', '')
u.update_peering(True)
state.nodes.queue(u.id, 'peering', 'acceptPeering')
state.nodes.peer_queue(u.id, 'peering', 'acceptPeering')
return {}
actions.register(acceptPeering, cache=False)
@ -459,7 +459,7 @@ def rejectPeering(data):
u = models.User.get_or_create(data['id'])
u.info['message'] = data.get('message', '')
u.update_peering(False)
state.nodes.queue(u.id, 'peering', 'rejectPeering')
state.nodes.peer_queue(u.id, 'peering', 'rejectPeering')
return {}
actions.register(rejectPeering, cache=False)
@ -474,10 +474,11 @@ def removePeering(data):
if len(data.get('id', '')) not in (16, 43):
logger.debug('invalid user id')
return {}
u = models.User.get_or_create(data['id'])
u.info['message'] = data.get('message', '')
u.update_peering(False)
state.nodes.queue(u.id, 'peering', 'removePeering')
u = models.User.get(data['id'], for_udpate=True)
if u:
u.info['message'] = data.get('message', '')
u.update_peering(False)
state.nodes.peer_queue(u.id, 'peering', 'removePeering')
return {}
actions.register(removePeering, cache=False)
@ -493,7 +494,7 @@ def cancelPeering(data):
u = models.User.get_or_create(data['id'])
u.info['message'] = data.get('message', '')
u.update_peering(False)
state.nodes.queue(u.id, 'peering', 'cancelPeering')
state.nodes.peer_queue(u.id, 'peering', 'cancelPeering')
return {}
actions.register(cancelPeering, cache=False)

View File

@ -30,7 +30,7 @@ class User(db.Model):
id = sa.Column(sa.String(43), primary_key=True)
info = sa.Column(MutableDict.as_mutable(sa.PickleType(pickler=json_pickler)))
nickname = sa.Column(sa.String(256), unique=True)
nickname = sa.Column(sa.String(256), index=True)
pending = sa.Column(sa.String(64)) # sent|received
queued = sa.Column(sa.Boolean())
@ -339,7 +339,7 @@ class List(db.Model):
state.db.session.add(self)
if commit:
state.db.session.commit()
if self.user_id == settings.USER_ID and self.name != '' and available_items:
if self.user_id == settings.USER_ID and self.name not in ('', 'Inbox') and available_items:
add_record('addlistitems', self.name, available_items)
def get_items(self):
@ -368,7 +368,7 @@ class List(db.Model):
q = list_items.delete().where(list_items.columns['list_id'].is_(self.id))
state.db.session.execute(q)
if not self._query:
if self.user_id == settings.USER_ID and self.name != '':
if self.user_id == settings.USER_ID and self.name not in ('', 'Inbox'):
add_record('removelist', self.name)
state.db.session.delete(self)
if commit:

View File

@ -1,8 +1,8 @@
# -*- coding: utf-8 -*-
from tornado.websocket import WebSocketHandler
from tornado.websocket import WebSocketHandler, WebSocketClosedError
from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError
import json
from oxtornado import json_dumps
@ -49,7 +49,11 @@ class Handler(WebSocketHandler):
#websocket calls
def on_message(self, message):
action, data = json.loads(message)
try:
action, data = json.loads(message)
except json.decoder.JSONDecodeError:
logger.debug('invalid websocket message: %s', message)
return
if state.tasks:
state.tasks.queue(action, data)
@ -62,7 +66,17 @@ class Handler(WebSocketHandler):
if self.ws_connection is None:
self.on_close()
else:
state.main.add_callback(lambda: self.write_message(message))
state.main.add_callback(lambda: self._write_message(message))
async def _write_message(self, message):
try:
task = self.write_message(message)
await task
except StreamClosedError as e:
self.on_close()
except WebSocketClosedError as e:
self.on_close()
def trigger_event(event, data):
#if len(state.websockets):