openmedialibrary/oml/library.py

343 lines
13 KiB
Python

# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
import json
import os
import time
import ox
from sqlitedict import SqliteDict
import db
import settings
import state
import utils
from websocket import trigger_event
import logging
logger = logging.getLogger(__name__)
COMMIT_TIMEOUT = 20
def maybe_commit(t0):
if time.time() - t0 > COMMIT_TIMEOUT:
state.db.session.commit()
t0 = time.time()
return t0
class Peer(object):
def __init__(self, id):
base = os.path.join(settings.data_path, 'peers')
ox.makedirs(base)
self._dbpath = os.path.join(base, '%s.db' % id)
self._infopath = os.path.join(base, '%s.json' % id)
self.id = id
self.library = SqliteDict(self._dbpath, tablename='library', autocommit=False)
if os.path.exists(self._infopath):
with open(self._infopath) as f:
self.info = json.load(f)
else:
self.info = {}
if not 'peers' in self.info:
self.info['peers'] = {}
if not 'lists' in self.info:
self.info['lists'] = {}
def apply_changes(self, changes):
r = True
for change in changes:
if state.shutdown:
r = False
break
if not self.apply_change(change):
logger.debug('FAIL %s', change)
r = False
break
self.library.commit()
self.sync_info()
self.sync_db()
trigger_event('change', {})
return r
def apply_change(self, change):
revision, timestamp, data = change
action = data[0]
args = data[1:]
#logger.debug('apply change: %s(%s)', action, args)
if action == 'additem':
itemid, info = args
item = self.library.get(itemid, {})
item['info'] = info
item['modified'] = utils.ts2datetime(timestamp)
self.library[itemid] = item
# trigger additem
elif action == 'edititem':
itemid, meta = args
item = self.library.get(itemid, {})
if not 'meta' in item:
item['meta'] = meta
else:
item['meta'].update(meta)
item['meta_hash'] = utils.get_meta_hash(item['meta'])
item['modified'] = utils.ts2datetime(timestamp)
self.library[itemid] = item
if state.tasks:
state.tasks.queue('syncmetadata', [itemid])
state.tasks.queue('getpreview', itemid)
elif action == 'removeitem':
itemid = args[0]
if itemid in self.library:
del self.library[itemid]
elif action == 'addlist':
name = args[0]
if len(args) > 1:
query = args[1]
if not name in self.info['lists']:
self.info['lists'][name] = []
elif action == 'editlist':
name, new = args
if name in self.info['lists']:
self.info['lists'][new] = self.info['lists'].pop(name)
else:
self.info['lists'][new] = []
elif action == 'orderlists':
self.info['listorder'] = args[0]
elif action == 'removelist':
name = args[0]
if name in self.info['lists']:
del self.info['lists'][name]
elif action == 'addlistitems':
name, ids = args
if not name in self.info['lists']:
self.info['lists'][name] = []
self.info['lists'][name] = list(set(self.info['lists'][name]) | set(ids))
elif action == 'removelistitems':
name, ids = args
if name in self.info['lists']:
self.info['lists'][name] = list(set(self.info['lists'][name]) - set(ids))
elif action == 'addpeer':
peerid, username = args
if len(peerid) == 16:
self.info['peers'][peerid] = {'username': username}
# fixme, just trigger peer update here
from user.models import User
peer = User.get_or_create(peerid)
if not 'username' in peer.info:
peer.info['username'] = username
peer.update_name()
peer.save()
elif action == 'editpeer':
peerid, data = args
if len(peerid) == 16:
if not peerid in self.info['peers']:
self.info['peers'][peerid] = {}
for key in ('username', 'contact'):
if key in data:
self.info['peers'][peerid][key] = data[key]
# fixme, just trigger peer update here
from user.models import User
peer = User.get_or_create(peerid)
update = False
if not peer.peered:
for key in ('username', 'contact'):
if key in data and peer.info.get(key) != data[key]:
peer.info[key] = data[key]
update = True
if update:
peer.save()
elif action == 'removepeer':
peerid = args[0]
if peerid in self.peers['peers']:
del self.info['peers']
# trigger peer update
elif action == 'editusername':
self.info['username'] = args[0]
elif action == 'editcontact':
self.info['contact'] = args[0]
else:
logger.debug('UNKNOWN ACTION:', action)
self.info['revision'] = revision
return True
def get_metahash(self, itemid):
item = self.library[itemid]
if 'meta_hash' not in item:
item['meta_hash'] = utils.get_meta_hash(item['meta'])
self.library[itemid] = item
return item['meta_hash']
def sync_info(self):
with open(self._infopath, 'w') as fd:
json.dump(self.info, fd, indent=4, sort_keys=True)
def join(self):
#self.library.join()
self.library.close()
self.sync_info()
def remove(self):
self.join()
os.unlink(self._dbpath)
os.unlink(self._infopath)
def sync_db(self):
import item.models
import user.models
c_user_id = item.models.user_items.columns['user_id']
c_item_id = item.models.user_items.columns['item_id']
l_list_id = user.models.list_items.columns['list_id']
l_item_id = user.models.list_items.columns['item_id']
q = item.models.user_items.select().where(c_user_id.is_(self.id))
current = set([r[1] for r in state.db.session.execute(q)])
library = set(self.library.keys())
remove = list(current - library)
logger.debug('remove %s', len(remove))
u = user.models.User.get(self.id)
listid = u.library.id
lists = [l.id for l in u.lists.all()]
getpreview = []
t0 = time.time()
if remove:
q = item.models.user_items.delete().where(c_user_id.is_(self.id)).where(c_item_id.in_(remove))
state.db.session.execute(q)
q = user.models.list_items.delete().where(l_list_id.in_(lists)).where(l_item_id.in_(remove))
state.db.session.execute(q)
add = list(library - current)
logger.debug('add %s', len(add))
listitems = {}
q = user.models.list_items.select().where(l_list_id.in_(lists))
for row in state.db.session.execute(q):
if not row['list_id'] in listitems:
listitems[row['list_id']] = set()
listitems[row['list_id']].add(row['item_id'])
t0 = maybe_commit(t0)
if add:
t0 = time.time()
q = item.models.user_items.select().where(c_user_id.is_(self.id))
useritems = {r['item_id'] for r in state.db.session.execute(q)}
for itemid in add:
i = item.models.Item.get(itemid)
if not i:
i = item.models.Item(id=itemid)
m = self.library.get(itemid, {})
i.info.update(m.get('info', {}))
i.meta = m.get('meta', {})
state.db.session.add(i)
getpreview.append(itemid)
if itemid not in useritems:
q = item.models.user_items.insert({'item_id': itemid, 'user_id': self.id})
state.db.session.execute(q)
if itemid not in listitems.get(listid, []):
q = user.models.list_items.insert({'item_id': itemid, 'list_id': listid})
state.db.session.execute(q)
if state.shutdown:
break
t0 = maybe_commit(t0)
state.db.session.commit()
if remove:
q = item.models.user_items.select()
user_items = {i['item_id'] for i in state.db.session.execute(q)}
removed_items = set(remove)-user_items
if removed_items:
item.models.Item.remove_many(removed_items)
state.db.session.commit()
if state.shutdown:
return
for itemid in getpreview:
state.tasks.queue('getpreview', itemid)
update_items = remove + add
current_lists = set(l.name for l in u.lists.all() if l.name)
add = list(set(self.info['lists']) - current_lists)
remove = list(current_lists - set(self.info['lists']))
t0 = time.time()
for l in u.lists.all():
if l.name:
logger.debug('update list %s', l.name)
if l.name in remove:
l.remove(commit=False)
else:
if l.id in listitems:
ladd = list(set(self.info['lists'][l.name]) - set(listitems[l.id]))
lremove = list(set(listitems[l.id]) - set(self.info['lists'][l.name]))
if ladd:
l.add_items(ladd, commit=False)
update_items = list(set(update_items) - set(ladd))
if lremove:
l.remove_items(lremove, commit=False)
else:
l.add_items(self.info['lists'][l.name], commit=False)
update_items = list(set(update_items) - set(self.info['lists'][l.name]))
if state.shutdown:
break
t0 = maybe_commit(t0)
state.db.session.commit()
if state.shutdown:
return
if add:
for name in add:
logger.debug('add list %s', name)
l = user.models.List.get_or_create(self.id, name)
l.add_items(self.info['lists'][name], commit=False)
trigger_event('addlist', {'id': l.public_id, 'user': self.id})
update_items = list(set(update_items) - set(self.info['lists'][name]))
if state.shutdown:
break
logger.debug('update %s', len(update_items))
t0 = time.time()
if update_items:
for i in item.models.Item.query.filter(item.models.Item.id.in_(update_items)):
i.update(commit=False)
t0 = maybe_commit(t0)
if state.shutdown:
break
state.db.session.commit()
def sync_db():
from sqlalchemy.orm import load_only
import item.models
with db.session():
sort_ids = {i.item_id for i in item.models.Sort.query.options(load_only('item_id'))}
if sort_ids:
t0 = time.time()
commit = False
for i in item.models.Item.query.filter(item.models.Item.id.notin_(sort_ids)):
i.update(commit=False)
if i.info['mediastate'] == 'unavailable' and state.tasks:
state.tasks.queue('getpreview', i.id)
commit = True
#logger.debug('sync:%s', i)
t0 = maybe_commit(t0)
if state.shutdown:
break
if commit:
state.db.session.commit()
cleanup_lists()
def cleanup_lists():
from sqlalchemy.orm import load_only
import item.models
import user.models
with db.session():
for l in user.models.List.query.all():
if not l.user.peered and not l.user.id == settings.USER_ID:
l.remove()
peers = [u.id for u in user.models.User.query.filter_by(peered=True)] + [settings.USER_ID]
q = item.models.user_items.delete().where(item.models.user_items.columns['user_id'].notin_(peers))
state.db.session.execute(q)
lists = [l.id for l in user.models.List.query.all()]
q = user.models.list_items.delete().where(user.models.list_items.columns['list_id'].notin_(lists))
state.db.session.execute(q)
state.db.session.commit()
q = item.models.user_items.select()
user_items = {i['item_id'] for i in state.db.session.execute(q)}
ids = {i.id for i in item.models.Item.query.options(load_only('id'))}
remove = ids - user_items
if remove:
item.models.Item.remove_many(remove)
state.db.session.commit()