Compare commits

..

No commits in common. "103aab3f4a09207e25e2e86daee6d912e27adedd" and "148b41087fcff4097c7e3ee38a8c3de8ce41e260" have entirely different histories.

11 changed files with 74 additions and 168 deletions

View file

@ -538,7 +538,7 @@ class Item(db.Model):
add_record('additem', self.id, f.info) add_record('additem', self.id, f.info)
add_record('edititem', self.id, self.meta) add_record('edititem', self.id, self.meta)
for l in self.lists.filter_by(user_id=settings.USER_ID): for l in self.lists.filter_by(user_id=settings.USER_ID):
if l.name != '' and l.name != 'Inbox': if l.name != '':
add_record('addlistitems', l.name, [self.id]) add_record('addlistitems', l.name, [self.id])
self.update() self.update()
f.move() f.move()

View file

@ -130,22 +130,16 @@ class Peer(object):
self.info['lists'][new['name']] = self.info['lists'].pop(name) self.info['lists'][new['name']] = self.info['lists'].pop(name)
else: else:
self.info['lists'][new['name']] = [] self.info['lists'][new['name']] = []
if name in self.info['listorder']:
self.info['listorder'] = [new['name'] if n == name else n for n in self.info['listorder']]
elif action == 'orderlists': elif action == 'orderlists':
self.info['listorder'] = args[0] self.info['listorder'] = args[0]
elif action == 'removelist': elif action == 'removelist':
name = args[0] name = args[0]
if name in self.info['lists']: if name in self.info['lists']:
del self.info['lists'][name] del self.info['lists'][name]
if name in self.info['listorder']:
self.info['listorder'] = [n for n in self.info['listorder'] if n != name]
elif action == 'addlistitems': elif action == 'addlistitems':
name, ids = args name, ids = args
if name not in self.info['lists']: if name not in self.info['lists']:
self.info['lists'][name] = [] self.info['lists'][name] = []
if name not in self.info['listorder']:
self.info['listorder'].append(name)
self.info['lists'][name] = list(set(self.info['lists'][name]) | set(ids)) self.info['lists'][name] = list(set(self.info['lists'][name]) | set(ids))
elif action == 'removelistitems': elif action == 'removelistitems':
name, ids = args name, ids = args
@ -214,9 +208,8 @@ class Peer(object):
def remove(self): def remove(self):
self.join() self.join()
for path in (self._dbpath, self._logpath, self._infopath): os.unlink(self._dbpath)
if os.path.exists(path): os.unlink(self._infopath)
os.unlink(path)
def sync_db(self): def sync_db(self):
import item.models import item.models
@ -334,8 +327,6 @@ class Peer(object):
if state.shutdown: if state.shutdown:
break break
state.db.session.commit() state.db.session.commit()
if update_items:
logger.debug('updated %s items', len(update_items))
ids = set(self.library.keys()) ids = set(self.library.keys())
changed = False changed = False
for name, l in self.info.get('lists', {}).items(): for name, l in self.info.get('lists', {}).items():

View file

@ -153,7 +153,5 @@ class LocalNodes(dict):
state.tasks.queue('removelocalinfo', id) state.tasks.queue('removelocalinfo', id)
def get(self, user_id): def get(self, user_id):
data = super().get(user_id) if user_id in self and can_connect(self[user_id]):
if data and can_connect(data): return self[user_id] if user_id in self else None
return data
return None

View file

@ -1,17 +1,16 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from io import BytesIO
from queue import Queue from queue import Queue
from threading import Thread from threading import Thread
import gzip
import json import json
from io import BytesIO
import gzip
import urllib.request, urllib.error, urllib.parse
import os import os
import time
import socket import socket
import socks import socks
import time
import urllib.error
import urllib.parse
import urllib.request
import ox import ox
from tornado.ioloop import PeriodicCallback from tornado.ioloop import PeriodicCallback
@ -38,13 +37,12 @@ class Node(Thread):
_online = None _online = None
TIMEOUT = 5 TIMEOUT = 5
def __init__(self, nodes, user_id): def __init__(self, nodes, user):
self._nodes = nodes self._nodes = nodes
self.user_id = user_id self.user = user
self.user_id = user.id
self._opener = get_opener(self.user_id) self._opener = get_opener(self.user_id)
self._q = Queue() self._q = Queue()
self._pingcb = PeriodicCallback(self.ping, 10 * settings.server['pull_interval'])
state.main.add_callback(self._pingcb.start)
Thread.__init__(self) Thread.__init__(self)
self.daemon = True self.daemon = True
self.start() self.start()
@ -55,22 +53,10 @@ class Node(Thread):
action = self._q.get() action = self._q.get()
if state.shutdown: if state.shutdown:
break break
if action == 'ping': if action == 'send_response':
self._send_response()
elif action == 'ping':
self.online = self.can_connect() self.online = self.can_connect()
elif action == 'send_response':
if self.online:
self._send_response()
else:
if not self._q.qsize():
time.sleep(5)
self.send_response()
elif isinstance(action, list) and len(action) == 2:
if self.online:
self._call(action[0], *action[1])
else:
if not self._q.qsize():
time.sleep(5)
self.queue(action[0], *action[1])
else: else:
logger.debug('unknown action %s', action) logger.debug('unknown action %s', action)
@ -82,14 +68,6 @@ class Node(Thread):
if state.online or self.get_local(): if state.online or self.get_local():
self._q.put('ping') self._q.put('ping')
def queue(self, action, *args):
logger.debug('queue node action %s->%s%s', self.user_id, action, args)
self._q.put([action, args])
def _call(self, action, *args):
r = getattr(self, action)(*args)
logger.debug('call node api %s->%s%s = %s', self.user_id, action, args, r)
@property @property
def url(self): def url(self):
if self.local: if self.local:
@ -224,26 +202,34 @@ class Node(Thread):
def _send_response(self): def _send_response(self):
with db.session(): with db.session():
u = user.models.User.get(self.user_id) u = user.models.User.get(self.user_id)
send_response = u and u.peered or u.queued
if u: if u:
user_pending = u.pending user_pending = u.pending
user_peered = u.peered user_peered = u.peered
user_queued = u.queued user_queued = u.queued
else:
user_queued = False
if DEBUG_NODES: if DEBUG_NODES:
logger.debug('go online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname) logger.debug('go online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname)
if user_queued: if send_response:
if DEBUG_NODES: try:
logger.debug('connected to %s', self.url) self.online = self.can_connect()
logger.debug('queued peering event pending=%s peered=%s', user_pending, user_peered) except:
if user_pending == 'sent': if DEBUG_NODES:
self.peering('requestPeering') logger.debug('failed to connect to %s', self.user_id)
elif user_pending == '' and user_peered: self.online = False
self.peering('acceptPeering') if self.online:
else: if DEBUG_NODES:
#fixme, what about cancel/reject peering here? logger.debug('connected to %s', self.url)
self.peering('removePeering') if user_queued:
if DEBUG_NODES:
logger.debug('queued peering event pending=%s peered=%s', user_pending, user_peered)
if user_pending == 'sent':
self.peering('requestPeering')
elif user_pending == '' and user_peered:
self.peering('acceptPeering')
else:
#fixme, what about cancel/reject peering here?
self.peering('removePeering')
def trigger_status(self): def trigger_status(self):
if self.online is not None: if self.online is not None:
@ -335,7 +321,6 @@ class Node(Thread):
return False return False
def peering(self, action): def peering(self, action):
pull_changes = False
with db.session(): with db.session():
u = user.models.User.get_or_create(self.user_id) u = user.models.User.get_or_create(self.user_id)
user_info = u.info user_info = u.info
@ -350,8 +335,6 @@ class Node(Thread):
if 'message' in u.info: if 'message' in u.info:
del u.info['message'] del u.info['message']
u.save() u.save()
if action == 'acceptPeering':
pull_changes = True
else: else:
logger.debug('peering failed? %s %s', action, r) logger.debug('peering failed? %s %s', action, r)
if action in ('cancelPeering', 'rejectPeering', 'removePeering'): if action in ('cancelPeering', 'rejectPeering', 'removePeering'):
@ -359,8 +342,6 @@ class Node(Thread):
with db.session(): with db.session():
u = user.models.User.get(self.user_id) u = user.models.User.get(self.user_id)
trigger_event('peering.%s' % action.replace('Peering', ''), u.json()) trigger_event('peering.%s' % action.replace('Peering', ''), u.json())
if pull_changes:
self.pullChanges()
return True return True
headers = { headers = {
@ -512,7 +493,7 @@ class Nodes(Thread):
self.queue('add', u.id, True) self.queue('add', u.id, True)
self.local = LocalNodes() self.local = LocalNodes()
self._pullcb = PeriodicCallback(self.pull, settings.server['pull_interval']) self._pullcb = PeriodicCallback(self.pull, settings.server['pull_interval'])
state.main.add_callback(self._pullcb.start) self._pullcb.start()
Thread.__init__(self) Thread.__init__(self)
self.daemon = True self.daemon = True
self.start() self.start()
@ -533,16 +514,9 @@ class Nodes(Thread):
def queue(self, *args): def queue(self, *args):
if args: if args:
logger.debug('queue "%s", %s entries in queue', args, self._q.qsize()) logger.debug('queue "%s", %s entries in queue', args[0], self._q.qsize())
self._q.put(list(args)) self._q.put(list(args))
def peer_queue(self, peer, action, *args):
if peer not in self._nodes:
self._add(peer)
elif not self._nodes[peer].is_online():
self._nodes[peer].ping()
self._nodes[peer].queue(action, *args)
def is_online(self, id): def is_online(self, id):
return id in self._nodes and self._nodes[id].is_online() return id in self._nodes and self._nodes[id].is_online()
@ -558,27 +532,22 @@ class Nodes(Thread):
if target == 'all': if target == 'all':
nodes = list(self._nodes.values()) nodes = list(self._nodes.values())
elif target == 'peered': elif target == 'peered':
ids = [] nodes = [n for n in list(self._nodes.values()) if n.user.peered]
with db.session():
from user.models import User
for u in User.query.filter(User.id != settings.USER_ID).filter_by(peered=True).all():
ids.append(u.id)
nodes = [n for n in list(self._nodes.values()) if n.user_id in ids]
elif target == 'online': elif target == 'online':
nodes = [n for n in list(self._nodes.values()) if n.online] nodes = [n for n in list(self._nodes.values()) if n.online]
else: else:
if target not in self._nodes: if not target in self._nodes:
self._add(target) self._add(target)
nodes = [self._nodes[target]] nodes = [self._nodes[target]]
for node in nodes: for node in nodes:
node._call(action, *args) r = getattr(node, action)(*args)
logger.debug('call node api %s->%s%s = %s', node.user_id, action, args, r)
def _add(self, user_id, send_response=False): def _add(self, user_id, send_response=False):
if user_id not in self._nodes: if user_id not in self._nodes:
from user.models import User from user.models import User
with db.session(): with db.session():
User.get_or_create(user_id) self._nodes[user_id] = Node(self, User.get_or_create(user_id))
self._nodes[user_id] = Node(self, user_id)
else: else:
self._nodes[user_id].ping() self._nodes[user_id].ping()
if send_response: if send_response:
@ -606,7 +575,7 @@ class Nodes(Thread):
if state.shutdown: if state.shutdown:
break break
node = self._nodes.get(u['id']) node = self._nodes.get(u['id'])
if node and node.is_online(): if node:
node.pullChanges() node.pullChanges()
self._pulling = False self._pulling = False
@ -620,6 +589,8 @@ class Nodes(Thread):
def publish_node(): def publish_node():
update_online() update_online()
state.check_nodes = PeriodicCallback(check_nodes, 120000)
state.check_nodes.start()
state._online = PeriodicCallback(update_online, 60000) state._online = PeriodicCallback(update_online, 60000)
state._online.start() state._online.start()
@ -634,3 +605,11 @@ def update_online():
if state.online: if state.online:
for node in list(state.nodes._nodes.values()): for node in list(state.nodes._nodes.values()):
node.trigger_status() node.trigger_status()
def check_nodes():
if state.online:
with db.session():
for u in user.models.User.query.filter_by(queued=True):
if not state.nodes.is_online(u.id):
logger.debug('queued peering message for %s trying to connect...', u.id)
state.nodes.queue('add', u.id, True)

View file

@ -128,11 +128,7 @@ class Parser(object):
elif k == 'list': elif k == 'list':
nickname, name = v.split(':', 1) nickname, name = v.split(':', 1)
if nickname: if nickname:
try: u = self._user.query.filter_by(nickname=nickname).one()
u = self._user.query.filter_by(nickname=nickname, peered=True).one()
except sqlalchemy.orm.exc.NoResultFound:
ids = []
return self.in_ids(ids, exclude)
else: else:
u = self._user.query.filter_by(id=settings.USER_ID).one() u = self._user.query.filter_by(id=settings.USER_ID).one()
if not name: if not name:

View file

@ -95,4 +95,4 @@ FULLTEXT_SUPPORT = fulltext.platform_supported()
if not FULLTEXT_SUPPORT: if not FULLTEXT_SUPPORT:
config['itemKeys'] = [k for k in config['itemKeys'] if k['id'] != 'fulltext'] config['itemKeys'] = [k for k in config['itemKeys'] if k['id'] != 'fulltext']
DB_VERSION = 17 DB_VERSION = 15

View file

@ -43,11 +43,11 @@ CREATE TABLE user (
peered BOOLEAN, peered BOOLEAN,
online BOOLEAN, online BOOLEAN,
PRIMARY KEY (id), PRIMARY KEY (id),
UNIQUE (nickname),
CHECK (queued IN (0, 1)), CHECK (queued IN (0, 1)),
CHECK (peered IN (0, 1)), CHECK (peered IN (0, 1)),
CHECK (online IN (0, 1)) CHECK (online IN (0, 1))
); );
CREATE INDEX ix_user_nichname ON user (nichname);
CREATE TABLE metadata ( CREATE TABLE metadata (
created DATETIME, created DATETIME,
modified DATETIME, modified DATETIME,

View file

@ -373,10 +373,6 @@ class Update(Thread):
db_version = migrate_13() db_version = migrate_13()
if db_version < 15: if db_version < 15:
db_version = migrate_15() db_version = migrate_15()
if db_version < 16:
db_version = migrate_16()
if db_version < 17:
db_version = migrate_17()
settings.server['db_version'] = db_version settings.server['db_version'] = db_version
def run(self): def run(self):
@ -441,7 +437,7 @@ def migrate_5():
'DROP INDEX IF EXISTS user_metadata_index', 'DROP INDEX IF EXISTS user_metadata_index',
'CREATE UNIQUE INDEX user_metadata_index ON user_metadata(item_id, user_id)', 'CREATE UNIQUE INDEX user_metadata_index ON user_metadata(item_id, user_id)',
'UPDATE sort SET sharemetadata = 0', 'UPDATE sort SET sharemetadata = 0',
]) ]),
with db.session() as session: with db.session() as session:
import user.models import user.models
for m in user.models.Metadata.query: for m in user.models.Metadata.query:
@ -635,42 +631,3 @@ def migrate_15():
del u.info['local'] del u.info['local']
u.save() u.save()
return 15 return 15
def migrate_16():
db.run_sql([
'''CREATE TABLE user2 (
created DATETIME,
modified DATETIME,
id VARCHAR(43) NOT NULL,
info BLOB,
nickname VARCHAR(256),
pending VARCHAR(64),
queued BOOLEAN,
peered BOOLEAN,
online BOOLEAN,
PRIMARY KEY (id),
CHECK (queued IN (0, 1)),
CHECK (peered IN (0, 1)),
CHECK (online IN (0, 1))
)''',
'''INSERT INTO user2 (created, modified, id, info, nickname, pending, queued, peered, online)
SELECT created, modified, id, info, nickname, pending, queued, peered, online FROM user''',
'DROP TABLE user',
'ALTER TABLE user2 RENAME TO user',
'CREATE INDEX IF NOT EXISTS ix_user_nickname ON user (nickname)'
])
return 16
def migrate_17():
from user.models import List, User
from changelog import add_record
with db.session():
l = List.get(':Public')
if not l:
add_record('removelist', 'Public')
lists = []
for l in List.query.filter_by(user_id=settings.USER_ID).order_by('index_'):
if l.type == 'static' and l.name not in ('', 'Inbox'):
lists.append(l.name)
add_record('orderlists', lists)
return 17

View file

@ -422,7 +422,7 @@ def requestPeering(data):
u.info['nickname'] = data.get('nickname', '') u.info['nickname'] = data.get('nickname', '')
u.update_name() u.update_name()
u.save() u.save()
state.nodes.peer_queue(u.id, 'peering', 'requestPeering') state.nodes.queue(u.id, 'peering', 'requestPeering')
return {} return {}
actions.register(requestPeering, cache=False) actions.register(requestPeering, cache=False)
@ -441,7 +441,7 @@ def acceptPeering(data):
u = models.User.get_or_create(data['id']) u = models.User.get_or_create(data['id'])
u.info['message'] = data.get('message', '') u.info['message'] = data.get('message', '')
u.update_peering(True) u.update_peering(True)
state.nodes.peer_queue(u.id, 'peering', 'acceptPeering') state.nodes.queue(u.id, 'peering', 'acceptPeering')
return {} return {}
actions.register(acceptPeering, cache=False) actions.register(acceptPeering, cache=False)
@ -459,7 +459,7 @@ def rejectPeering(data):
u = models.User.get_or_create(data['id']) u = models.User.get_or_create(data['id'])
u.info['message'] = data.get('message', '') u.info['message'] = data.get('message', '')
u.update_peering(False) u.update_peering(False)
state.nodes.peer_queue(u.id, 'peering', 'rejectPeering') state.nodes.queue(u.id, 'peering', 'rejectPeering')
return {} return {}
actions.register(rejectPeering, cache=False) actions.register(rejectPeering, cache=False)
@ -474,11 +474,10 @@ def removePeering(data):
if len(data.get('id', '')) not in (16, 43): if len(data.get('id', '')) not in (16, 43):
logger.debug('invalid user id') logger.debug('invalid user id')
return {} return {}
u = models.User.get(data['id'], for_udpate=True) u = models.User.get_or_create(data['id'])
if u: u.info['message'] = data.get('message', '')
u.info['message'] = data.get('message', '') u.update_peering(False)
u.update_peering(False) state.nodes.queue(u.id, 'peering', 'removePeering')
state.nodes.peer_queue(u.id, 'peering', 'removePeering')
return {} return {}
actions.register(removePeering, cache=False) actions.register(removePeering, cache=False)
@ -494,7 +493,7 @@ def cancelPeering(data):
u = models.User.get_or_create(data['id']) u = models.User.get_or_create(data['id'])
u.info['message'] = data.get('message', '') u.info['message'] = data.get('message', '')
u.update_peering(False) u.update_peering(False)
state.nodes.peer_queue(u.id, 'peering', 'cancelPeering') state.nodes.queue(u.id, 'peering', 'cancelPeering')
return {} return {}
actions.register(cancelPeering, cache=False) actions.register(cancelPeering, cache=False)

View file

@ -30,7 +30,7 @@ class User(db.Model):
id = sa.Column(sa.String(43), primary_key=True) id = sa.Column(sa.String(43), primary_key=True)
info = sa.Column(MutableDict.as_mutable(sa.PickleType(pickler=json_pickler))) info = sa.Column(MutableDict.as_mutable(sa.PickleType(pickler=json_pickler)))
nickname = sa.Column(sa.String(256), index=True) nickname = sa.Column(sa.String(256), unique=True)
pending = sa.Column(sa.String(64)) # sent|received pending = sa.Column(sa.String(64)) # sent|received
queued = sa.Column(sa.Boolean()) queued = sa.Column(sa.Boolean())
@ -339,7 +339,7 @@ class List(db.Model):
state.db.session.add(self) state.db.session.add(self)
if commit: if commit:
state.db.session.commit() state.db.session.commit()
if self.user_id == settings.USER_ID and self.name not in ('', 'Inbox') and available_items: if self.user_id == settings.USER_ID and self.name != '' and available_items:
add_record('addlistitems', self.name, available_items) add_record('addlistitems', self.name, available_items)
def get_items(self): def get_items(self):
@ -368,7 +368,7 @@ class List(db.Model):
q = list_items.delete().where(list_items.columns['list_id'].is_(self.id)) q = list_items.delete().where(list_items.columns['list_id'].is_(self.id))
state.db.session.execute(q) state.db.session.execute(q)
if not self._query: if not self._query:
if self.user_id == settings.USER_ID and self.name not in ('', 'Inbox'): if self.user_id == settings.USER_ID and self.name != '':
add_record('removelist', self.name) add_record('removelist', self.name)
state.db.session.delete(self) state.db.session.delete(self)
if commit: if commit:

View file

@ -1,8 +1,8 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from tornado.websocket import WebSocketHandler, WebSocketClosedError
from tornado.websocket import WebSocketHandler
from tornado.ioloop import IOLoop from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError
import json import json
from oxtornado import json_dumps from oxtornado import json_dumps
@ -49,11 +49,7 @@ class Handler(WebSocketHandler):
#websocket calls #websocket calls
def on_message(self, message): def on_message(self, message):
try: action, data = json.loads(message)
action, data = json.loads(message)
except json.decoder.JSONDecodeError:
logger.debug('invalid websocket message: %s', message)
return
if state.tasks: if state.tasks:
state.tasks.queue(action, data) state.tasks.queue(action, data)
@ -66,17 +62,7 @@ class Handler(WebSocketHandler):
if self.ws_connection is None: if self.ws_connection is None:
self.on_close() self.on_close()
else: else:
state.main.add_callback(lambda: self._write_message(message)) state.main.add_callback(lambda: self.write_message(message))
async def _write_message(self, message):
try:
task = self.write_message(message)
await task
except StreamClosedError as e:
self.on_close()
except WebSocketClosedError as e:
self.on_close()
def trigger_event(event, data): def trigger_event(event, data):
#if len(state.websockets): #if len(state.websockets):