# -*- coding: utf-8 -*-

from datetime import datetime
import json
import os

import sqlalchemy as sa
from sqlalchemy.sql.expression import text

from utils import datetime2ts, ts2datetime, makefolder
from websocket import trigger_event
import db
import settings
import state

import logging
logger = logging.getLogger(__name__)


def changelog_path():
    '''Return the path of the local user's changelog file.'''
    return os.path.join(settings.data_path, 'peers', '%s.log' % settings.USER_ID)


def next_revision():
    '''Increment the revision stored in settings.server and return it.

    The counter starts at 0 when no revision has been stored yet.
    '''
    settings.server['revision'] = settings.server.get('revision', -1) + 1
    return settings.server['revision']


def add_record(action, *args, **kwargs):
    '''Append one change record to the local changelog file.

    The record is written as one JSON line: [revision, timestamp, [action, *args]].

    action -- name of the change (first element of the record payload)
    args   -- positional payload appended after the action name
    _ts    -- optional keyword: datetime to use instead of the current UTC time
    '''
    timestamp = kwargs.pop('_ts', None)
    if not timestamp:
        timestamp = datetime.utcnow()
    timestamp = datetime2ts(timestamp)
    revision = next_revision()
    data = [revision, timestamp, [action] + list(args)]
    data = json.dumps(data, ensure_ascii=False).encode('utf-8')
    path = changelog_path()
    if os.path.exists(path):
        mode = 'ab'
        state.changelog_size = os.path.getsize(path)
    else:
        mode = 'wb'
        state.changelog_size = 0
        makefolder(path)
    with open(path, mode) as fd:
        # data is UTF-8 encoded bytes, so the line terminator must be bytes too
        fd.write(data + b'\n')
    # read the size only after the file is closed so buffered bytes are counted
    state.changelog_size = os.path.getsize(path)
    #logger.debug('record change: %s', data)


def changelog_size():
    '''Return the cached changelog size, falling back to the file size on disk.

    Returns 0 if nothing is cached and the changelog file does not exist.
    '''
    if state.changelog_size is None:
        path = changelog_path()
        if not os.path.exists(path):
            return 0
        return os.path.getsize(path)
    else:
        return state.changelog_size


class Changelog(db.Model):
    '''
        additem               itemid    metadata from file (info) + OLID
        edititem              itemid    name->id (i.e. olid-> OL...M)
        removeitem            itemid
        addlist               name
        editlist              name {name: newname}
        orderlists            [name, name, name]
        removelist            name
        addlistitems          listname [ids]
        removelistitems       listname [ids]
        editusername          username
        editcontact           string
        addpeer               peerid peername
        removepeer            peerid peername
        editpeer              peerid {username: string, contact: string}

        addannotation         itemid data
        editannotation        itemid annotationid data
        removeannotation      itemid annotationid
    '''
    __tablename__ = 'changelog'
    id = sa.Column(sa.Integer(), primary_key=True)

    created = sa.Column(sa.DateTime())
    timestamp = sa.Column(sa.BigInteger())

    user_id = sa.Column(sa.String(43))
    revision = sa.Column(sa.BigInteger())
    data = sa.Column(sa.Text())
    sig = sa.Column(sa.String(96))

    @classmethod
    def record(cls, user, action, *args, **kwargs):
        '''Write a change to the local changelog file; `user` is not used.'''
        return add_record(action, *args, **kwargs)

    @classmethod
    def apply_changes(cls, user_, changes, first=False):
        '''Apply a sequence of [revision, timestamp, data] changes from a user.

        user_   -- user the changes originate from
        changes -- list of changes, each [revision, timestamp, [action, ...]]
        first   -- if true, items, list entries and peer links of the user that
                   are not mentioned in `changes` are removed before applying

        Returns False if processing was interrupted by shutdown or by a
        pending removal of the peer, True otherwise.
        '''
        from user.models import User
        user = user_
        trigger = changes
        if trigger:
            trigger_event('change', {})
        if first:
            # collect everything the incoming changes declare ...
            items = set()
            lists = {}
            peers = set()
            for change in changes:
                op = change[2][0]
                if op == 'additem':
                    items.add(change[2][1])
                if op == 'addlist':
                    lists[change[2][1]] = set()
                if op == 'addlistitems':
                    if change[2][1] not in lists:
                        lists[change[2][1]] = set()
                    for i in change[2][2]:
                        lists[change[2][1]].add(i)
                if op == 'addpeer':
                    peers.add(change[2][1])
            # ... and drop whatever is known locally but not declared
            for i in user.library.items:
                if i.id not in items and user in i.users:
                    i.users.remove(user)
                    if i.users:
                        i.update()
                    else:
                        i.delete()
            for name in lists:
                qs = user.lists.filter_by(name=name)
                if qs.count():
                    l = qs[0]
                    for i in l.get_items():
                        if i.id not in lists[name]:
                            if i.id in l.items:
                                l.items.remove(i.id)
            for peer in User.query:
                if user.id in peer.info.get('users', {}) and peer.id not in peers:
                    del peer.info['users'][user.id]
                    peer.save()
        for change in changes:
            if state.shutdown:
                return False
            if user.id in state.removepeer:
                del state.removepeer[user.id]
                return False
            if not cls.apply_change(user, change, trigger=False):
                logger.debug('FAIL %s', change)
                # NOTE(review): a failed change stops processing and suppresses
                # the final event, but the method still returns True - confirm
                # that callers rely on this.
                trigger = False
                break
        if trigger:
            trigger_event('change', {})
        return True

    @classmethod
    def apply_change(cls, user, change, trigger=True):
        '''Apply a single change and store it as a Changelog row.

        change  -- [revision, timestamp, [action, *args]]
        trigger -- emit a 'change' event after the change was committed

        The change is dispatched to the matching action_<name> method. It is
        only applied if its revision is not lower than the next revision
        expected for the user. Returns True if the change was applied.
        '''
        revision, timestamp, data = change
        last = cls.query.filter_by(user_id=user.id).order_by(text('-revision')).first()
        expected = last.revision + 1 if last else 0
        if revision >= expected:
            c = cls()
            c.created = datetime.utcnow()
            c.timestamp = timestamp
            c.user_id = user.id
            c.revision = revision
            c.data = json.dumps(data)
            logger.debug('apply change from %s: %s(%s)', user.name, data[0], data[1:])
            if getattr(c, 'action_' + data[0])(user, timestamp, *data[1:]):
                logger.debug('change applied')
                state.db.session.add(c)
                state.db.session.commit()
                if trigger:
                    trigger_event('change', {})
                return True
            else:
                logger.debug('could not apply change')
        else:
            logger.debug('revsion does not match! got %s expecting %s', revision, expected)
        return False

    def __repr__(self):
        return self.data

    def json(self):
        '''Return the row as [revision, timestamp, data].

        Falls back to the creation date if no timestamp is stored.
        '''
        timestamp = self.timestamp or datetime2ts(self.created)
        return [self.revision, timestamp, json.loads(self.data)]

    def action_additem(self, user, timestamp, itemid, info):
        '''Add user to the item, creating the item from info if needed.'''
        from item.models import Item
        i = Item.get(itemid)
        if i:
            if user not in i.users:
                i.add_user(user)
                i.update()
        else:
            i = Item.get_or_create(itemid, info)
            i.modified = ts2datetime(timestamp)
            if user not in i.users:
                i.add_user(user)
            i.update()
        return True

    def action_edititem(self, user, timestamp, itemid, meta):
        '''Store the user's metadata edit and sync it to the item if it exists.'''
        from user.models import Metadata
        m = Metadata.get_or_create(user.id, itemid)
        m.edit(meta, modified=ts2datetime(timestamp))
        from item.models import Item
        i = Item.get(itemid)
        if i:
            i.sync_metadata()
            if state.tasks:
                #state.tasks.queue('syncmetadata', [i.id])
                state.tasks.queue('getpreview', i.id)
        return True

    def action_removeitem(self, user, timestamp, itemid):
        '''Remove user from the item; delete the item if no users remain.'''
        from item.models import Item
        from user.models import Metadata
        i = Item.get(itemid)
        if i:
            if user in i.users:
                i.users.remove(user)
            if i.users:
                i.update()
            else:
                i.delete()
        Metadata.query.filter_by(user_id=user.id, item_id=itemid).delete()
        return True

    def action_addlist(self, user, timestamp, name, query=None):
        '''Create the user's list `name`; an empty name is ignored.'''
        from user.models import List
        if name == '':
            return True
        l = List.get_or_create(user.id, name)
        trigger_event('addlist', {'id': l.public_id, 'user': user.id})
        return True

    def action_editlist(self, user, timestamp, name, new):
        '''Rename the user's list `name` if `new` contains a 'name' key.'''
        from user.models import List
        l = List.get_or_create(user.id, name)
        if 'name' in new:
            l.name = new['name']
        l.save()
        trigger_event('editlist', {'id': l.public_id, 'user': user.id})
        return True

    def action_orderlists(self, user, timestamp, lists):
        '''Set index_ of the user's lists to their position in `lists`.'''
        from user.models import List
        for idx, name in enumerate(lists):
            l = List.get_or_create(user.id, name)
            l.index_ = idx
            l.save()
        trigger_event('orderlists', {'user': user.id})
        return True

    def action_removelist(self, user, timestamp, name):
        '''Remove the user's list `name` if it exists.'''
        from user.models import List
        l = List.get(user.id, name)
        if l:
            l.remove()
            trigger_event('removelist', {'id': l.public_id})
        return True

    def action_addlistitems(self, user, timestamp, name, ids):
        '''Add item ids to the user's list `name`, creating the list if needed.'''
        from user.models import List
        l = List.get_or_create(user.id, name)
        l.add_items(ids)
        return True

    def action_removelistitems(self, user, timestamp, name, ids):
        '''Remove item ids from the user's list `name` if the list exists.'''
        from user.models import List
        l = List.get(user.id, name)
        if l:
            l.remove_items(ids)
        return True

    def action_editusername(self, user, timestamp, username):
        '''Update the user's username and record the change if the nickname changed.'''
        from user.models import List
        old = user.nickname
        user.info['username'] = username
        user.update_name()
        if old != user.nickname:
            List.rename_user(old, user.nickname)
            # the edit concerns the renamed user, not this changelog row;
            # same form as the record written by action_editcontact
            Changelog.record(state.user(), 'editpeer', user.id, user.json(['username']))
        user.save()
        return True

    def action_editcontact(self, user, timestamp, contact):
        '''Update the user's contact and record the change if it differs.'''
        if user.info.get('contact') != contact:
            user.info['contact'] = contact
            user.save()
            Changelog.record(state.user(), 'editpeer', user.id, user.json(['contact']))
        return True

    def action_addpeer(self, user, timestamp, peerid, username):
        '''Register peerid in user.info['users'] and make sure the peer exists.

        Peer ids that are not 16 characters long are ignored.
        '''
        if len(peerid) == 16:
            from user.models import User
            if 'users' not in user.info:
                user.info['users'] = {}
            user.info['users'][peerid] = username
            user.save()
            peer = User.get_or_create(peerid)
            if 'username' not in peer.info:
                peer.info['username'] = username
                peer.update_name()
                peer.save()
        return True

    def action_removepeer(self, user, timestamp, peerid):
        '''Remove peerid from user.info['users'] if present.'''
        if 'users' in user.info and peerid in user.info['users']:
            del user.info['users'][peerid]
            user.save()
            #fixme, remove from User table if no other connection exists
        return True

    def action_editpeer(self, user, timestamp, peerid, data):
        '''Update username/contact of a peer that is not peered locally.

        Peer ids that are not 16 characters long are ignored.
        '''
        if len(peerid) == 16:
            from user.models import User
            peer = User.get_or_create(peerid)
            update = False
            if not peer.peered:
                for key in ('username', 'contact'):
                    if key in data and peer.info.get(key) != data[key]:
                        peer.info[key] = data[key]
                        update = True
            if update:
                peer.save()
        return True

    def action_addannotation(self, user, timestamp, itemid, data):
        '''Create an annotation by user on the item.'''
        from annotation.models import Annotation
        Annotation.create(item_id=itemid, user_id=user.id, data=data)
        return True

    def action_editannotation(self, user, timestamp, itemid, annotationid, data):
        '''Merge data into an existing annotation, if it exists.'''
        from annotation.models import Annotation
        a = Annotation.get(user, itemid, annotationid)
        if a:
            for key in data:
                a.data[key] = data[key]
            a.save()
        return True

    def action_removeannotation(self, user, timestamp, itemid, annotationid):
        '''Delete an annotation, if it exists.'''
        from annotation.models import Annotation
        a = Annotation.get(user, itemid, annotationid)
        if a:
            a.delete()
        return True

    @classmethod
    def aggregated_changes(cls, since=None, user_id=None):
        '''Return the stored changes of a user collapsed into a shorter list.

        since   -- only consider rows with revision >= since (all rows if falsy)
        user_id -- user whose changes are aggregated (default: settings.USER_ID)

        Changes are grouped per action and target id in
        changes[action][id] = [revision, timestamp, *payload]; later changes
        cancel or merge with earlier ones (e.g. removeitem drops a pending
        additem). The result is a list of [revision, timestamp, [action, ...]]
        whose revisions are renumbered to end at the last revision seen.
        '''
        from item.models import Item
        from user.models import List
        from user.models import User
        if not user_id:
            user_id = settings.USER_ID
        qs = cls.query.filter_by(user_id=user_id)
        if since:
            qs = qs.filter(Changelog.revision >= since)
        changes = {}
        # these actions are emitted once at the end, built from current state
        orderlists = False
        editcontact = False
        editusername = False
        for c in qs.order_by('timestamp'):
            revision = c.revision
            timestamp = c.timestamp
            data = json.loads(c.data)
            op = data[0]
            if op in ('editmeta', 'resetmeta'):
                continue
            action = changes.setdefault(op, {})
            if op == 'additem':
                item_id = data[1]
                info = data[2]
                action[item_id] = [revision, timestamp, info]
                if item_id in changes.get('removeitem', []):
                    del changes['removeitem'][item_id]
                i = Item.get(item_id)
                if i:
                    changes.setdefault('edititem', {})[item_id] = [revision+1, timestamp, i.meta]
            elif op == 'edititem':
                item_id = data[1]
                meta = data[2]
                if item_id not in action:
                    action[item_id] = [revision, timestamp, meta]
                else:
                    action[item_id][0] = revision
                    action[item_id][1] = timestamp
                    action[item_id][2].update(meta)
            elif op == 'removeitem':
                item_id = data[1]
                if item_id in changes.get('additem', []):
                    del changes['additem'][item_id]
                else:
                    action[item_id] = [revision, timestamp]
                if item_id in changes.get('edititem', []):
                    del changes['edititem'][item_id]
            elif op == 'addlist':
                list_id = data[1]
                if list_id:
                    ids = data[2] if len(data) > 2 else []
                    action[list_id] = [revision, timestamp, ids]
            elif op == 'editlist':
                old_id = data[1]
                new_id = data[2]['name']
                r = revision
                if old_id not in changes.get('addlist', []):
                    action[old_id] = [revision, timestamp, {'name': new_id}]
                    r += 1
                # move pending entries of the old name over to the new name
                for a in ('addlist', 'addlistitems', 'removelistitems'):
                    if a in changes and old_id in changes[a]:
                        changes[a][new_id] = changes[a].pop(old_id)
                        changes[a][new_id][0] = r
            elif op == 'orderlists':
                orderlists = True
            elif op == 'removelist':
                list_id = data[1]
                if list_id not in changes.get('addlist', []):
                    action[list_id] = [revision, timestamp]
                for a in ('addlist', 'addlistitems', 'removelistitems'):
                    if a in changes and list_id in changes[a]:
                        del changes[a][list_id]
            elif op == 'addlistitems':
                list_id = data[1]
                if not list_id:
                    continue
                listitems = data[2]
                if list_id not in action:
                    action[list_id] = [revision, timestamp, []]
                action[list_id][0] = revision
                action[list_id][1] = timestamp
                action[list_id][2] += listitems
                # remove from removelistitems: items added again no longer
                # need the pending removal (ids are the third element)
                pending = changes.get('removelistitems', {}).get(list_id)
                if pending:
                    pending[2] = [i for i in pending[2] if i not in listitems]
            elif op == 'removelistitems':
                list_id = data[1]
                listitems = data[2]
                # remove from addlistitems: an add followed by a remove inside
                # the aggregated range cancels out
                removed = []
                pending = changes.get('addlistitems', {}).get(list_id)
                if pending:
                    removed = [i for i in pending[2] if i in listitems]
                    pending[2] = [i for i in pending[2] if i not in listitems]
                # remove remaining items
                listitems = [i for i in listitems if i not in removed]
                if listitems:
                    if list_id in action:
                        # merge with an earlier removal instead of replacing it
                        known = action[list_id][2]
                        action[list_id][0] = revision
                        action[list_id][1] = timestamp
                        action[list_id][2] = known + [i for i in listitems if i not in known]
                    else:
                        action[list_id] = [revision, timestamp, listitems]
            elif op == 'editusername':
                editusername = True
            elif op == 'editcontact':
                editcontact = True
            elif op == 'addpeer':
                peer_id = data[1]
                username = data[2]
                if len(peer_id) == 16:
                    peer = User.get(peer_id)
                    if peer:
                        username = peer.json().get('username', 'anonymous')
                    action[peer_id] = [revision, timestamp, username]
                    if peer_id in changes.get('removepeer', []):
                        del changes['removepeer'][peer_id]
            elif op == 'removepeer':
                peer_id = data[1]
                if peer_id in changes.get('addpeer', []):
                    del changes['addpeer'][peer_id]
                else:
                    action[peer_id] = [revision, timestamp]
            elif op == 'editpeer':
                peer_id = data[1]
                if peer_id not in action:
                    peer = User.get(peer_id)
                    if peer:
                        data = peer.json(['username', 'contact'])
                        action[peer_id] = [revision, timestamp, data]
            else:
                logger.debug('unknonw action %s', data)
        # flatten changes[action][id] into [revision, timestamp, [action, id, ...]]
        _changes = []
        for op in list(changes):
            if not changes[op]:
                del changes[op]
            else:
                for key in changes[op]:
                    data = changes[op][key]
                    rv = data[0]
                    ts = data[1]
                    data = [op, key] + data[2:]
                    _changes.append([rv, ts, data])
        _changes.sort(key=lambda change: (change[0], change[1]))
        # revision and timestamp below are those of the last row processed;
        # the flags can only be set if at least one row was processed
        if orderlists:
            ids = [l.name for l in List.query.filter_by(user_id=user_id, type='static').order_by('index_') if l.name]
            if len(ids) > 1:
                _changes.append([-1, timestamp, ['orderlists', ids]])
        userinfo = state.user().json()
        if editusername:
            _changes.append([-1, timestamp, ['editusername', userinfo['username']]])
        if editcontact:
            _changes.append([-1, timestamp, ['editcontact', userinfo['contact']]])
        if _changes:
            # renumber so the revisions are consecutive and end at the last
            # revision seen
            r = revision
            for c in reversed(_changes):
                c[0] = r
                r -= 1
        return _changes