# -*- coding: utf-8 -*-
|
|
import json
|
|
import os
|
|
import time
|
|
|
|
import ox
|
|
from sqlitedict import SqliteDict
|
|
|
|
import db
|
|
import settings
|
|
import state
|
|
import utils
|
|
from websocket import trigger_event
|
|
|
|
import logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Maximum number of seconds of work allowed between two database commits.
COMMIT_TIMEOUT = 20


def maybe_commit(t0):
    """Commit the shared database session once COMMIT_TIMEOUT has elapsed.

    ``t0`` is the timestamp (as returned by ``time.time()``) of the last
    commit.  Returns the timestamp callers should use for the next check:
    ``t0`` unchanged when no commit happened, the current time otherwise.
    """
    elapsed = time.time() - t0
    if elapsed <= COMMIT_TIMEOUT:
        return t0
    state.db.session.commit()
    return time.time()
|
|
|
|
class Peer(object):
    """Local mirror of one peer's library, lists and peer metadata.

    Three files are kept per peer below ``<settings.data_path>/peers``:

    * ``<id>.db``   -- a SqliteDict (table ``library``) mapping item ids to
      ``{'info': ..., 'meta': ..., 'meta_hash': ..., 'modified': ...}``
    * ``<id>.log``  -- the change log consumed by :meth:`apply_log`
    * ``<id>.json`` -- ``self.info`` (lists, list order, peers, revision, ...)
    """

    def __init__(self, id):
        """Open the peer's library database and load its info file.

        ``id`` is the peer id used to derive the three file names.  The
        ``peers`` and ``lists`` keys of ``self.info`` are always present
        afterwards; ``listorder`` is created only if there are lists.
        """
        base = os.path.join(settings.data_path, 'peers')
        ox.makedirs(base)
        self._dbpath = os.path.join(base, '%s.db' % id)
        self._logpath = os.path.join(base, '%s.log' % id)
        self._infopath = os.path.join(base, '%s.json' % id)

        self.id = id
        self.library = SqliteDict(self._dbpath, tablename='library', autocommit=False)

        if os.path.exists(self._infopath):
            # JSON is UTF-8 by definition; do not depend on the locale.
            with open(self._infopath, encoding='utf-8') as f:
                self.info = json.load(f)
        else:
            self.info = {}
        if 'peers' not in self.info:
            self.info['peers'] = {}
        if 'lists' not in self.info:
            self.info['lists'] = {}
        # Make sure every known list has a position in the list order.
        for name in self.info['lists']:
            order = self.info.setdefault('listorder', [])
            if name not in order:
                order.append(name)

    def apply_log(self):
        """Replay entries from the peer's log file that are not applied yet.

        Each line is a JSON array ``[revision, timestamp, data]``.  Lines
        whose revision is not newer than ``self.info['revision']`` are
        skipped.  If a line cannot be parsed the whole replay is abandoned
        and nothing is applied.
        """
        changes = []
        if os.path.exists(self._logpath):
            last_revision = self.info.get('revision', -1)
            with open(self._logpath, 'r', encoding='utf-8', newline='\n') as fd:
                for line in fd:
                    # Lines read from a file always contain at least '\n',
                    # so blank lines have to be detected after stripping.
                    if not line.strip():
                        continue
                    try:
                        # Cheap pre-check: read the revision from the raw
                        # text ("[<rev>, ...") to avoid parsing old lines.
                        revision = int(line.split(',', 1)[0][1:])
                        if revision <= last_revision:
                            continue
                        data = json.loads(line)
                    except Exception:
                        logger.debug('failed to parse line: %s', line)
                        return
                    if data[0] <= last_revision:
                        continue
                    changes.append(data)
        if changes:
            self.apply_changes(changes)

    def apply_changes(self, changes):
        """Apply a sequence of changes, then persist and sync to the database.

        Stops early on shutdown or on the first change that fails.  The
        library commit, info sync, database sync and ``change`` event are
        run in every case.  Returns True if all changes were applied.
        """
        r = True
        for change in changes:
            if state.shutdown:
                r = False
                break
            if not self.apply_change(change):
                logger.debug('FAIL %s', change)
                r = False
                break
        self.library.commit()
        self.sync_info()
        self.sync_db()
        trigger_event('change', {})
        return r

    def apply_change(self, change):
        """Apply a single ``(revision, timestamp, data)`` change.

        ``data[0]`` is the action name, the remaining elements are its
        arguments.  Item actions modify ``self.library``; list, peer and
        profile actions modify ``self.info``; annotation actions go through
        ``annotation.models.Annotation``.  Unknown actions are logged and
        otherwise ignored.  ``self.info['revision']`` is set to ``revision``
        and True is returned.
        """
        revision, timestamp, data = change
        action = data[0]
        args = data[1:]
        #logger.debug('apply change: %s(%s)', action, args)
        if action == 'additem':
            itemid, info = args
            item = self.library.get(itemid, {})
            item['info'] = info
            item['modified'] = utils.ts2datetime(timestamp)
            self.library[itemid] = item
            # trigger additem
        elif action == 'edititem':
            itemid, meta = args
            item = self.library.get(itemid, {})
            if 'meta' not in item:
                item['meta'] = meta
            else:
                item['meta'].update(meta)
            item['meta_hash'] = utils.get_meta_hash(item['meta'])
            item['modified'] = utils.ts2datetime(timestamp)
            self.library[itemid] = item
            if state.tasks:
                state.tasks.queue('syncmetadata', [itemid])
                state.tasks.queue('getpreview', itemid)
        elif action == 'removeitem':
            itemid = args[0]
            if itemid in self.library:
                del self.library[itemid]
        elif action == 'addlist':
            # A second argument (the list query) may be present; it is not
            # used here.
            name = args[0]
            if name not in self.info['lists']:
                self.info['lists'][name] = []
            order = self.info.setdefault('listorder', [])
            if name not in order:
                order.append(name)
        elif action == 'editlist':
            name, new = args
            if name in self.info['lists']:
                self.info['lists'][new['name']] = self.info['lists'].pop(name)
            else:
                self.info['lists'][new['name']] = []
            # 'listorder' only exists once a list has been ordered/added.
            order = self.info.get('listorder', [])
            if name in order:
                self.info['listorder'] = [new['name'] if n == name else n for n in order]
        elif action == 'orderlists':
            self.info['listorder'] = args[0]
        elif action == 'removelist':
            name = args[0]
            if name in self.info['lists']:
                del self.info['lists'][name]
            order = self.info.get('listorder', [])
            if name in order:
                self.info['listorder'] = [n for n in order if n != name]
        elif action == 'addlistitems':
            name, ids = args
            if name not in self.info['lists']:
                self.info['lists'][name] = []
            # The list may be seen here for the first time, without a
            # preceding 'addlist'; create the order list on demand.
            order = self.info.setdefault('listorder', [])
            if name not in order:
                order.append(name)
            self.info['lists'][name] = list(set(self.info['lists'][name]) | set(ids))
        elif action == 'removelistitems':
            name, ids = args
            if name in self.info['lists']:
                self.info['lists'][name] = list(set(self.info['lists'][name]) - set(ids))
        elif action == 'addpeer':
            peerid, username = args
            if len(peerid) == settings.ID_LENGTH:
                self.info['peers'][peerid] = {'username': username}
                # fixme, just trigger peer update here
                from user.models import User
                peer = User.get_or_create(peerid)
                if 'username' not in peer.info:
                    peer.info['username'] = username
                    peer.update_name()
                    peer.save()
        elif action == 'editpeer':
            peerid, data = args
            if len(peerid) == settings.ID_LENGTH:
                if peerid not in self.info['peers']:
                    self.info['peers'][peerid] = {}
                for key in ('username', 'contact'):
                    if key in data:
                        self.info['peers'][peerid][key] = data[key]
                # fixme, just trigger peer update here
                from user.models import User
                peer = User.get_or_create(peerid)
                update = False
                # Only users we are not peered with take their details
                # from what this peer reports about them.
                if not peer.peered:
                    for key in ('username', 'contact'):
                        if key in data and peer.info.get(key) != data[key]:
                            peer.info[key] = data[key]
                            update = True
                if update:
                    peer.save()
        elif action == 'removepeer':
            peerid = args[0]
            if peerid in self.info['peers']:
                del self.info['peers'][peerid]
                # trigger peer update
        elif action == 'editusername':
            self.info['username'] = args[0]
        elif action == 'editcontact':
            self.info['contact'] = args[0]
        elif action == 'addannotation':
            from annotation.models import Annotation
            if len(args) == 2:
                itemid, data = args
                Annotation.create(item_id=itemid, user_id=self.id, data=data)
            else:
                logger.error('invalid entry %s %s', action, args)
        elif action == 'editannotation':
            from annotation.models import Annotation
            if len(args) == 3:
                itemid, annotationid, data = args
                a = Annotation.get(self.id, itemid, annotationid)
                if a:
                    for key in data:
                        a.data[key] = data[key]
                    a.save()
            else:
                logger.error('invalid entry %s %s', action, args)
        elif action == 'removeannotation':
            from annotation.models import Annotation
            if len(args) == 2:
                itemid, annotationid = args
                a = Annotation.get(self.id, itemid, annotationid)
                if a:
                    a.delete()
            else:
                logger.error('invalid entry %s %s', action, args)
        else:
            # The placeholder is required: without it logging fails to
            # format the record and the action name is lost.
            logger.debug('UNKNOWN ACTION: %s', action)
        self.info['revision'] = revision
        return True

    def get_metahash(self, itemid):
        """Return the meta hash of ``itemid``, computing and storing it if missing.

        Raises KeyError if the item is not in the library.
        """
        item = self.library[itemid]
        if 'meta_hash' not in item:
            item['meta_hash'] = utils.get_meta_hash(item['meta'])
            self.library[itemid] = item
        return item['meta_hash']

    def sync_info(self):
        """Write ``self.info`` to the peer's JSON info file."""
        with open(self._infopath, 'w', encoding='utf-8') as fd:
            json.dump(self.info, fd, indent=4, sort_keys=True)

    def join(self):
        """Commit and close the library database, then write the info file."""
        #self.library.join()
        self.library.commit()
        self.library.close(do_log=False)
        self.sync_info()

    def remove(self):
        """Close the peer and delete its database, log and info files."""
        self.join()
        for path in (self._dbpath, self._logpath, self._infopath):
            if os.path.exists(path):
                os.unlink(path)

    def sync_db(self):
        """Bring the SQL database in line with this peer's library and lists.

        Steps, in order: drop user/list item rows for items the peer no
        longer has, add rows (and Item records) for new items, remove items
        that are left without any user, reconcile the peer's lists with
        ``self.info['lists']``, update affected items, and finally prune
        list entries in ``self.info`` that point to items missing from the
        library.  Returns early when ``state.shutdown`` is set.
        """
        import item.models
        import user.models
        c_user_id = item.models.user_items.columns['user_id']
        c_item_id = item.models.user_items.columns['item_id']
        l_list_id = user.models.list_items.columns['list_id']
        l_item_id = user.models.list_items.columns['item_id']
        q = item.models.user_items.select().where(c_user_id.is_(self.id))
        # r[1] is taken as the item id column of user_items.
        current = {r[1] for r in state.db.session.execute(q)}
        library = set(self.library.keys())
        remove = list(current - library)
        u = user.models.User.get(self.id)
        listid = u.library.id
        lists = [l.id for l in u.lists.all()]
        getpreview = []
        t0 = time.time()
        if remove:
            logger.debug('remove %s items from %s', len(remove), self.id)
            q = item.models.user_items.delete().where(c_user_id.is_(self.id)).where(c_item_id.in_(remove))
            state.db.session.execute(q)
            q = user.models.list_items.delete().where(l_list_id.in_(lists)).where(l_item_id.in_(remove))
            state.db.session.execute(q)
        add = list(library - current)
        # list id -> set of item ids currently stored for that list
        listitems = {}
        q = user.models.list_items.select().where(l_list_id.in_(lists))
        for row in state.db.session.execute(q):
            listitems.setdefault(row['list_id'], set()).add(row['item_id'])
        t0 = maybe_commit(t0)
        if add:
            logger.debug('add %s items from %s', len(add), self.id)
            t0 = time.time()
            q = item.models.user_items.select().where(c_user_id.is_(self.id))
            useritems = {r['item_id'] for r in state.db.session.execute(q)}
            for itemid in add:
                i = item.models.Item.get(itemid)
                if not i:
                    # Unknown item: create it from the peer's data and
                    # remember to fetch a preview for it.
                    i = item.models.Item(id=itemid)
                    m = self.library.get(itemid, {})
                    i.info.update(m.get('info', {}))
                    i.meta = m.get('meta', {})
                    state.db.session.add(i)
                    getpreview.append(itemid)
                if itemid not in useritems:
                    q = item.models.user_items.insert({'item_id': itemid, 'user_id': self.id})
                    state.db.session.execute(q)
                if itemid not in listitems.get(listid, ()):
                    q = user.models.list_items.insert({'item_id': itemid, 'list_id': listid})
                    state.db.session.execute(q)
                if state.shutdown:
                    break
                t0 = maybe_commit(t0)
            state.db.session.commit()
        if remove:
            # Items no user references any more are removed entirely.
            q = item.models.user_items.select()
            user_items = {i['item_id'] for i in state.db.session.execute(q)}
            removed_items = set(remove) - user_items
            if removed_items:
                item.models.Item.remove_many(removed_items)
            state.db.session.commit()
        if state.shutdown:
            return
        if state.tasks:
            for itemid in getpreview:
                state.tasks.queue('getpreview', itemid)
        update_items = remove + add

        # From here on ``add``/``remove`` hold list names, not item ids.
        current_lists = {l.name for l in u.lists.all() if l.name}
        add = list(set(self.info['lists']) - current_lists)
        remove = list(current_lists - set(self.info['lists']))
        t0 = time.time()
        for l in u.lists.all():
            if l.name:
                if l.name in remove:
                    logger.debug('remove list %s', l.name)
                    l.remove(commit=False)
                else:
                    if l.id in listitems:
                        ladd = list(set(self.info['lists'][l.name]) - set(listitems[l.id]))
                        lremove = list(set(listitems[l.id]) - set(self.info['lists'][l.name]))
                        if ladd:
                            l.add_items(ladd, commit=False)
                            update_items = list(set(update_items) - set(ladd))
                        if lremove:
                            l.remove_items(lremove, commit=False)
                        if ladd or lremove:
                            logger.debug('update list %s', l.name)
                    elif self.info['lists'][l.name]:
                        l.add_items(self.info['lists'][l.name], commit=False)
                        update_items = list(set(update_items) - set(self.info['lists'][l.name]))
                        logger.debug('update list %s', l.name)
            if state.shutdown:
                break
            t0 = maybe_commit(t0)
        state.db.session.commit()
        if state.shutdown:
            return
        if add:
            for name in add:
                logger.debug('add list %s', name)
                l = user.models.List.get_or_create(self.id, name)
                l.add_items(self.info['lists'][name], commit=False)
                trigger_event('addlist', {'id': l.public_id, 'user': self.id})
                update_items = list(set(update_items) - set(self.info['lists'][name]))
                if state.shutdown:
                    break
        t0 = time.time()
        if update_items:
            logger.debug('update %s items', len(update_items))
            for i in item.models.Item.query.filter(item.models.Item.id.in_(update_items)):
                i.update(commit=False)
                t0 = maybe_commit(t0)
                if state.shutdown:
                    break
        state.db.session.commit()
        if update_items:
            logger.debug('updated %s items', len(update_items))
        # Drop list entries that refer to items no longer in the library.
        ids = set(self.library.keys())
        changed = False
        for name, l in self.info.get('lists', {}).items():
            removed = set(l) - ids
            if removed:
                self.info['lists'][name] = list(set(l) - removed)
                changed = True
        if changed:
            self.sync_info()
|
|
|
|
def sync_db():
    """Rebuild missing sort entries and clean up lists, peers and orphans.

    ``state.sync_db`` is True while this runs and is reset to False when it
    finishes, including when an exception is raised.  Items without a
    ``Sort`` row get ``update_sort`` called; those whose ``mediastate`` is
    ``'unavailable'`` are queued for a preview download afterwards.  Work
    stops early when ``state.shutdown`` is set.
    """
    from sqlalchemy.orm import load_only
    import item.models
    first = True
    missing_previews = []
    state.sync_db = True
    try:
        #FIXME: why is this loop needed
        with db.session():
            sort_ids = {i.item_id for i in item.models.Sort.query.options(load_only('item_id'))}
            if sort_ids:
                t0 = time.time()
                commit = False
                for i in item.models.Item.query.options(load_only('id')):
                    if i.id not in sort_ids:
                        if first:
                            first = False
                            logger.debug('sync items')
                        #why?
                        #i.update(commit=False)
                        i.update_sort(commit=False)
                        if i.info.get('mediastate') == 'unavailable':
                            missing_previews.append(i.id)
                        commit = True
                    #logger.debug('sync:%s', i)
                    t0 = maybe_commit(t0)
                    if state.shutdown:
                        break
                if commit:
                    state.db.session.commit()

            if not first:
                logger.debug('synced items')
            if not state.shutdown:
                cleanup_lists()
                logger.debug('lists cleaned up')
            if not state.shutdown:
                cleanup_peers()
                logger.debug('peers cleaned up')
                # NOTE(review): assumed to belong to the non-shutdown path
                # together with the peer cleanup -- confirm.
                # Remove sort/find rows that no longer point to an item.
                item.models.Sort.query.filter_by(item_id=None).delete()
                item.models.Find.query.filter_by(item_id=None).delete()

        if missing_previews and state.tasks:
            logger.debug('queueing download of %s missing previews', len(missing_previews))
            for item_id in missing_previews:
                state.tasks.queue('getpreview', item_id)
    finally:
        # Never leave the "sync in progress" flag set after a failure.
        state.sync_db = False
|
|
|
|
def cleanup_lists():
    """Remove lists, user items and list items that no longer have an owner.

    Lists are removed when they have no user, or when their user is neither
    peered nor the local user.  ``user_items`` rows of users that are not
    peered (and not the local user) and ``list_items`` rows of lists that
    no longer exist are deleted, followed by items left without any user.
    """
    import item.models
    import user.models

    user_items = item.models.user_items
    list_items = user.models.list_items
    with db.session():
        for candidate in user.models.List.query.all():
            owner = candidate.user
            if not owner:
                candidate.remove()
                continue
            if not (owner.peered or owner.id == settings.USER_ID):
                candidate.remove()

        # Users whose items may stay: all peered users plus the local user.
        keep_users = [u.id for u in user.models.User.query.filter_by(peered=True)]
        keep_users = keep_users + [settings.USER_ID]
        stale_user_items = user_items.delete().where(
            user_items.columns['user_id'].notin_(keep_users)
        )
        state.db.session.execute(stale_user_items)

        # Lists that survived the loop above.
        keep_lists = [entry.id for entry in user.models.List.query.all()]
        stale_list_items = list_items.delete().where(
            list_items.columns['list_id'].notin_(keep_lists)
        )
        state.db.session.execute(stale_list_items)

        state.db.session.commit()

        item.models.Item.remove_without_user()
        state.db.session.commit()
|
|
|
|
def cleanup_peers():
    """Delete users that are unknown to us and to all of our peers.

    A user is known if it is the local user, a peered user, or listed in
    the ``peers`` info of a peered user.  Unknown users are deleted unless
    they are in ``state.nodes.local`` or still pending.
    """
    import user.models

    User = user.models.User
    seen_via_peers = {}
    known_peers = {settings.USER_ID}
    peered_users = sorted(
        User.query.filter_by(peered=True),
        key=lambda u: utils.user_sort_key(u.json()),
    )
    for peered in peered_users:
        known_peers.add(peered.id)
        remote = utils.get_peer(peered.id)
        for peer_id in remote.info.get('peers', {}):
            # Keep the first description seen for each indirect peer.
            if peer_id not in seen_via_peers:
                seen_via_peers[peer_id] = remote.info['peers'][peer_id]
            known_peers.add(peer_id)

    unknown_users = User.query.filter(User.id.notin_(list(known_peers)))
    for candidate in unknown_users:
        is_local_node = state.nodes and candidate.id in state.nodes.local
        if is_local_node or candidate.pending:
            continue
        state.db.session.delete(candidate)

    state.db.session.commit()
|