457 lines
16 KiB
Python
457 lines
16 KiB
Python
# -*- coding: utf-8 -*-
|
|
# vi:si:et:sw=4:sts=4:ts=4
|
|
|
|
|
|
from datetime import datetime
|
|
import json
|
|
|
|
import sqlalchemy as sa
|
|
|
|
from utils import datetime2ts, ts2datetime
|
|
from websocket import trigger_event
|
|
import db
|
|
import settings
|
|
import state
|
|
|
|
import logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class Changelog(db.Model):
    '''
    One row per recorded change of a user's library.

    `data` holds a JSON list `[action, arg, ...]`. When a change is applied
    the action name is dispatched to the matching `action_<name>` method
    of this class.

    Known actions and their arguments:

        additem          itemid metadata from file (info) + OLID
        edititem         itemid name->id (i.e. olid-> OL...M)
        removeitem       itemid
        addlist          name
        editlist         name {name: newname}
        orderlists       [name, name, name]
        removelist       name
        addlistitems     listname [ids]
        removelistitems  listname [ids]
        editusername     username
        editcontact      string
        addpeer          peerid peername
        removepeer       peerid

        editmeta         key, value data (i.e. 'isbn', '0000000000', {title: 'Example'})
        resetmeta        key, value
    '''
    __tablename__ = 'changelog'
    id = sa.Column(sa.Integer(), primary_key=True)

    # time the row was written to the local database
    created = sa.Column(sa.DateTime())
    # time of the change as produced by datetime2ts()
    timestamp = sa.Column(sa.BigInteger())

    # id of the user whose changelog this row belongs to
    user_id = sa.Column(sa.String(43))
    # position of the change in that user's changelog
    revision = sa.Column(sa.BigInteger())
    # JSON encoded list [action, arg, ...]
    data = sa.Column(sa.Text())
    # NOTE(review): never written by the code in this class; presumably a
    # signature of the row - confirm before relying on it
    sig = sa.Column(sa.String(96))
|
|
|
|
@classmethod
def record(cls, user, action, *args):
    '''
    Append a new change for `user` to the changelog and commit it.

    user    object with an `id` attribute, owner of the change
    action  name of the action (see class docstring)
    args    arguments of the action, must be JSON serializable

    The change is stored as the JSON list [action, *args]. Its revision
    is the number of changelog rows that already exist for the user and
    its timestamp is the current UTC time.
    '''
    c = cls()
    c.created = datetime.utcnow()
    c.timestamp = datetime2ts(c.created)
    c.user_id = user.id
    c.revision = cls.query.filter_by(user_id=user.id).count()
    c.data = json.dumps([action] + list(args), ensure_ascii=False)
    # the former signature payload (revision + timestamp + data) was
    # computed here but never used, so it is no longer built
    state.db.session.add(c)
    state.db.session.commit()
    logger.debug('record change: %s', c.json())
|
|
|
|
@classmethod
def record_ts(cls, user, ts, action, *args):
    '''
    Like record(), but with an explicit time for the change.

    user    object with an `id` attribute, owner of the change
    ts      datetime of the change; if falsy the creation time is used
    action  name of the action (see class docstring)
    args    arguments of the action, must be JSON serializable
    '''
    # fixme remove
    c = cls()
    c.created = datetime.utcnow()
    c.timestamp = datetime2ts(ts if ts else c.created)
    c.user_id = user.id
    c.revision = cls.query.filter_by(user_id=user.id).count()
    c.data = json.dumps([action] + list(args), ensure_ascii=False)
    # the former signature payload (revision + timestamp + data) was
    # computed here but never used, so it is no longer built
    state.db.session.add(c)
    state.db.session.commit()
    logger.debug('record change: %s', c.json())
|
|
|
|
@classmethod
def apply_changes(cls, user, changes):
    '''
    Apply a sequence of [revision, timestamp, data] changes for `user`.

    Changes are applied in order and processing stops at the first one
    that apply_change() rejects. One 'change' event is triggered at the
    end, but only if `changes` was not empty and none was rejected.

    Returns False if the user is found in state.removepeer (the user is
    cleaned up and dropped from that mapping), True in every other case,
    including a rejected change.
    '''
    trigger = bool(changes)
    for change in changes:
        if user.id in state.removepeer:
            user.cleanup()
            del state.removepeer[user.id]
            return False
        if not cls.apply_change(user, change, trigger=False):
            logger.debug('FAIL %s', change)
            # stop here; the statement that used to follow the break
            # could never run and was removed
            trigger = False
            break
    if trigger:
        trigger_event('change', {})
    return True
|
|
|
|
@classmethod
def apply_change(cls, user, change, trigger=True):
    '''
    Apply a single change of `user` and store it in the changelog.

    user     owner of the change (needs `id` and `name`)
    change   [revision, timestamp, data], data being the JSON encoded
             list [action, arg, ...]
    trigger  if True a 'change' event is triggered after success

    The change is only accepted if its revision is not lower than the
    revision following the last stored one for this user. The action is
    dispatched to `action_<name>`; the row is committed only if that
    handler returns a true value.

    Returns True if the change was applied and stored, False otherwise.
    '''
    revision, timestamp, data = change
    # column expression instead of the raw string '-revision', which
    # newer SQLAlchemy versions refuse to treat as SQL text
    last = cls.query.filter_by(user_id=user.id).order_by(cls.revision.desc()).first()
    next_revision = last.revision + 1 if last else 0
    if revision < next_revision:
        logger.debug('revsion does not match! got %s expecting %s', revision, next_revision)
        return False

    c = cls()
    c.created = datetime.utcnow()
    c.timestamp = timestamp
    c.user_id = user.id
    c.revision = revision
    c.data = data
    args = json.loads(data)
    logger.debug('apply change from %s: %s(%s)', user.name, args[0], args[1:])
    # the action name comes from a peer: reject names without a handler
    # instead of failing with AttributeError
    handler = getattr(c, 'action_' + args[0], None)
    if handler is None:
        logger.debug('could not apply change')
        return False
    if handler(user, timestamp, *args[1:]):
        logger.debug('change applied')
        state.db.session.add(c)
        state.db.session.commit()
        if trigger:
            trigger_event('change', {})
        return True
    logger.debug('could not apply change')
    return False
|
|
|
|
def __repr__(self):
    '''
    Return the JSON encoded change data.

    A row whose data is not set yet yields an empty string, since
    __repr__ has to return a string and must not return None.
    '''
    return self.data or ''
|
|
|
|
def json(self):
    '''
    Return the change as the list [revision, timestamp, data].

    Rows without a timestamp fall back to the timestamp derived from
    their creation time.
    '''
    ts = self.timestamp
    if not ts:
        ts = datetime2ts(self.created)
    return [self.revision, ts, self.data]
|
|
|
|
@classmethod
def restore(cls, user_id, path=None):
    '''
    Replay a changelog dump (one JSON encoded change per line).

    user_id  id of the user the changes belong to, looked up or created
    path     file to read, defaults to /tmp/oml_changelog_<user_id>.json

    Every line is passed to apply_change(); its trigger argument is
    true only when user_id equals settings.USER_ID.
    '''
    from user.models import User
    owner = User.get_or_create(user_id)
    path = path or '/tmp/oml_changelog_%s.json' % user_id
    with open(path, 'r') as fd:
        for line in fd:
            entry = json.loads(line)
            cls.apply_change(owner, entry, user_id == settings.USER_ID)
|
|
|
|
@classmethod
def export(cls, user_id, path=None):
    '''
    Dump the changelog of a user, one JSON encoded change per line.

    user_id  id of the user whose rows are written, ordered by revision
    path     file to write, defaults to /tmp/oml_changelog_<user_id>.json
    '''
    path = path or '/tmp/oml_changelog_%s.json' % user_id
    with open(path, 'w') as fd:
        rows = cls.query.filter_by(user_id=user_id).order_by('revision')
        for row in rows:
            line = json.dumps(row.json(), ensure_ascii=False)
            fd.write(line + '\n')
|
|
|
|
def action_additem(self, user, timestamp, itemid, info):
    '''
    Register `user` as a holder of the item `itemid`.

    A missing item is created from `info`, gets the change time as its
    modification time and is marked as coming from this user. An
    existing item is only updated if the user was not attached yet.
    Always returns True.
    '''
    from item.models import Item
    item = Item.get(itemid)
    if not item:
        item = Item.get_or_create(itemid, info)
        item.modified = ts2datetime(timestamp)
        if user not in item.users:
            item.add_user(user)
        item.info['_from'] = user.id
        item.update()
    elif user not in item.users:
        item.add_user(user)
        item.update()
    return True
|
|
|
|
def action_edititem(self, user, timestamp, itemid, meta):
    '''
    Store `meta` as this user's metadata for the item `itemid`.

    The item itself is edited as well when this user is its only user
    or when the item is marked as coming from this user.
    Always returns True.
    '''
    from user.models import Metadata
    user_meta = Metadata.get_or_create(user.id, itemid)
    user_meta.edit(meta)
    #FIXME: "sometimes" update item too...
    from item.models import Item
    item = Item.get(itemid)
    if item:
        sole_user = len(item.users) == 1 and user in item.users
        from_user = item.info.get('_from') == user.id
        if sole_user or from_user:
            item.edit(meta, ts2datetime(timestamp))
    return True
|
|
|
|
def action_removeitem(self, user, timestamp, itemid):
    '''
    Detach `user` from the item `itemid`.

    The item is deleted once no user is left, otherwise it is updated.
    The user's metadata rows for the item are deleted in any case.
    Always returns True.
    '''
    from item.models import Item
    from user.models import Metadata
    item = Item.get(itemid)
    if item:
        if user in item.users:
            item.users.remove(user)
        if not item.users:
            item.delete()
        else:
            item.update()
    Metadata.query.filter_by(user_id=user.id, item_id=itemid).delete()
    return True
|
|
|
|
def action_addlist(self, user, timestamp, name, query=None):
    '''
    Create the list `name` for `user` and announce it with an
    'addlist' event. An empty name is ignored; `query` is not used.
    Always returns True.
    '''
    from user.models import List
    if name != '':
        created = List.create(user.id, name)
        trigger_event('addlist', {'id': created.public_id, 'user': user.id})
    return True
|
|
|
|
def action_editlist(self, user, timestamp, name, new):
    '''
    Edit the list `name` of `user` (it is created if missing).

    Only the key 'name' of `new` is used: it renames the list. The
    list is saved and an 'editlist' event is triggered either way.
    Always returns True.
    '''
    from user.models import List
    target = List.get_or_create(user.id, name)
    if 'name' in new:
        target.name = new['name']
    target.save()
    event = {'id': target.public_id, 'user': user.id}
    trigger_event('editlist', event)
    return True
|
|
|
|
def action_orderlists(self, user, timestamp, lists):
    '''
    Give the lists of `user` the order of the names in `lists`.

    Each named list (created if missing) gets its position in `lists`
    as index and is saved; afterwards one 'orderlists' event is
    triggered. Always returns True.
    '''
    from user.models import List
    for position, name in enumerate(lists):
        entry = List.get_or_create(user.id, name)
        entry.index_ = position
        entry.save()
    trigger_event('orderlists', {'user': user.id})
    return True
|
|
|
|
def action_removelist(self, user, timestamp, name):
    '''
    Remove the list `name` of `user` and announce it with a
    'removelist' event. A list that does not exist is ignored.
    Always returns True.
    '''
    from user.models import List
    target = List.get(user.id, name)
    if target:
        # read the id before removing: the object may no longer be
        # usable afterwards, and there is no id without a list
        public_id = target.public_id
        target.remove()
        trigger_event('removelist', {'id': public_id})
    return True
|
|
|
|
def action_addlistitems(self, user, timestamp, name, ids):
    '''
    Add the items `ids` to the list `name` of `user`; the list is
    created if it does not exist. Always returns True.
    '''
    from user.models import List
    List.get_or_create(user.id, name).add_items(ids)
    return True
|
|
|
|
def action_removelistitems(self, user, timestamp, name, ids):
    '''
    Remove the items `ids` from the list `name` of `user`. Nothing
    happens if the list does not exist. Always returns True.
    '''
    from user.models import List
    target = List.get(user.id, name)
    if not target:
        return True
    target.remove_items(ids)
    return True
|
|
|
|
def action_editusername(self, user, timestamp, username):
    '''
    Set the username of `user` and refresh the derived name.

    If the nickname changed as a result, List.rename_user() is called
    with the old and the new nickname. The user is saved afterwards.
    Always returns True.
    '''
    from user.models import List
    previous = user.nickname
    user.info['username'] = username
    user.update_name()
    current = user.nickname
    if previous != current:
        List.rename_user(previous, user.nickname)
    user.save()
    return True
|
|
|
|
def action_editcontact(self, user, timestamp, contact):
    '''
    Store `contact` in the info of `user` and save the user.
    Always returns True.
    '''
    info = user.info
    info['contact'] = contact
    user.save()
    return True
|
|
|
|
def action_addpeer(self, user, timestamp, peerid, username):
    '''
    Remember `peerid` as a peer of `user` under the name `username`.

    Only peer ids of length 16 are processed. The peer's own user
    record is looked up or created; it gets `username` only if it has
    no username yet. Always returns True.
    '''
    if len(peerid) != 16:
        return True
    from user.models import User
    if 'users' not in user.info:
        user.info['users'] = {}
    user.info['users'][peerid] = username
    user.save()
    peer = User.get_or_create(peerid)
    if 'username' not in peer.info:
        peer.info['username'] = username
        peer.update_name()
        peer.save()
    return True
|
|
|
|
def action_removepeer(self, user, timestamp, peerid):
    '''
    Forget `peerid` as a peer of `user`. The user is only saved if
    the peer was known. Always returns True.
    '''
    peers = user.info['users'] if 'users' in user.info else {}
    if peerid in peers:
        del user.info['users'][peerid]
        user.save()
        #fixme, remove from User table if no other connection exists
    return True
|
|
|
|
def action_editmeta(self, user, timestamp, key, value, data):
    '''
    Accept an 'editmeta' change without doing anything.

    Metadata is no longer tracked per isbn. The handler is kept so that
    changelogs still containing this action can be applied. The code
    that used to follow the return could never run and was removed.
    Always returns True.
    '''
    return True
|
|
|
|
def action_resetmeta(self, user, timestamp, key, value):
    '''
    Accept a 'resetmeta' change without doing anything.

    The handler is kept so that changelogs still containing this action
    can be applied. The code that used to follow the return could never
    run and was removed. Always returns True.
    '''
    return True
|
|
|
|
@classmethod
def aggregated_changes(cls, since=None, user_id=None):
    '''
    Return a condensed version of a user's changelog.

    since    if set, only rows with revision >= since are considered
    user_id  owner of the changelog, defaults to settings.USER_ID

    Rows are read in timestamp order and merged per action and target,
    so that changes undoing each other (add/remove of an item, list,
    peer or list item) drop out and repeated edits collapse into one.
    'editmeta' and 'resetmeta' rows are skipped.

    Returns a list of [revision, timestamp, data] entries, data being
    the JSON encoded list [action, arg, ...]. Entries are renumbered so
    that the last one carries the revision of the last row read.
    '''
    from item.models import Item
    from user.models import List

    if not user_id:
        user_id = settings.USER_ID
    qs = cls.query.filter_by(user_id=user_id)
    if since:
        qs = qs.filter(cls.revision >= since)

    # changes[action][target] = [revision, timestamp, arg, ...]
    changes = {}
    orderlists = False
    list_actions = ('addlist', 'addlistitems', 'removelistitems')
    for c in qs.order_by(cls.timestamp):
        revision = c.revision
        # same fallback as json() for rows stored without a timestamp
        timestamp = c.timestamp or datetime2ts(c.created)
        data = json.loads(c.data)
        op = data[0]
        if op in ('editmeta', 'resetmeta'):
            continue
        action = changes.setdefault(op, {})
        if op == 'additem':
            item_id = data[1]
            info = data[2]
            action[item_id] = [revision, timestamp, info]
            if item_id in changes.get('removeitem', []):
                del changes['removeitem'][item_id]
            i = Item.get(item_id)
            if i:
                # copy: later 'edititem' rows update this dict in place
                # and must not modify the item's own metadata
                meta = i.meta
                if isinstance(meta, dict):
                    meta = dict(meta)
                changes.setdefault('edititem', {})[item_id] = [revision+1, timestamp, meta]
        elif op == 'edititem':
            item_id = data[1]
            meta = data[2]
            if item_id not in action:
                action[item_id] = [revision, timestamp, meta]
            else:
                entry = action[item_id]
                entry[0] = revision
                entry[1] = timestamp
                entry[2].update(meta)
        elif op == 'removeitem':
            item_id = data[1]
            if item_id in changes.get('additem', []):
                del changes['additem'][item_id]
            else:
                action[item_id] = [revision, timestamp]
            if item_id in changes.get('edititem', []):
                del changes['edititem'][item_id]
        elif op == 'addlist':
            list_id = data[1]
            if list_id:
                ids = data[2] if len(data) > 2 else []
                action[list_id] = [revision, timestamp, ids]
        elif op == 'editlist':
            old_id = data[1]
            new_id = data[2]['name']
            r = revision
            if old_id not in changes.get('addlist', []):
                action[old_id] = [revision, timestamp, {'name': new_id}]
                r += 1
            # pending changes of the list now refer to the new name
            for a in list_actions:
                if a in changes and old_id in changes[a]:
                    changes[a][new_id] = changes[a].pop(old_id)
                    changes[a][new_id][0] = r
        elif op == 'orderlists':
            orderlists = True
        elif op == 'removelist':
            list_id = data[1]
            if list_id not in changes.get('addlist', []):
                action[list_id] = [revision, timestamp]
            for a in list_actions:
                if a in changes and list_id in changes[a]:
                    del changes[a][list_id]
        elif op == 'addlistitems':
            list_id = data[1]
            if not list_id:
                continue
            listitems = data[2]
            if list_id not in action:
                action[list_id] = [revision, timestamp, []]
            action[list_id][0] = revision
            action[list_id][1] = timestamp
            action[list_id][2] += listitems
            # items added again must no longer be removed; the item
            # ids are the third element of the pending entry
            pending = changes.get('removelistitems', {})
            if list_id in pending:
                remaining = [i for i in pending[list_id][2] if i not in listitems]
                if remaining:
                    pending[list_id][2] = remaining
                else:
                    del pending[list_id]
        elif op == 'removelistitems':
            list_id = data[1]
            listitems = data[2]
            # items added earlier in this window cancel out
            added = changes.get('addlistitems', {})
            if list_id in added:
                cancelled = [i for i in added[list_id][2] if i in listitems]
                remaining = [i for i in added[list_id][2] if i not in listitems]
                if remaining:
                    added[list_id][2] = remaining
                else:
                    del added[list_id]
                listitems = [i for i in listitems if i not in cancelled]
            if listitems:
                if list_id in action:
                    # merge with earlier removals instead of replacing them
                    entry = action[list_id]
                    entry[0] = revision
                    entry[1] = timestamp
                    entry[2] += [i for i in listitems if i not in entry[2]]
                else:
                    action[list_id] = [revision, timestamp, listitems]
        elif op in ('editusername', 'editcontact'):
            # the handlers take a single value; keep whatever arguments
            # the row has so the entry can be applied again unchanged
            #fixme merge multiple edits
            action[data[1]] = [revision, timestamp] + data[2:]
        elif op == 'addpeer':
            peer = data[1]
            username = data[2]
            action[peer] = [revision, timestamp, username]
            if peer in changes.get('removepeer', []):
                del changes['removepeer'][peer]
        elif op == 'removepeer':
            peer = data[1]
            if peer in changes.get('addpeer', []):
                del changes['addpeer'][peer]
            else:
                action[peer] = [revision, timestamp]
        else:
            print('unknonw action', data)

    _changes = []
    for op, entries in changes.items():
        for target, entry in entries.items():
            _changes.append([entry[0], entry[1], json.dumps([op, target] + entry[2:])])
    _changes.sort(key=lambda change: (change[0], change[1]))
    if orderlists:
        # send the current order instead of replaying every reorder
        ids = [l.name for l in List.query.filter_by(user_id=user_id, type='static').order_by('index_') if l.name]
        if len(ids) > 1:
            _changes.append([-1, timestamp, json.dumps(['orderlists', ids])])
    if _changes:
        # renumber backwards from the revision of the last row read
        r = revision
        for c in reversed(_changes):
            c[0] = r
            r -= 1
    return _changes
|