# -*- coding: utf-8 -*-

import json
import os
import time

import ox
from sqlitedict import SqliteDict

import db
import settings
import state
import utils
from websocket import trigger_event

import logging
logger = logging.getLogger(__name__)

# Seconds of work after which maybe_commit() commits the shared session.
COMMIT_TIMEOUT = 20


def maybe_commit(t0):
    """Commit the shared DB session if COMMIT_TIMEOUT seconds passed since t0.

    Returns the timestamp to pass to the next call: a fresh time.time() if a
    commit happened, otherwise t0 unchanged.
    """
    if time.time() - t0 > COMMIT_TIMEOUT:
        state.db.session.commit()
        t0 = time.time()
    return t0


class Peer(object):
    """On-disk state of one peer, stored below <settings.data_path>/peers.

    Three files are used per peer id:
      <id>.db    SqliteDict table 'library' (itemid -> item dict)
      <id>.log   change log, one JSON array per line
      <id>.json  info dict ('peers', 'lists', 'listorder', 'revision', ...)
    """

    def __init__(self, id):
        base = os.path.join(settings.data_path, 'peers')
        ox.makedirs(base)
        self._dbpath = os.path.join(base, '%s.db' % id)
        self._logpath = os.path.join(base, '%s.log' % id)
        self._infopath = os.path.join(base, '%s.json' % id)

        self.id = id
        self.library = SqliteDict(self._dbpath, tablename='library', autocommit=False)

        if os.path.exists(self._infopath):
            with open(self._infopath) as f:
                self.info = json.load(f)
        else:
            self.info = {}
        if 'peers' not in self.info:
            self.info['peers'] = {}
        if 'lists' not in self.info:
            self.info['lists'] = {}
        # Make sure every known list shows up in 'listorder'. The key is only
        # created when there is at least one list.
        for name in self.info['lists']:
            if 'listorder' not in self.info:
                self.info['listorder'] = []
            if name not in self.info['listorder']:
                self.info['listorder'].append(name)

    def apply_log(self):
        """Read the peer's log file and apply all changes newer than info['revision'].

        If a line cannot be parsed the method logs it and returns without
        applying anything, including changes collected from earlier lines.
        """
        changes = []
        if os.path.exists(self._logpath):
            with open(self._logpath, 'r', encoding='utf-8', newline='\n') as fd:
                for line in fd:
                    if line:
                        try:
                            # Cheap pre-check: read the revision from the text
                            # before the first comma (minus its first
                            # character) so old lines skip the full JSON parse.
                            revision = int(line.split(',', 1)[0][1:])
                            if revision <= self.info.get('revision', -1):
                                continue
                            data = json.loads(line)
                        except Exception:
                            # Was a bare except; narrowed so KeyboardInterrupt
                            # and SystemExit are no longer swallowed.
                            logger.debug('failed to parse line: %s', line)
                            return
                        if data[0] <= self.info.get('revision', -1):
                            continue
                        changes.append(data)
        if changes:
            self.apply_changes(changes)

    def apply_changes(self, changes):
        """Apply changes in order, then persist and sync to the database.

        Stops early on shutdown or on the first change that apply_change
        reports as failed. The library commit, sync_info, sync_db and the
        'change' event run in either case.

        Returns True if every change was applied, False otherwise.
        """
        r = True
        for change in changes:
            if state.shutdown:
                r = False
                break
            if not self.apply_change(change):
                logger.debug('FAIL %s', change)
                r = False
                break
        self.library.commit()
        self.sync_info()
        self.sync_db()
        trigger_event('change', {})
        return r

    def apply_change(self, change):
        """Apply a single change to self.library / self.info.

        change is a (revision, timestamp, data) triple where data[0] is the
        action name and data[1:] are its arguments. Unknown actions are
        logged and otherwise ignored. info['revision'] is set to the change's
        revision in all cases.

        Returns True.
        """
        revision, timestamp, data = change
        action = data[0]
        args = data[1:]
        #logger.debug('apply change: %s(%s)', action, args)
        if action == 'additem':
            itemid, info = args
            item = self.library.get(itemid, {})
            item['info'] = info
            item['modified'] = utils.ts2datetime(timestamp)
            self.library[itemid] = item
            # trigger additem
        elif action == 'edititem':
            itemid, meta = args
            item = self.library.get(itemid, {})
            if 'meta' not in item:
                item['meta'] = meta
            else:
                item['meta'].update(meta)
            item['meta_hash'] = utils.get_meta_hash(item['meta'])
            item['modified'] = utils.ts2datetime(timestamp)
            self.library[itemid] = item
            if state.tasks:
                state.tasks.queue('syncmetadata', [itemid])
                state.tasks.queue('getpreview', itemid)
        elif action == 'removeitem':
            itemid = args[0]
            if itemid in self.library:
                del self.library[itemid]
        elif action == 'addlist':
            # A second argument (a query) may be present; it is not used here.
            name = args[0]
            if name not in self.info['lists']:
                self.info['lists'][name] = []
            if 'listorder' not in self.info:
                self.info['listorder'] = []
            if name not in self.info['listorder']:
                self.info['listorder'].append(name)
        elif action == 'editlist':
            name, new = args
            if name in self.info['lists']:
                self.info['lists'][new['name']] = self.info['lists'].pop(name)
            else:
                self.info['lists'][new['name']] = []
            # 'listorder' may be missing if the peer never had a list.
            if name in self.info.get('listorder', []):
                self.info['listorder'] = [
                    new['name'] if n == name else n
                    for n in self.info['listorder']
                ]
        elif action == 'orderlists':
            self.info['listorder'] = args[0]
        elif action == 'removelist':
            name = args[0]
            if name in self.info['lists']:
                del self.info['lists'][name]
            # 'listorder' may be missing if the peer never had a list.
            if name in self.info.get('listorder', []):
                self.info['listorder'] = [n for n in self.info['listorder'] if n != name]
        elif action == 'addlistitems':
            name, ids = args
            if name not in self.info['lists']:
                self.info['lists'][name] = []
            # setdefault: 'listorder' may be missing if the peer never had a list.
            listorder = self.info.setdefault('listorder', [])
            if name not in listorder:
                listorder.append(name)
            self.info['lists'][name] = list(set(self.info['lists'][name]) | set(ids))
        elif action == 'removelistitems':
            name, ids = args
            if name in self.info['lists']:
                self.info['lists'][name] = list(set(self.info['lists'][name]) - set(ids))
        elif action == 'addpeer':
            peerid, username = args
            if len(peerid) == 16:
                self.info['peers'][peerid] = {'username': username}
                # fixme, just trigger peer update here
                from user.models import User
                peer = User.get_or_create(peerid)
                if 'username' not in peer.info:
                    peer.info['username'] = username
                    peer.update_name()
                    peer.save()
        elif action == 'editpeer':
            peerid, data = args
            if len(peerid) == 16:
                if peerid not in self.info['peers']:
                    self.info['peers'][peerid] = {}
                for key in ('username', 'contact'):
                    if key in data:
                        self.info['peers'][peerid][key] = data[key]
                # fixme, just trigger peer update here
                from user.models import User
                peer = User.get_or_create(peerid)
                update = False
                # Only users that are not peered get their info overwritten.
                if not peer.peered:
                    for key in ('username', 'contact'):
                        if key in data and peer.info.get(key) != data[key]:
                            peer.info[key] = data[key]
                            update = True
                if update:
                    peer.save()
        elif action == 'removepeer':
            peerid = args[0]
            if peerid in self.info['peers']:
                del self.info['peers'][peerid]
                # trigger peer update
        elif action == 'editusername':
            self.info['username'] = args[0]
        elif action == 'editcontact':
            self.info['contact'] = args[0]
        elif action == 'addannotation':
            from annotation.models import Annotation
            if len(args) == 2:
                itemid, data = args
                Annotation.create(item_id=itemid, user_id=self.id, data=data)
            else:
                logger.error('invalid entry %s %s', action, args)
        elif action == 'editannotation':
            from annotation.models import Annotation
            if len(args) == 3:
                itemid, annotationid, data = args
                a = Annotation.get(self.id, itemid, annotationid)
                if a:
                    for key in data:
                        a.data[key] = data[key]
                    a.save()
            else:
                logger.error('invalid entry %s %s', action, args)
        elif action == 'removeannotation':
            from annotation.models import Annotation
            if len(args) == 2:
                itemid, annotationid = args
                a = Annotation.get(self.id, itemid, annotationid)
                if a:
                    a.delete()
            else:
                logger.error('invalid entry %s %s', action, args)
        else:
            # The original format string had no placeholder for the argument,
            # which makes logging report a formatting error.
            logger.debug('UNKNOWN ACTION: %s', action)
        self.info['revision'] = revision
        return True

    def get_metahash(self, itemid):
        """Return the item's 'meta_hash', computing and storing it if missing.

        Raises KeyError if itemid is not in the library.
        """
        item = self.library[itemid]
        if 'meta_hash' not in item:
            item['meta_hash'] = utils.get_meta_hash(item['meta'])
            self.library[itemid] = item
        return item['meta_hash']

    def sync_info(self):
        """Write self.info to the peer's JSON info file."""
        with open(self._infopath, 'w') as fd:
            json.dump(self.info, fd, indent=4, sort_keys=True)

    def join(self):
        """Commit and close the library, then write the info file."""
        #self.library.join()
        self.library.commit()
        self.library.close(do_log=False)
        self.sync_info()

    def remove(self):
        """Close the peer and delete its db, log and info files if present."""
        self.join()
        for path in (self._dbpath, self._logpath, self._infopath):
            if os.path.exists(path):
                os.unlink(path)

    def sync_db(self):
        """Bring the database rows for this peer in line with library and info.

        Steps: remove user/list item rows for items no longer in the library,
        add rows (and Item objects) for new library items, reconcile the
        user's lists with info['lists'], update affected items, and finally
        drop list entries in info that point to items missing from the
        library. Returns early if state.shutdown is set between phases.
        """
        import item.models
        import user.models
        c_user_id = item.models.user_items.columns['user_id']
        c_item_id = item.models.user_items.columns['item_id']
        l_list_id = user.models.list_items.columns['list_id']
        l_item_id = user.models.list_items.columns['item_id']

        q = item.models.user_items.select().where(c_user_id.is_(self.id))
        current = {r[1] for r in state.db.session.execute(q)}
        library = set(self.library.keys())
        remove = list(current - library)
        u = user.models.User.get(self.id)
        listid = u.library.id
        lists = [l.id for l in u.lists.all()]
        getpreview = []
        t0 = time.time()
        if remove:
            logger.debug('remove %s items from %s', len(remove), self.id)
            q = item.models.user_items.delete().where(c_user_id.is_(self.id)).where(c_item_id.in_(remove))
            state.db.session.execute(q)
            q = user.models.list_items.delete().where(l_list_id.in_(lists)).where(l_item_id.in_(remove))
            state.db.session.execute(q)

        add = list(library - current)
        # list id -> set of item ids currently stored for that list
        listitems = {}
        q = user.models.list_items.select().where(l_list_id.in_(lists))
        for row in state.db.session.execute(q):
            if not row['list_id'] in listitems:
                listitems[row['list_id']] = set()
            listitems[row['list_id']].add(row['item_id'])
        t0 = maybe_commit(t0)

        if add:
            logger.debug('add %s items from %s', len(add), self.id)
            t0 = time.time()
            q = item.models.user_items.select().where(c_user_id.is_(self.id))
            useritems = {r['item_id'] for r in state.db.session.execute(q)}
            for itemid in add:
                i = item.models.Item.get(itemid)
                if not i:
                    i = item.models.Item(id=itemid)
                    m = self.library.get(itemid, {})
                    i.info.update(m.get('info', {}))
                    i.meta = m.get('meta', {})
                    state.db.session.add(i)
                    getpreview.append(itemid)
                if itemid not in useritems:
                    q = item.models.user_items.insert({'item_id': itemid, 'user_id': self.id})
                    state.db.session.execute(q)
                if itemid not in listitems.get(listid, []):
                    q = user.models.list_items.insert({'item_id': itemid, 'list_id': listid})
                    state.db.session.execute(q)
                if state.shutdown:
                    break
                t0 = maybe_commit(t0)
            state.db.session.commit()
        if remove:
            # Items no user references anymore are removed entirely.
            q = item.models.user_items.select()
            user_items = {i['item_id'] for i in state.db.session.execute(q)}
            removed_items = set(remove) - user_items
            if removed_items:
                item.models.Item.remove_many(removed_items)
        state.db.session.commit()
        if state.shutdown:
            return
        if state.tasks:
            for itemid in getpreview:
                state.tasks.queue('getpreview', itemid)

        update_items = remove + add
        # From here on add/remove hold list names, not item ids.
        current_lists = set(l.name for l in u.lists.all() if l.name)
        add = list(set(self.info['lists']) - current_lists)
        remove = list(current_lists - set(self.info['lists']))
        t0 = time.time()
        for l in u.lists.all():
            if l.name:
                if l.name in remove:
                    logger.debug('remove list %s', l.name)
                    l.remove(commit=False)
                else:
                    if l.id in listitems:
                        ladd = list(set(self.info['lists'][l.name]) - set(listitems[l.id]))
                        lremove = list(set(listitems[l.id]) - set(self.info['lists'][l.name]))
                        if ladd:
                            l.add_items(ladd, commit=False)
                            update_items = list(set(update_items) - set(ladd))
                        if lremove:
                            l.remove_items(lremove, commit=False)
                        if ladd or lremove:
                            logger.debug('update list %s', l.name)
                    elif self.info['lists'][l.name]:
                        l.add_items(self.info['lists'][l.name], commit=False)
                        update_items = list(set(update_items) - set(self.info['lists'][l.name]))
                        logger.debug('update list %s', l.name)
            if state.shutdown:
                break
            t0 = maybe_commit(t0)
        state.db.session.commit()
        if state.shutdown:
            return
        if add:
            for name in add:
                logger.debug('add list %s', name)
                l = user.models.List.get_or_create(self.id, name)
                l.add_items(self.info['lists'][name], commit=False)
                trigger_event('addlist', {'id': l.public_id, 'user': self.id})
                update_items = list(set(update_items) - set(self.info['lists'][name]))
                if state.shutdown:
                    break
        t0 = time.time()
        if update_items:
            logger.debug('update %s items', len(update_items))
            for i in item.models.Item.query.filter(item.models.Item.id.in_(update_items)):
                i.update(commit=False)
                t0 = maybe_commit(t0)
                if state.shutdown:
                    break
            state.db.session.commit()
        if update_items:
            logger.debug('updated %s items', len(update_items))

        # Drop list entries that reference items missing from the library.
        ids = set(self.library.keys())
        changed = False
        for name, l in self.info.get('lists', {}).items():
            removed = set(l) - ids
            if removed:
                self.info['lists'][name] = list(set(l) - removed)
                changed = True
        if changed:
            self.sync_info()


def sync_db():
    """Update sort data for items lacking it, then clean up lists and peers.

    state.sync_db is True while this runs and is reset to False on exit,
    including when an exception propagates. Items whose info 'mediastate' is
    'unavailable' get a 'getpreview' task queued if state.tasks is set.
    """
    from sqlalchemy.orm import load_only
    import item.models
    first = True
    missing_previews = []
    state.sync_db = True
    try:
        #FIXME: why is this loop needed
        with db.session():
            sort_ids = {i.item_id for i in item.models.Sort.query.options(load_only('item_id'))}
            if sort_ids:
                t0 = time.time()
                commit = False
                for i in item.models.Item.query.options(load_only('id')):
                    if i.id not in sort_ids:
                        if first:
                            first = False
                            logger.debug('sync items')
                        #why?
                        #i.update(commit=False)
                        i.update_sort(commit=False)
                        if i.info.get('mediastate') == 'unavailable':
                            missing_previews.append(i.id)
                        commit = True
                    #logger.debug('sync:%s', i)
                    t0 = maybe_commit(t0)
                    if state.shutdown:
                        break
                if commit:
                    state.db.session.commit()
            if not first:
                logger.debug('synced items')
            if not state.shutdown:
                cleanup_lists()
                logger.debug('lists cleaned up')
            if not state.shutdown:
                cleanup_peers()
                logger.debug('peers cleaned up')
                item.models.Sort.query.filter_by(item_id=None).delete()
                item.models.Find.query.filter_by(item_id=None).delete()
        if missing_previews and state.tasks:
            logger.debug('queueing download of %s missing previews', len(missing_previews))
            for itemid in missing_previews:
                state.tasks.queue('getpreview', itemid)
    finally:
        # Reset the flag even if something above raised.
        state.sync_db = False


def cleanup_lists():
    """Remove lists and item associations whose owner is gone or not peered.

    Lists without a user, or whose user is neither peered nor the local user
    (settings.USER_ID), are removed. user_items rows of other users and
    list_items rows of lists that no longer exist are deleted, followed by
    Item.remove_without_user().
    """
    import item.models
    import user.models
    with db.session():
        for l in user.models.List.query.all():
            if not l.user:
                l.remove()
            elif not l.user.peered and not l.user.id == settings.USER_ID:
                l.remove()
        peers = [u.id for u in user.models.User.query.filter_by(peered=True)] + [settings.USER_ID]
        q = item.models.user_items.delete().where(item.models.user_items.columns['user_id'].notin_(peers))
        state.db.session.execute(q)
        lists = [l.id for l in user.models.List.query.all()]
        q = user.models.list_items.delete().where(user.models.list_items.columns['list_id'].notin_(lists))
        state.db.session.execute(q)
        state.db.session.commit()
        item.models.Item.remove_without_user()
        state.db.session.commit()


def cleanup_peers():
    """Delete users that are not known peers, local nodes or pending.

    Known peers are the local user, every peered user, and every id listed
    in a peered user's peer info ('peers').
    """
    import user.models
    other_peers = {}
    peers = [u for u in user.models.User.query.filter_by(peered=True)]
    peers.sort(key=lambda u: utils.user_sort_key(u.json()))
    known_peers = set()
    known_peers.add(settings.USER_ID)
    for u in peers:
        known_peers.add(u.id)
        peer = utils.get_peer(u.id)
        for peerid in peer.info.get('peers', {}):
            if peerid not in other_peers:
                other_peers[peerid] = peer.info['peers'][peerid]
                known_peers.add(peerid)
    for u in user.models.User.query.filter(user.models.User.id.notin_(list(known_peers))):
        if state.nodes and u.id in state.nodes.local:
            continue
        if not u.pending:
            state.db.session.delete(u)
            state.db.session.commit()