Compare commits
14 Commits
148b41087f
...
103aab3f4a
Author | SHA1 | Date |
---|---|---|
j | 103aab3f4a | |
j | 5f36b2eab4 | |
j | 9d11bbba53 | |
j | 431dc8e194 | |
j | 4033958341 | |
j | bcfc8ba9b2 | |
j | d84e950ca8 | |
j | de9a172f39 | |
j | c7e316ab22 | |
j | a111aaac46 | |
j | 611fc2b373 | |
j | 265a4791fe | |
j | ad15b8a25a | |
j | c95e22869c |
|
@ -538,7 +538,7 @@ class Item(db.Model):
|
|||
add_record('additem', self.id, f.info)
|
||||
add_record('edititem', self.id, self.meta)
|
||||
for l in self.lists.filter_by(user_id=settings.USER_ID):
|
||||
if l.name != '':
|
||||
if l.name != '' and l.name != 'Inbox':
|
||||
add_record('addlistitems', l.name, [self.id])
|
||||
self.update()
|
||||
f.move()
|
||||
|
|
|
@ -130,16 +130,22 @@ class Peer(object):
|
|||
self.info['lists'][new['name']] = self.info['lists'].pop(name)
|
||||
else:
|
||||
self.info['lists'][new['name']] = []
|
||||
if name in self.info['listorder']:
|
||||
self.info['listorder'] = [new['name'] if n == name else n for n in self.info['listorder']]
|
||||
elif action == 'orderlists':
|
||||
self.info['listorder'] = args[0]
|
||||
elif action == 'removelist':
|
||||
name = args[0]
|
||||
if name in self.info['lists']:
|
||||
del self.info['lists'][name]
|
||||
if name in self.info['listorder']:
|
||||
self.info['listorder'] = [n for n in self.info['listorder'] if n != name]
|
||||
elif action == 'addlistitems':
|
||||
name, ids = args
|
||||
if name not in self.info['lists']:
|
||||
self.info['lists'][name] = []
|
||||
if name not in self.info['listorder']:
|
||||
self.info['listorder'].append(name)
|
||||
self.info['lists'][name] = list(set(self.info['lists'][name]) | set(ids))
|
||||
elif action == 'removelistitems':
|
||||
name, ids = args
|
||||
|
@ -208,8 +214,9 @@ class Peer(object):
|
|||
|
||||
def remove(self):
|
||||
self.join()
|
||||
os.unlink(self._dbpath)
|
||||
os.unlink(self._infopath)
|
||||
for path in (self._dbpath, self._logpath, self._infopath):
|
||||
if os.path.exists(path):
|
||||
os.unlink(path)
|
||||
|
||||
def sync_db(self):
|
||||
import item.models
|
||||
|
@ -327,6 +334,8 @@ class Peer(object):
|
|||
if state.shutdown:
|
||||
break
|
||||
state.db.session.commit()
|
||||
if update_items:
|
||||
logger.debug('updated %s items', len(update_items))
|
||||
ids = set(self.library.keys())
|
||||
changed = False
|
||||
for name, l in self.info.get('lists', {}).items():
|
||||
|
|
|
@ -153,5 +153,7 @@ class LocalNodes(dict):
|
|||
state.tasks.queue('removelocalinfo', id)
|
||||
|
||||
def get(self, user_id):
|
||||
if user_id in self and can_connect(self[user_id]):
|
||||
return self[user_id] if user_id in self else None
|
||||
data = super().get(user_id)
|
||||
if data and can_connect(data):
|
||||
return data
|
||||
return None
|
||||
|
|
121
oml/nodes.py
121
oml/nodes.py
|
@ -1,16 +1,17 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
|
||||
from io import BytesIO
|
||||
from queue import Queue
|
||||
from threading import Thread
|
||||
import json
|
||||
from io import BytesIO
|
||||
import gzip
|
||||
import urllib.request, urllib.error, urllib.parse
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import socket
|
||||
import socks
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
|
||||
import ox
|
||||
from tornado.ioloop import PeriodicCallback
|
||||
|
@ -37,12 +38,13 @@ class Node(Thread):
|
|||
_online = None
|
||||
TIMEOUT = 5
|
||||
|
||||
def __init__(self, nodes, user):
|
||||
def __init__(self, nodes, user_id):
|
||||
self._nodes = nodes
|
||||
self.user = user
|
||||
self.user_id = user.id
|
||||
self.user_id = user_id
|
||||
self._opener = get_opener(self.user_id)
|
||||
self._q = Queue()
|
||||
self._pingcb = PeriodicCallback(self.ping, 10 * settings.server['pull_interval'])
|
||||
state.main.add_callback(self._pingcb.start)
|
||||
Thread.__init__(self)
|
||||
self.daemon = True
|
||||
self.start()
|
||||
|
@ -53,10 +55,22 @@ class Node(Thread):
|
|||
action = self._q.get()
|
||||
if state.shutdown:
|
||||
break
|
||||
if action == 'send_response':
|
||||
self._send_response()
|
||||
elif action == 'ping':
|
||||
if action == 'ping':
|
||||
self.online = self.can_connect()
|
||||
elif action == 'send_response':
|
||||
if self.online:
|
||||
self._send_response()
|
||||
else:
|
||||
if not self._q.qsize():
|
||||
time.sleep(5)
|
||||
self.send_response()
|
||||
elif isinstance(action, list) and len(action) == 2:
|
||||
if self.online:
|
||||
self._call(action[0], *action[1])
|
||||
else:
|
||||
if not self._q.qsize():
|
||||
time.sleep(5)
|
||||
self.queue(action[0], *action[1])
|
||||
else:
|
||||
logger.debug('unknown action %s', action)
|
||||
|
||||
|
@ -68,6 +82,14 @@ class Node(Thread):
|
|||
if state.online or self.get_local():
|
||||
self._q.put('ping')
|
||||
|
||||
def queue(self, action, *args):
|
||||
logger.debug('queue node action %s->%s%s', self.user_id, action, args)
|
||||
self._q.put([action, args])
|
||||
|
||||
def _call(self, action, *args):
|
||||
r = getattr(self, action)(*args)
|
||||
logger.debug('call node api %s->%s%s = %s', self.user_id, action, args, r)
|
||||
|
||||
@property
|
||||
def url(self):
|
||||
if self.local:
|
||||
|
@ -202,34 +224,26 @@ class Node(Thread):
|
|||
def _send_response(self):
|
||||
with db.session():
|
||||
u = user.models.User.get(self.user_id)
|
||||
send_response = u and u.peered or u.queued
|
||||
if u:
|
||||
user_pending = u.pending
|
||||
user_peered = u.peered
|
||||
user_queued = u.queued
|
||||
else:
|
||||
user_queued = False
|
||||
if DEBUG_NODES:
|
||||
logger.debug('go online peered=%s queued=%s %s (%s)', u.peered, u.queued, u.id, u.nickname)
|
||||
|
||||
if send_response:
|
||||
try:
|
||||
self.online = self.can_connect()
|
||||
except:
|
||||
if DEBUG_NODES:
|
||||
logger.debug('failed to connect to %s', self.user_id)
|
||||
self.online = False
|
||||
if self.online:
|
||||
if DEBUG_NODES:
|
||||
logger.debug('connected to %s', self.url)
|
||||
if user_queued:
|
||||
if DEBUG_NODES:
|
||||
logger.debug('queued peering event pending=%s peered=%s', user_pending, user_peered)
|
||||
if user_pending == 'sent':
|
||||
self.peering('requestPeering')
|
||||
elif user_pending == '' and user_peered:
|
||||
self.peering('acceptPeering')
|
||||
else:
|
||||
#fixme, what about cancel/reject peering here?
|
||||
self.peering('removePeering')
|
||||
if user_queued:
|
||||
if DEBUG_NODES:
|
||||
logger.debug('connected to %s', self.url)
|
||||
logger.debug('queued peering event pending=%s peered=%s', user_pending, user_peered)
|
||||
if user_pending == 'sent':
|
||||
self.peering('requestPeering')
|
||||
elif user_pending == '' and user_peered:
|
||||
self.peering('acceptPeering')
|
||||
else:
|
||||
#fixme, what about cancel/reject peering here?
|
||||
self.peering('removePeering')
|
||||
|
||||
def trigger_status(self):
|
||||
if self.online is not None:
|
||||
|
@ -321,6 +335,7 @@ class Node(Thread):
|
|||
return False
|
||||
|
||||
def peering(self, action):
|
||||
pull_changes = False
|
||||
with db.session():
|
||||
u = user.models.User.get_or_create(self.user_id)
|
||||
user_info = u.info
|
||||
|
@ -335,6 +350,8 @@ class Node(Thread):
|
|||
if 'message' in u.info:
|
||||
del u.info['message']
|
||||
u.save()
|
||||
if action == 'acceptPeering':
|
||||
pull_changes = True
|
||||
else:
|
||||
logger.debug('peering failed? %s %s', action, r)
|
||||
if action in ('cancelPeering', 'rejectPeering', 'removePeering'):
|
||||
|
@ -342,6 +359,8 @@ class Node(Thread):
|
|||
with db.session():
|
||||
u = user.models.User.get(self.user_id)
|
||||
trigger_event('peering.%s' % action.replace('Peering', ''), u.json())
|
||||
if pull_changes:
|
||||
self.pullChanges()
|
||||
return True
|
||||
|
||||
headers = {
|
||||
|
@ -493,7 +512,7 @@ class Nodes(Thread):
|
|||
self.queue('add', u.id, True)
|
||||
self.local = LocalNodes()
|
||||
self._pullcb = PeriodicCallback(self.pull, settings.server['pull_interval'])
|
||||
self._pullcb.start()
|
||||
state.main.add_callback(self._pullcb.start)
|
||||
Thread.__init__(self)
|
||||
self.daemon = True
|
||||
self.start()
|
||||
|
@ -514,9 +533,16 @@ class Nodes(Thread):
|
|||
|
||||
def queue(self, *args):
|
||||
if args:
|
||||
logger.debug('queue "%s", %s entries in queue', args[0], self._q.qsize())
|
||||
logger.debug('queue "%s", %s entries in queue', args, self._q.qsize())
|
||||
self._q.put(list(args))
|
||||
|
||||
def peer_queue(self, peer, action, *args):
|
||||
if peer not in self._nodes:
|
||||
self._add(peer)
|
||||
elif not self._nodes[peer].is_online():
|
||||
self._nodes[peer].ping()
|
||||
self._nodes[peer].queue(action, *args)
|
||||
|
||||
def is_online(self, id):
|
||||
return id in self._nodes and self._nodes[id].is_online()
|
||||
|
||||
|
@ -532,22 +558,27 @@ class Nodes(Thread):
|
|||
if target == 'all':
|
||||
nodes = list(self._nodes.values())
|
||||
elif target == 'peered':
|
||||
nodes = [n for n in list(self._nodes.values()) if n.user.peered]
|
||||
ids = []
|
||||
with db.session():
|
||||
from user.models import User
|
||||
for u in User.query.filter(User.id != settings.USER_ID).filter_by(peered=True).all():
|
||||
ids.append(u.id)
|
||||
nodes = [n for n in list(self._nodes.values()) if n.user_id in ids]
|
||||
elif target == 'online':
|
||||
nodes = [n for n in list(self._nodes.values()) if n.online]
|
||||
else:
|
||||
if not target in self._nodes:
|
||||
if target not in self._nodes:
|
||||
self._add(target)
|
||||
nodes = [self._nodes[target]]
|
||||
for node in nodes:
|
||||
r = getattr(node, action)(*args)
|
||||
logger.debug('call node api %s->%s%s = %s', node.user_id, action, args, r)
|
||||
node._call(action, *args)
|
||||
|
||||
def _add(self, user_id, send_response=False):
|
||||
if user_id not in self._nodes:
|
||||
from user.models import User
|
||||
with db.session():
|
||||
self._nodes[user_id] = Node(self, User.get_or_create(user_id))
|
||||
User.get_or_create(user_id)
|
||||
self._nodes[user_id] = Node(self, user_id)
|
||||
else:
|
||||
self._nodes[user_id].ping()
|
||||
if send_response:
|
||||
|
@ -575,7 +606,7 @@ class Nodes(Thread):
|
|||
if state.shutdown:
|
||||
break
|
||||
node = self._nodes.get(u['id'])
|
||||
if node:
|
||||
if node and node.is_online():
|
||||
node.pullChanges()
|
||||
self._pulling = False
|
||||
|
||||
|
@ -589,8 +620,6 @@ class Nodes(Thread):
|
|||
|
||||
def publish_node():
|
||||
update_online()
|
||||
state.check_nodes = PeriodicCallback(check_nodes, 120000)
|
||||
state.check_nodes.start()
|
||||
state._online = PeriodicCallback(update_online, 60000)
|
||||
state._online.start()
|
||||
|
||||
|
@ -605,11 +634,3 @@ def update_online():
|
|||
if state.online:
|
||||
for node in list(state.nodes._nodes.values()):
|
||||
node.trigger_status()
|
||||
|
||||
def check_nodes():
|
||||
if state.online:
|
||||
with db.session():
|
||||
for u in user.models.User.query.filter_by(queued=True):
|
||||
if not state.nodes.is_online(u.id):
|
||||
logger.debug('queued peering message for %s trying to connect...', u.id)
|
||||
state.nodes.queue('add', u.id, True)
|
||||
|
|
|
@ -128,7 +128,11 @@ class Parser(object):
|
|||
elif k == 'list':
|
||||
nickname, name = v.split(':', 1)
|
||||
if nickname:
|
||||
u = self._user.query.filter_by(nickname=nickname).one()
|
||||
try:
|
||||
u = self._user.query.filter_by(nickname=nickname, peered=True).one()
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
ids = []
|
||||
return self.in_ids(ids, exclude)
|
||||
else:
|
||||
u = self._user.query.filter_by(id=settings.USER_ID).one()
|
||||
if not name:
|
||||
|
|
|
@ -95,4 +95,4 @@ FULLTEXT_SUPPORT = fulltext.platform_supported()
|
|||
if not FULLTEXT_SUPPORT:
|
||||
config['itemKeys'] = [k for k in config['itemKeys'] if k['id'] != 'fulltext']
|
||||
|
||||
DB_VERSION = 15
|
||||
DB_VERSION = 17
|
||||
|
|
|
@ -43,11 +43,11 @@ CREATE TABLE user (
|
|||
peered BOOLEAN,
|
||||
online BOOLEAN,
|
||||
PRIMARY KEY (id),
|
||||
UNIQUE (nickname),
|
||||
CHECK (queued IN (0, 1)),
|
||||
CHECK (peered IN (0, 1)),
|
||||
CHECK (online IN (0, 1))
|
||||
);
|
||||
CREATE INDEX ix_user_nichname ON user (nichname);
|
||||
CREATE TABLE metadata (
|
||||
created DATETIME,
|
||||
modified DATETIME,
|
||||
|
|
|
@ -373,6 +373,10 @@ class Update(Thread):
|
|||
db_version = migrate_13()
|
||||
if db_version < 15:
|
||||
db_version = migrate_15()
|
||||
if db_version < 16:
|
||||
db_version = migrate_16()
|
||||
if db_version < 17:
|
||||
db_version = migrate_17()
|
||||
settings.server['db_version'] = db_version
|
||||
|
||||
def run(self):
|
||||
|
@ -437,7 +441,7 @@ def migrate_5():
|
|||
'DROP INDEX IF EXISTS user_metadata_index',
|
||||
'CREATE UNIQUE INDEX user_metadata_index ON user_metadata(item_id, user_id)',
|
||||
'UPDATE sort SET sharemetadata = 0',
|
||||
]),
|
||||
])
|
||||
with db.session() as session:
|
||||
import user.models
|
||||
for m in user.models.Metadata.query:
|
||||
|
@ -631,3 +635,42 @@ def migrate_15():
|
|||
del u.info['local']
|
||||
u.save()
|
||||
return 15
|
||||
|
||||
def migrate_16():
|
||||
db.run_sql([
|
||||
'''CREATE TABLE user2 (
|
||||
created DATETIME,
|
||||
modified DATETIME,
|
||||
id VARCHAR(43) NOT NULL,
|
||||
info BLOB,
|
||||
nickname VARCHAR(256),
|
||||
pending VARCHAR(64),
|
||||
queued BOOLEAN,
|
||||
peered BOOLEAN,
|
||||
online BOOLEAN,
|
||||
PRIMARY KEY (id),
|
||||
CHECK (queued IN (0, 1)),
|
||||
CHECK (peered IN (0, 1)),
|
||||
CHECK (online IN (0, 1))
|
||||
)''',
|
||||
'''INSERT INTO user2 (created, modified, id, info, nickname, pending, queued, peered, online)
|
||||
SELECT created, modified, id, info, nickname, pending, queued, peered, online FROM user''',
|
||||
'DROP TABLE user',
|
||||
'ALTER TABLE user2 RENAME TO user',
|
||||
'CREATE INDEX IF NOT EXISTS ix_user_nickname ON user (nickname)'
|
||||
])
|
||||
return 16
|
||||
|
||||
def migrate_17():
|
||||
from user.models import List, User
|
||||
from changelog import add_record
|
||||
with db.session():
|
||||
l = List.get(':Public')
|
||||
if not l:
|
||||
add_record('removelist', 'Public')
|
||||
lists = []
|
||||
for l in List.query.filter_by(user_id=settings.USER_ID).order_by('index_'):
|
||||
if l.type == 'static' and l.name not in ('', 'Inbox'):
|
||||
lists.append(l.name)
|
||||
add_record('orderlists', lists)
|
||||
return 17
|
||||
|
|
|
@ -422,7 +422,7 @@ def requestPeering(data):
|
|||
u.info['nickname'] = data.get('nickname', '')
|
||||
u.update_name()
|
||||
u.save()
|
||||
state.nodes.queue(u.id, 'peering', 'requestPeering')
|
||||
state.nodes.peer_queue(u.id, 'peering', 'requestPeering')
|
||||
return {}
|
||||
actions.register(requestPeering, cache=False)
|
||||
|
||||
|
@ -441,7 +441,7 @@ def acceptPeering(data):
|
|||
u = models.User.get_or_create(data['id'])
|
||||
u.info['message'] = data.get('message', '')
|
||||
u.update_peering(True)
|
||||
state.nodes.queue(u.id, 'peering', 'acceptPeering')
|
||||
state.nodes.peer_queue(u.id, 'peering', 'acceptPeering')
|
||||
return {}
|
||||
actions.register(acceptPeering, cache=False)
|
||||
|
||||
|
@ -459,7 +459,7 @@ def rejectPeering(data):
|
|||
u = models.User.get_or_create(data['id'])
|
||||
u.info['message'] = data.get('message', '')
|
||||
u.update_peering(False)
|
||||
state.nodes.queue(u.id, 'peering', 'rejectPeering')
|
||||
state.nodes.peer_queue(u.id, 'peering', 'rejectPeering')
|
||||
return {}
|
||||
actions.register(rejectPeering, cache=False)
|
||||
|
||||
|
@ -474,10 +474,11 @@ def removePeering(data):
|
|||
if len(data.get('id', '')) not in (16, 43):
|
||||
logger.debug('invalid user id')
|
||||
return {}
|
||||
u = models.User.get_or_create(data['id'])
|
||||
u.info['message'] = data.get('message', '')
|
||||
u.update_peering(False)
|
||||
state.nodes.queue(u.id, 'peering', 'removePeering')
|
||||
u = models.User.get(data['id'], for_udpate=True)
|
||||
if u:
|
||||
u.info['message'] = data.get('message', '')
|
||||
u.update_peering(False)
|
||||
state.nodes.peer_queue(u.id, 'peering', 'removePeering')
|
||||
return {}
|
||||
actions.register(removePeering, cache=False)
|
||||
|
||||
|
@ -493,7 +494,7 @@ def cancelPeering(data):
|
|||
u = models.User.get_or_create(data['id'])
|
||||
u.info['message'] = data.get('message', '')
|
||||
u.update_peering(False)
|
||||
state.nodes.queue(u.id, 'peering', 'cancelPeering')
|
||||
state.nodes.peer_queue(u.id, 'peering', 'cancelPeering')
|
||||
return {}
|
||||
actions.register(cancelPeering, cache=False)
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ class User(db.Model):
|
|||
id = sa.Column(sa.String(43), primary_key=True)
|
||||
info = sa.Column(MutableDict.as_mutable(sa.PickleType(pickler=json_pickler)))
|
||||
|
||||
nickname = sa.Column(sa.String(256), unique=True)
|
||||
nickname = sa.Column(sa.String(256), index=True)
|
||||
|
||||
pending = sa.Column(sa.String(64)) # sent|received
|
||||
queued = sa.Column(sa.Boolean())
|
||||
|
@ -339,7 +339,7 @@ class List(db.Model):
|
|||
state.db.session.add(self)
|
||||
if commit:
|
||||
state.db.session.commit()
|
||||
if self.user_id == settings.USER_ID and self.name != '' and available_items:
|
||||
if self.user_id == settings.USER_ID and self.name not in ('', 'Inbox') and available_items:
|
||||
add_record('addlistitems', self.name, available_items)
|
||||
|
||||
def get_items(self):
|
||||
|
@ -368,7 +368,7 @@ class List(db.Model):
|
|||
q = list_items.delete().where(list_items.columns['list_id'].is_(self.id))
|
||||
state.db.session.execute(q)
|
||||
if not self._query:
|
||||
if self.user_id == settings.USER_ID and self.name != '':
|
||||
if self.user_id == settings.USER_ID and self.name not in ('', 'Inbox'):
|
||||
add_record('removelist', self.name)
|
||||
state.db.session.delete(self)
|
||||
if commit:
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
|
||||
from tornado.websocket import WebSocketHandler
|
||||
from tornado.websocket import WebSocketHandler, WebSocketClosedError
|
||||
from tornado.ioloop import IOLoop
|
||||
from tornado.iostream import StreamClosedError
|
||||
import json
|
||||
|
||||
from oxtornado import json_dumps
|
||||
|
@ -49,7 +49,11 @@ class Handler(WebSocketHandler):
|
|||
|
||||
#websocket calls
|
||||
def on_message(self, message):
|
||||
action, data = json.loads(message)
|
||||
try:
|
||||
action, data = json.loads(message)
|
||||
except json.decoder.JSONDecodeError:
|
||||
logger.debug('invalid websocket message: %s', message)
|
||||
return
|
||||
if state.tasks:
|
||||
state.tasks.queue(action, data)
|
||||
|
||||
|
@ -62,7 +66,17 @@ class Handler(WebSocketHandler):
|
|||
if self.ws_connection is None:
|
||||
self.on_close()
|
||||
else:
|
||||
state.main.add_callback(lambda: self.write_message(message))
|
||||
state.main.add_callback(lambda: self._write_message(message))
|
||||
|
||||
async def _write_message(self, message):
|
||||
try:
|
||||
task = self.write_message(message)
|
||||
await task
|
||||
except StreamClosedError as e:
|
||||
self.on_close()
|
||||
except WebSocketClosedError as e:
|
||||
self.on_close()
|
||||
|
||||
|
||||
def trigger_event(event, data):
|
||||
#if len(state.websockets):
|
||||
|
|
Loading…
Reference in New Issue