# -*- coding: utf-8 -*-
"""User, List and per-user Metadata models plus related background tasks."""

from datetime import datetime
import json
import os
import shutil

import ox
from sqlalchemy.orm import load_only
import sqlalchemy as sa

from changelog import add_record
from db import MutableDict
import db
import json_pickler
import settings
import state
import utils
import media

from websocket import trigger_event

import logging
logger = logging.getLogger(__name__)


class User(db.Model):
    """A user row: the local user (``settings.USER_ID``) or a remote peer."""

    __tablename__ = 'user'

    created = sa.Column(sa.DateTime())
    modified = sa.Column(sa.DateTime())

    id = sa.Column(sa.String(43), primary_key=True)
    info = sa.Column(MutableDict.as_mutable(sa.PickleType(pickler=json_pickler)))

    nickname = sa.Column(sa.String(256), index=True)

    pending = sa.Column(sa.String(64))  # sent|received
    queued = sa.Column(sa.Boolean())
    peered = sa.Column(sa.Boolean())
    online = sa.Column(sa.Boolean())

    def __repr__(self):
        return self.id

    @classmethod
    def get(cls, id, for_update=False):
        """Return the user with ``id`` or None.

        With ``for_update`` the row is selected using ``with_for_update()``.
        A falsy ``info`` is replaced by an empty dict so callers can index it.
        """
        qs = cls.query.filter_by(id=id)
        if for_update:
            qs = qs.with_for_update()
        user = qs.first()
        if user and not user.info:
            user.info = {}
        return user

    @classmethod
    def get_or_create(cls, id):
        """Return the user with ``id``, creating and saving it if missing."""
        user = cls.get(id)
        if not user:
            user = cls(id=id, peered=False, online=False)
            user.info = {}
            if state.nodes and state.nodes.local and id in state.nodes.local:
                user.info['username'] = state.nodes.local[id]['username']
            user.update_name()
            user.save()
        return user

    def save(self):
        """Add this row to the session and commit."""
        state.db.session.add(self)
        state.db.session.commit()

    @property
    def name(self):
        """The nickname, or '' for the local user."""
        name = self.nickname if self.id != settings.USER_ID else ''
        return name

    @property
    def library(self):
        """The user's library list (the list named ''), with index_ forced to -1.

        Note that reading this property may create and/or save the list.
        """
        l = List.get_or_create(self.id, '')
        if l.index_ != -1:
            l.index_ = -1
            l.save()
        return l

    def json(self, keys=None):
        """Return a dict describing this user.

        If ``keys`` is given, only those keys are kept in the result.
        """
        j = {}
        if self.info:
            j.update(self.info)
        # Guard ``local`` the same way get_or_create() does.
        if state.nodes and state.nodes.local and self.id in state.nodes.local:
            j['local'] = state.nodes.local[self.id].copy()
        j['id'] = self.id
        if self.pending:
            j['pending'] = self.pending
        j['peered'] = self.peered
        if not keys or 'online' in keys:
            j['online'] = self.is_online()
        j['name'] = self.name
        if not keys or 'username' in keys or 'contact' in keys:
            if self.id == settings.USER_ID:
                j['username'] = settings.preferences['username']
                j['contact'] = settings.preferences['contact']
            elif self.id in state.peers:
                peer = state.peers[self.id]
                for key in ('username', 'contact'):
                    if key in peer.info:
                        j[key] = peer.info[key]
        if keys:
            for k in set(j) - set(keys):
                del j[k]
        return j

    def export_library(self):
        """Write the library as JSON to ``settings.data_path``.

        A ``library.json`` left in the old ``<libraryPath>/Books`` location
        is removed first.
        """
        old_path = os.path.join(
            os.path.expanduser(settings.preferences['libraryPath']),
            'Books', 'library.json')
        if os.path.exists(old_path):
            os.unlink(old_path)
        path = os.path.join(settings.data_path, 'library.json')
        self.library.export_json(path)

    def is_online(self):
        """Return a truthy value if ``state.nodes`` reports this user online."""
        return state.nodes and state.nodes.is_online(self.id)

    def trigger_status(self):
        """Emit a 'status' event carrying this user's id and online state."""
        trigger_event('status', {
            'id': self.id,
            'online': self.is_online()
        })

    def lists_json(self):
        """Return the JSON descriptions of this user's lists.

        For the local user these come from the database; for any other user
        they are built from the peer object returned by ``utils.get_peer``.
        """
        self.library  # property access: makes sure the library list exists
        if self.id == settings.USER_ID:
            return [l.json() for l in self.lists.order_by('index_')]

        peer = utils.get_peer(self.id)
        lists = [{
            'id': self.nickname + ':',
            'user': self.name,
            'items': len(peer.library),
            'name': 'Library',
            'type': 'library'
        }]
        # A peer without a 'lists' entry simply has no static lists; the
        # fallback must be a mapping, not a list.
        peer_lists = peer.info.get('lists') or {}
        if 'listorder' in peer.info:
            names = peer.info['listorder']
        else:
            names = list(peer_lists)
        for index, name in enumerate(names):
            lists.append({
                'id': '%s:%s' % (self.nickname, name),
                'user': self.name,
                'name': name,
                'index': index,
                'items': len(peer_lists.get(name, [])),
                'type': 'static'
            })
        return lists

    def update_peering(self, peered, username=None):
        """Set or clear the peering state for this user.

        When ``peered`` is true the user is marked as peered, an 'addpeer'
        record is added if it was not peered before, and an 'index' is
        assigned if missing. Otherwise peering state is cleared, the user's
        data is removed via cleanup() and, if it was peered, a 'removepeer'
        record is added.
        """
        was_peering = self.peered
        if peered:
            logger.debug('update_peering, pending: %s queued: %s', self.pending, self.queued)
            self.queued = self.pending != 'sent'
            self.pending = ''
            if username:
                self.info['username'] = username
            self.update_name()
            if not was_peering:
                add_record('addpeer', self.id, self.nickname)
            if 'index' not in self.info:
                # Next free index after all other peered users; rows coming
                # straight from a query may still have info == None.
                self.info['index'] = max([
                    (u.info or {}).get('index', -1)
                    for u in User.query.filter_by(peered=True)
                    if u.id != self.id
                ] + [0]) + 1
            self.peered = True
            self.save()
            if self.id in state.removepeer:
                del state.removepeer[self.id]
        else:
            self.pending = ''
            self.peered = False
            self.queued = False
            if 'index' in self.info:
                del self.info['index']
            self.update_name()
            self.save()
            if self.name in settings.ui['showFolder']:
                del settings.ui['showFolder'][self.name]
                settings.ui._save()
            state.removepeer[self.id] = True
            self.cleanup()
            if was_peering:
                add_record('removepeer', self.id)
        self.save()

    def cleanup(self):
        """Delete this user's lists and item links and drop its peer object."""
        from item.models import user_items, Item
        List.query.filter_by(user_id=self.id).delete()
        c_user_id = user_items.columns['user_id']
        q = user_items.delete().where(c_user_id.is_(self.id))
        state.db.session.execute(q)
        Item.remove_without_user()
        self.save()
        if self.id in state.peers:
            state.peers[self.id].remove()
            del state.peers[self.id]

    def update_name(self):
        """Derive a nickname that no other user row has and store it.

        The base name is the preferences username for the local user, else
        info['nickname'], info['username'] or 'anonymous'. On collision a
        ' [n]' suffix is appended, starting at 2. The row is not saved here.
        """
        if self.id == settings.USER_ID:
            name = settings.preferences.get('username', 'anonymous')
        else:
            name = self.info.get('nickname') or self.info.get('username') or 'anonymous'
        nickname = name
        n = 2
        while self.query.filter_by(nickname=nickname).filter(User.id != self.id).first():
            nickname = '%s [%d]' % (name, n)
            n += 1
        self.nickname = nickname

    def rebuild_changelog(self):
        """Retired: logs an error and does nothing.

        The former implementation followed an unconditional return and
        referenced a name that is not defined in this module, so it has
        been dropped.
        """
        logger.error('no longer used')
        return


list_items = sa.Table(
    'listitem', db.metadata,
    sa.Column('list_id', sa.Integer(), sa.ForeignKey('list.id')),
    sa.Column('item_id', sa.String(32), sa.ForeignKey('item.id'))
)


class List(db.Model):
    """A list of items owned by a user: 'static', or 'smart' when it has a query.

    The list named '' is the owner's library.
    """

    __tablename__ = 'list'

    id = sa.Column(sa.Integer(), primary_key=True)
    name = sa.Column(sa.String())
    index_ = sa.Column(sa.Integer())

    type = sa.Column(sa.String(64))
    _query = sa.Column('query', MutableDict.as_mutable(sa.PickleType(pickler=json_pickler)))

    user_id = sa.Column(sa.String(43), sa.ForeignKey('user.id'))
    user = sa.orm.relationship('User', backref=sa.orm.backref('lists', lazy='dynamic'))

    items = sa.orm.relationship('Item', secondary=list_items,
                                backref=sa.orm.backref('lists', lazy='dynamic'))

    @classmethod
    def get(cls, user_id, name=None):
        """Return the matching list or None.

        With ``name`` omitted, ``user_id`` is a public id of the form
        'nickname:name' and is resolved through get_user_name().
        """
        if name is None:
            user_id, name = cls.get_user_name(user_id)
        return cls.query.filter_by(user_id=user_id, name=name).first()

    @classmethod
    def get_user_name(cls, user_id):
        """Split a public id 'nickname:name' into ``(user_id, name)``.

        An empty nickname refers to the local user.
        NOTE(review): an unknown nickname raises AttributeError here, and an
        id without ':' raises ValueError; callers currently do not catch
        either.
        """
        nickname, name = user_id.split(':', 1)
        if nickname:
            user = User.query.filter_by(nickname=nickname).first()
            user_id = user.id
        else:
            user_id = settings.USER_ID
        return user_id, name

    @classmethod
    def get_or_create(cls, user_id, name=None, query=None):
        """Return the matching list, creating it (see create()) if missing."""
        if name is None:
            user_id, name = cls.get_user_name(user_id)
        l = cls.get(user_id, name)
        if not l:
            l = cls.create(user_id, name, query)
        return l

    @classmethod
    def create(cls, user_id, name, query=None):
        """Create, commit and return a new list for ``user_id``.

        If ``name`` is taken, ' [n]' is appended (n starting at 2). A truthy
        ``query`` makes the list 'smart', otherwise it is 'static'. For the
        local user an 'addlist' record is added for static lists other than
        the library ('') and 'Inbox'.
        """
        prefix = name
        n = 2
        while cls.get(user_id, name):
            name = '%s [%s]' % (prefix, n)
            n += 1
        l = cls(user_id=user_id, name=name)
        if query:
            l._query = query
            l.type = 'smart'
        else:
            l.type = 'static'
        l.index_ = cls.query.filter_by(user_id=user_id).count()
        state.db.session.add(l)
        state.db.session.commit()
        if user_id == settings.USER_ID:
            if l.type == 'static' and name != '' and name != 'Inbox':
                add_record('addlist', l.name)
        return l

    @classmethod
    def rename_user(cls, old, new):
        """Rewrite 'list' conditions that reference nickname ``old`` to ``new``.

        Walks the (possibly nested) conditions of every list that has a
        query and saves the lists in which something changed.
        """
        old_prefix = '%s:' % old

        def update_conditions(conditions):
            changed = False
            for c in conditions:
                if 'conditions' in c:
                    # OR the result in so a later unchanged group cannot
                    # hide an earlier change.
                    if update_conditions(c['conditions']):
                        changed = True
                else:
                    value = c.get('value', '')
                    if (c.get('key') == 'list' and isinstance(value, str)
                            and value.startswith(old_prefix)):
                        # The operands must be a tuple: '%' binds tighter
                        # than ','.
                        c['value'] = '%s:%s' % (new, value[len(old_prefix):])
                        changed = True
            return changed

        # NOTE(review): conditions are mutated in place inside the query
        # dict; confirm db.MutableDict flags such nested edits as dirty.
        for l in cls.query.filter(cls._query != None):  # noqa: E711 (SQL expression)
            if l._query and update_conditions(l._query.get('conditions', [])):
                l.save()

    def add_items(self, items, commit=True):
        """Add the items with the given ids to this list.

        Unknown ids are skipped. For lists of the local user, items whose
        mediastate is not 'available' are queued for download. For local
        lists other than the library and 'Inbox', an 'addlistitems' record
        is added for the ids that are 'available'.
        """
        from item.models import Item
        available_items = []
        for item_id in items:
            i = Item.get(item_id)
            if not i:
                continue
            if self.user_id == settings.USER_ID and i.info.get('mediastate') != 'available':
                i.queue_download()
            if i not in self.items:
                self.items.append(i)
            i.update(commit=False)
            # .get(): an item without a mediastate is just not available.
            if i.info.get('mediastate') == 'available':
                available_items.append(item_id)
        state.db.session.add(self)
        if commit:
            state.db.session.commit()
        if self.user_id == settings.USER_ID and self.name not in ('', 'Inbox') and available_items:
            add_record('addlistitems', self.name, available_items)

    def get_items(self):
        """Return a query for the items in this list."""
        from item.models import Item
        if self.type == 'smart':
            return Item.find({'query': self._query})
        else:
            return self.user.items.join(Item.lists, aliased=True).filter(List.id == self.id)

    def remove_items(self, items, commit=True):
        """Remove the items with the given ids from this list.

        For local lists other than the library a 'removelistitems' record
        is added.
        """
        from item.models import Item
        for item_id in items:
            i = Item.get(item_id)
            if not i:
                continue
            if i in self.items:
                self.items.remove(i)
            i.update(commit=commit)
        state.db.session.add(self)
        if commit:
            state.db.session.commit()
        if self.user_id == settings.USER_ID and self.name != '':
            add_record('removelistitems', self.name, items)

    def remove(self, commit=True):
        """Delete this list.

        For lists without a query the item links are deleted and, for local
        lists other than the library and 'Inbox', a 'removelist' record is
        added.
        """
        if not self._query:
            q = list_items.delete().where(list_items.columns['list_id'].is_(self.id))
            state.db.session.execute(q)
            if self.user_id == settings.USER_ID and self.name not in ('', 'Inbox'):
                add_record('removelist', self.name)
        state.db.session.delete(self)
        if commit:
            state.db.session.commit()

    @property
    def public_id(self):
        """'nickname:name'; the nickname part is empty for local lists."""
        id = ''
        if self.user_id != settings.USER_ID:
            id += self.user.nickname
        id = '%s:%s' % (id, self.name)
        return id

    @property
    def find_id(self):
        """'user_id:list_id'; the user part is empty for local lists."""
        id = ''
        if self.user_id != settings.USER_ID:
            id += self.user_id
        id = '%s:%s' % (id, self.id)
        return id

    def __repr__(self):
        return self.public_id

    def items_count(self):
        """Return the number of items in this list.

        Counts for lists of other users are taken from the peer object.
        """
        if self.user_id != settings.USER_ID:
            peer = utils.get_peer(self.user_id)
            if self.name:
                # A peer without a 'lists' entry has no items in any list.
                peer_lists = peer.info.get('lists') or {}
                return len(peer_lists.get(self.name, []))
            else:
                return len(peer.library)
        return self.get_items().count()

    def json(self):
        """Return a dict describing this list.

        The library is reported as name 'Library', type 'library', without
        an index; smart lists additionally carry their query.
        """
        r = {
            'id': self.public_id,
            'user': self.user.name,
            'name': self.name,
            'index': self.index_,
            'items': self.items_count(),
            'type': self.type
        }
        if self.name == '':
            r['name'] = 'Library'
            r['type'] = 'library'
            del r['index']
        if self.type == 'smart':
            r['query'] = self._query
        return r

    def save(self):
        """Add this row to the session and commit."""
        state.db.session.add(self)
        state.db.session.commit()

    def create_symlinks(self):
        """No-op."""
        pass

    def export_json(self, path=None):
        """Write the JSON of all items in this list to ``path``.

        Without ``path`` the file goes to ``<libraryPath>/Lists/<name>/``
        (or ``<libraryPath>/Books/`` for the library) as ``library.json``.
        Each item gets a 'path' key taken from its first file, if any.
        """
        from utils import _to_json
        if not path:
            if self.name:
                name = os.path.join('Lists', self.name)
            else:
                name = 'Books'
            path = os.path.join(
                os.path.expanduser(settings.preferences['libraryPath']),
                name, 'library.json')
        ox.makedirs(os.path.dirname(path))
        items = []
        for i in self.get_items():
            j = i.json()
            for f in i.files:
                j['path'] = f.path
                break
            items.append(j)
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(items, f, indent=4, default=_to_json, ensure_ascii=False, sort_keys=True)


class Metadata(db.Model):
    """Metadata a user has stored for an item."""

    __tablename__ = 'user_metadata'

    created = sa.Column(sa.DateTime())
    modified = sa.Column(sa.DateTime())

    id = sa.Column(sa.Integer(), primary_key=True)
    item_id = sa.Column(sa.String(32))
    user_id = sa.Column(sa.String(43), sa.ForeignKey('user.id'))
    data_hash = sa.Column(sa.String(40), index=True)
    data = sa.Column(MutableDict.as_mutable(sa.PickleType(pickler=json_pickler)))

    def __repr__(self):
        return '{item}/{user}'.format(item=self.item_id, user=self.user_id)

    @property
    def timestamp(self):
        """``modified`` converted with ``utils.datetime2ts``."""
        return utils.datetime2ts(self.modified)

    @classmethod
    def get(cls, user_id, item_id):
        """Return the row for this user and item, or None."""
        return cls.query.filter_by(item_id=item_id, user_id=user_id).first()

    @classmethod
    def get_or_create(cls, user_id, item_id, data=None, commit=True):
        """Return the row for this user and item, creating it if missing.

        A new row stores ``data`` (or {}) as is; an existing row is updated
        through edit() when ``data`` is truthy.
        """
        m = cls.get(user_id=user_id, item_id=item_id)
        if not m:
            m = cls(user_id=user_id, item_id=item_id)
            m.created = datetime.utcnow()
            m.data = data if data else {}
            m.save(commit=commit)
        elif data:
            m.edit(data, commit=commit)
        return m

    def get_hash(self):
        """Return ``utils.get_meta_hash`` of the stored data."""
        return utils.get_meta_hash(self.data)

    def save(self, commit=True, modified=None):
        """Set ``modified`` (now, unless given), add to the session and
        optionally commit."""
        if modified is None:
            self.modified = datetime.utcnow()
        else:
            self.modified = modified
        state.db.session.add(self)
        if commit:
            state.db.session.commit()

    def edit(self, data, commit=True, modified=None):
        """Merge ``data`` into the stored data and return the changed keys.

        The key 'id' is ignored. A list under 'isbn' is reduced to its first
        value that ``utils.to_isbn13`` accepts, or dropped if there is none;
        this normalisation modifies the caller's ``data`` dict in place.
        The row is saved only if something changed.
        """
        changes = {}
        if 'isbn' in data and isinstance(data['isbn'], list):
            isbns = [utils.to_isbn13(isbn) for isbn in data['isbn']]
            isbns = [isbn for isbn in isbns if isbn]
            if isbns:
                data['isbn'] = isbns[0]
            else:
                del data['isbn']
        for key in data:
            if key == 'id':
                continue
            if data[key] != self.data.get(key):
                self.data[key] = data[key]
                changes[key] = data[key]
        if changes:
            self.data_hash = self.get_hash()
            self.save(commit=commit, modified=modified)
        return changes

    def delete(self):
        """Delete this row and commit."""
        state.db.session.delete(self)
        state.db.session.commit()


def export_list(data):
    """Copy the files of a list to a folder, reporting progress via events.

    ``data`` keys read here: 'list' (public list id), 'mode' ('add' or
    'replace') and 'path' (target folder). In 'add' mode a name clash with a
    different file is resolved by inserting a prefix of the file's sha1
    before the extension. In 'replace' mode differing files are overwritten
    and files under the target that are not part of the list are deleted.
    A library.json for the list is written into the target folder.
    """
    with db.session():
        self = List.get(data['list'])
        if not self:
            return
        mode = data.get('mode')
        prefix = data.get('path')
        if mode not in ('add', 'replace'):
            logger.debug('invalid mode %s', mode)
            return
        if not prefix or prefix == '/':
            logger.debug('invalid export path %s', prefix)
            trigger_event('activity', {
                'activity': 'export',
                'path': prefix,
                'progress': [0, 0],
                'status': {'code': 404, 'text': 'invalid export path'}
            })
            return

        # Find the closest existing ancestor of the target. Stop once
        # dirname() no longer changes the path ('/', '' or a drive root),
        # otherwise a relative or drive-rooted path would loop forever.
        root = prefix
        while not os.path.exists(root):
            parent = os.path.dirname(root)
            if parent == root:
                break
            root = parent
        if not os.access(root, os.W_OK):
            logger.debug('can not write to %s', root)
            trigger_event('activity', {
                'activity': 'export',
                'path': prefix,
                'progress': [0, 0],
                'status': {'code': 404, 'text': 'permission denied'}
            })
            return

        if os.path.exists(prefix):
            existing_files = set(
                os.path.join(folder, f)
                for folder, _, files in os.walk(prefix)
                for f in files
            )
        else:
            existing_files = set()
        new_files = set()

        count = self.get_items().count()
        n = 1
        for i in self.get_items():
            files = i.files.all()
            if files:
                f = files[0]
                source = f.fullpath()
                target = os.path.join(prefix, f.path)
                if mode == 'add':
                    # splitext() only looks at the file name and copes with
                    # names that have no extension.
                    base, ext = os.path.splitext(target)
                    p = 1
                    while os.path.exists(target) and media.get_id(target) != f.sha1:
                        target = '%s.%s%s' % (base, f.sha1[:p], ext)
                        p += 1
                ox.makedirs(os.path.dirname(target))
                if os.path.exists(target):
                    if mode == 'replace' and media.get_id(target) != f.sha1:
                        os.unlink(target)
                        shutil.copy2(source, target)
                else:
                    shutil.copy2(source, target)
                new_files.add(target)
            trigger_event('activity', {
                'activity': 'export',
                'path': prefix,
                'progress': [n, count]
            })
            n += 1
        if mode == 'replace':
            for f in existing_files - new_files:
                os.unlink(f)
            utils.remove_empty_folders(prefix)
        self.export_json(os.path.join(prefix, 'library.json'))
        trigger_event('activity', {
            'activity': 'export',
            'progress': [count, count],
            'path': prefix,
            'status': {'code': 200, 'text': ''},
        })


def update_user_peering(user_id, peered, username=None):
    """Run User.update_peering for ``user_id`` inside a db session, if the
    user exists."""
    with db.session():
        u = User.get(user_id, for_update=True)
        if u:
            u.update_peering(peered, username)


def remove_local_info(id):
    """Emit a 'status' event with the current online state of ``id``."""
    if state.nodes:
        trigger_event('status', {
            'id': id,
            'online': state.nodes.is_online(id)
        })


def add_local_info(data):
    """Queue an 'add' for ``data['id']`` on ``state.nodes``."""
    if state.nodes:
        state.nodes.queue('add', data['id'])


def upload(data):
    """Upload ``data['items']`` to the node of ``data['user']``.

    If the user is offline or the upload returns a falsy value, the same
    task is queued again after ``delay`` seconds. Unknown users are ignored.
    """
    delay = 60
    with db.session():
        u = User.get(data['user'])
        if not u:
            return
        uploaded = u.is_online() and state.nodes._nodes[u.id].upload(data['items'])
        if not uploaded:
            state.main.call_later(delay, lambda: state.tasks.queue('upload', data))