user per peer library (sqlitedict)
This commit is contained in:
parent
cb382287f5
commit
0ca89db3cd
11 changed files with 472 additions and 60 deletions
|
@ -3,7 +3,6 @@
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import base64
|
import base64
|
||||||
import hashlib
|
import hashlib
|
||||||
import json
|
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
|
@ -98,6 +97,13 @@ class Item(db.Model):
|
||||||
def find(cls, data):
|
def find(cls, data):
|
||||||
return Parser(cls, user_items, Find, Sort).find(data)
|
return Parser(cls, user_items, Find, Sort).find(data)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def remove_many(cls, ids):
|
||||||
|
Find.query.filter(Find.item_id.in_(ids)).delete(synchronize_session=False)
|
||||||
|
Sort.query.filter(Sort.item_id.in_(ids)).delete(synchronize_session=False)
|
||||||
|
cls.query.filter(cls.id.in_(ids)).delete(synchronize_session=False)
|
||||||
|
state.db.session.expire_all()
|
||||||
|
|
||||||
def add_user(self, user):
|
def add_user(self, user):
|
||||||
if not user in self.users:
|
if not user in self.users:
|
||||||
self.users.append(user)
|
self.users.append(user)
|
||||||
|
@ -336,7 +342,7 @@ class Item(db.Model):
|
||||||
f.move()
|
f.move()
|
||||||
|
|
||||||
def get_hash(self):
|
def get_hash(self):
|
||||||
return utils.get_meta_hash(self.meta.copy())
|
return utils.get_meta_hash(self.meta)
|
||||||
|
|
||||||
def get_sorttitle(self):
|
def get_sorttitle(self):
|
||||||
title = self.meta.get('sorttitle')
|
title = self.meta.get('sorttitle')
|
||||||
|
@ -346,7 +352,6 @@ class Item(db.Model):
|
||||||
return title
|
return title
|
||||||
|
|
||||||
def sync_metadata(self):
|
def sync_metadata(self):
|
||||||
from user.models import Metadata
|
|
||||||
if self.meta.get('sharemetadata'):
|
if self.meta.get('sharemetadata'):
|
||||||
return
|
return
|
||||||
peers = [u for u in self.users if u.id != settings.USER_ID]
|
peers = [u for u in self.users if u.id != settings.USER_ID]
|
||||||
|
@ -355,34 +360,44 @@ class Item(db.Model):
|
||||||
first_peer = None
|
first_peer = None
|
||||||
# get first peer with sharemetadata set
|
# get first peer with sharemetadata set
|
||||||
for u in peers:
|
for u in peers:
|
||||||
m = Metadata.get(u.id, self.id)
|
peer = utils.get_peer(u.id)
|
||||||
|
if self.id in peer.library:
|
||||||
|
m = peer.library[self.id].get('meta')
|
||||||
|
else:
|
||||||
|
m = None
|
||||||
if m:
|
if m:
|
||||||
if m.data.get('sharemetadata'):
|
if m.get('sharemetadata'):
|
||||||
sync_from = m
|
sync_from = u.id
|
||||||
break
|
break
|
||||||
if not first_peer:
|
if not first_peer:
|
||||||
first_peer = m
|
first_peer = u.id
|
||||||
# of fall back to first peer that has this item
|
# of fall back to first peer that has this item
|
||||||
# in case its not available locally
|
# in case its not available locally
|
||||||
if not sync_from and self.info.get('mediastate') != 'available' and first_peer:
|
if not sync_from and self.info.get('mediastate') != 'available' and first_peer:
|
||||||
#logger.debug('syncing from first peer that has item %s', first_peer)
|
#logger.debug('syncing from first peer that has item %s', first_peer)
|
||||||
sync_from = first_peer
|
sync_from = first_peer
|
||||||
if sync_from:
|
if sync_from:
|
||||||
if self.get_hash() != sync_from.data_hash:
|
peer = utils.get_peer(sync_from)
|
||||||
logger.debug('update %s with metadata from %s', self, sync_from.user_id)
|
data_hash = peer.get_metahash(self.id)
|
||||||
|
item = peer.library[self.id]
|
||||||
|
sync_meta = item['meta']
|
||||||
|
sync_modified = item.get('modified')
|
||||||
|
if self.get_hash() != data_hash:
|
||||||
|
logger.debug('update %s with metadata from %s', self, sync_from)
|
||||||
record = {}
|
record = {}
|
||||||
for key in sync_from.data:
|
for key in sync_meta:
|
||||||
if key != 'sharemetadata' and self.meta.get(key) != sync_from.data[key]:
|
if key != 'sharemetadata' and self.meta.get(key) != sync_meta[key]:
|
||||||
record[key] = self.meta[key] = sync_from.data[key]
|
record[key] = self.meta[key] = sync_meta[key]
|
||||||
for key in set(self.meta)-set(sync_from.data):
|
for key in set(self.meta)-set(sync_meta):
|
||||||
record[key] = self.meta[key] = [] if key in self.array_keys else ''
|
record[key] = self.meta[key] = [] if key in self.array_keys else ''
|
||||||
self.update(sync_from.modified)
|
self.update(sync_modified)
|
||||||
self.save()
|
self.save()
|
||||||
user = state.user()
|
user = state.user()
|
||||||
if record and user in self.users:
|
if record and user in self.users:
|
||||||
Changelog.record(user, 'edititem', self.id, record, _ts=self.modified)
|
Changelog.record(user, 'edititem', self.id, record, _ts=self.modified)
|
||||||
if 'cover' in record:
|
if 'cover' in record:
|
||||||
self.update_cover()
|
if state.tasks:
|
||||||
|
state.tasks.queue('getcover', self.id)
|
||||||
|
|
||||||
def extract_preview(self):
|
def extract_preview(self):
|
||||||
path = self.get_path()
|
path = self.get_path()
|
||||||
|
@ -450,7 +465,7 @@ class Item(db.Model):
|
||||||
def update_icons(self):
|
def update_icons(self):
|
||||||
if state.online:
|
if state.online:
|
||||||
self.update_cover()
|
self.update_cover()
|
||||||
else:
|
elif state.tasks:
|
||||||
state.tasks.queue('getcover', self.id)
|
state.tasks.queue('getcover', self.id)
|
||||||
self.update_preview()
|
self.update_preview()
|
||||||
|
|
||||||
|
|
343
oml/library.py
Normal file
343
oml/library.py
Normal file
|
@ -0,0 +1,343 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# vi:si:et:sw=4:sts=4:ts=4
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
|
||||||
|
import ox
|
||||||
|
from sqlitedict import SqliteDict
|
||||||
|
|
||||||
|
import db
|
||||||
|
import settings
|
||||||
|
import state
|
||||||
|
import utils
|
||||||
|
from websocket import trigger_event
|
||||||
|
|
||||||
|
import logging
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
COMMIT_TIMEOUT = 20
|
||||||
|
|
||||||
|
def maybe_commit(t0):
|
||||||
|
if time.time() - t0 > COMMIT_TIMEOUT:
|
||||||
|
state.db.session.commit()
|
||||||
|
t0 = time.time()
|
||||||
|
return t0
|
||||||
|
|
||||||
|
class Peer(object):
|
||||||
|
def __init__(self, id):
|
||||||
|
base = os.path.join(settings.data_path, 'peers')
|
||||||
|
ox.makedirs(base)
|
||||||
|
self._dbpath = os.path.join(base, '%s.db' % id)
|
||||||
|
self._infopath = os.path.join(base, '%s.json' % id)
|
||||||
|
|
||||||
|
self.id = id
|
||||||
|
self.library = SqliteDict(self._dbpath, tablename='library', autocommit=False)
|
||||||
|
|
||||||
|
if os.path.exists(self._infopath):
|
||||||
|
with open(self._infopath) as f:
|
||||||
|
self.info = json.load(f)
|
||||||
|
else:
|
||||||
|
self.info = {}
|
||||||
|
if not 'peers' in self.info:
|
||||||
|
self.info['peers'] = {}
|
||||||
|
if not 'lists' in self.info:
|
||||||
|
self.info['lists'] = {}
|
||||||
|
|
||||||
|
def apply_changes(self, changes):
|
||||||
|
r = True
|
||||||
|
for change in changes:
|
||||||
|
if state.shutdown:
|
||||||
|
r = False
|
||||||
|
break
|
||||||
|
if not self.apply_change(change):
|
||||||
|
logger.debug('FAIL %s', change)
|
||||||
|
r = False
|
||||||
|
break
|
||||||
|
self.library.commit()
|
||||||
|
self.sync_info()
|
||||||
|
self.sync_db()
|
||||||
|
trigger_event('change', {})
|
||||||
|
return r
|
||||||
|
|
||||||
|
def apply_change(self, change):
|
||||||
|
revision, timestamp, data = change
|
||||||
|
action = data[0]
|
||||||
|
args = data[1:]
|
||||||
|
#logger.debug('apply change: %s(%s)', action, args)
|
||||||
|
if action == 'additem':
|
||||||
|
itemid, info = args
|
||||||
|
item = self.library.get(itemid, {})
|
||||||
|
item['info'] = info
|
||||||
|
item['modified'] = utils.ts2datetime(timestamp)
|
||||||
|
self.library[itemid] = item
|
||||||
|
# trigger additem
|
||||||
|
elif action == 'edititem':
|
||||||
|
itemid, meta = args
|
||||||
|
item = self.library.get(itemid, {})
|
||||||
|
if not 'meta' in item:
|
||||||
|
item['meta'] = meta
|
||||||
|
else:
|
||||||
|
item['meta'].update(meta)
|
||||||
|
item['meta_hash'] = utils.get_meta_hash(item['meta'])
|
||||||
|
item['modified'] = utils.ts2datetime(timestamp)
|
||||||
|
self.library[itemid] = item
|
||||||
|
if state.tasks:
|
||||||
|
state.tasks.queue('syncmetadata', [itemid])
|
||||||
|
state.tasks.queue('getpreview', itemid)
|
||||||
|
elif action == 'removeitem':
|
||||||
|
itemid = args[0]
|
||||||
|
if itemid in self.library:
|
||||||
|
del self.library[itemid]
|
||||||
|
elif action == 'addlist':
|
||||||
|
name = args[0]
|
||||||
|
if len(args) > 1:
|
||||||
|
query = args[1]
|
||||||
|
if not name in self.info['lists']:
|
||||||
|
self.info['lists'][name] = []
|
||||||
|
elif action == 'editlist':
|
||||||
|
name, new = args
|
||||||
|
if name in self.info['lists']:
|
||||||
|
self.info['lists'][new] = self.info['lists'].pop(name)
|
||||||
|
else:
|
||||||
|
self.info['lists'][new] = []
|
||||||
|
elif action == 'orderlists':
|
||||||
|
self.info['listorder'] = args[0]
|
||||||
|
elif action == 'removelist':
|
||||||
|
name = args[0]
|
||||||
|
if name in self.info['lists']:
|
||||||
|
del self.info['lists'][name]
|
||||||
|
elif action == 'addlistitems':
|
||||||
|
name, ids = args
|
||||||
|
if not name in self.info['lists']:
|
||||||
|
self.info['lists'][name] = []
|
||||||
|
self.info['lists'][name] = list(set(self.info['lists'][name]) | set(ids))
|
||||||
|
elif action == 'removelistitems':
|
||||||
|
name, ids = args
|
||||||
|
if name in self.info['lists']:
|
||||||
|
self.info['lists'][name] = list(set(self.info['lists'][name]) - set(ids))
|
||||||
|
elif action == 'addpeer':
|
||||||
|
peerid, username = args
|
||||||
|
if len(peerid) == 16:
|
||||||
|
self.info['peers'][peerid] = {'username': username}
|
||||||
|
# fixme, just trigger peer update here
|
||||||
|
from user.models import User
|
||||||
|
peer = User.get_or_create(peerid)
|
||||||
|
if not 'username' in peer.info:
|
||||||
|
peer.info['username'] = username
|
||||||
|
peer.update_name()
|
||||||
|
peer.save()
|
||||||
|
elif action == 'editpeer':
|
||||||
|
peerid, data = args
|
||||||
|
if len(peerid) == 16:
|
||||||
|
if not peerid in self.info['peers']:
|
||||||
|
self.info['peers'][peerid] = {}
|
||||||
|
for key in ('username', 'contact'):
|
||||||
|
if key in data:
|
||||||
|
self.info['peers'][peerid][key] = data[key]
|
||||||
|
# fixme, just trigger peer update here
|
||||||
|
from user.models import User
|
||||||
|
peer = User.get_or_create(peerid)
|
||||||
|
update = False
|
||||||
|
if not peer.peered:
|
||||||
|
for key in ('username', 'contact'):
|
||||||
|
if key in data and peer.info.get(key) != data[key]:
|
||||||
|
peer.info[key] = data[key]
|
||||||
|
update = True
|
||||||
|
if update:
|
||||||
|
peer.save()
|
||||||
|
elif action == 'removepeer':
|
||||||
|
peerid = args[0]
|
||||||
|
if peerid in self.peers['peers']:
|
||||||
|
del self.info['peers']
|
||||||
|
# trigger peer update
|
||||||
|
elif action == 'editusername':
|
||||||
|
self.info['username'] = args[0]
|
||||||
|
elif action == 'editcontact':
|
||||||
|
self.info['contact'] = args[0]
|
||||||
|
else:
|
||||||
|
logger.debug('UNKNOWN ACTION:', action)
|
||||||
|
self.info['revision'] = revision
|
||||||
|
return True
|
||||||
|
|
||||||
|
def get_metahash(self, itemid):
|
||||||
|
item = self.library[itemid]
|
||||||
|
if 'meta_hash' not in item:
|
||||||
|
item['meta_hash'] = utils.get_meta_hash(item['meta'])
|
||||||
|
self.library[itemid] = item
|
||||||
|
return item['meta_hash']
|
||||||
|
|
||||||
|
def sync_info(self):
|
||||||
|
with open(self._infopath, 'w') as fd:
|
||||||
|
json.dump(self.info, fd, indent=4, sort_keys=True)
|
||||||
|
|
||||||
|
def join(self):
|
||||||
|
#self.library.join()
|
||||||
|
self.library.close()
|
||||||
|
self.sync_info()
|
||||||
|
|
||||||
|
def remove(self):
|
||||||
|
self.join()
|
||||||
|
os.unlink(self._dbpath)
|
||||||
|
os.unlink(self._infopath)
|
||||||
|
|
||||||
|
def sync_db(self):
|
||||||
|
import item.models
|
||||||
|
import user.models
|
||||||
|
c_user_id = item.models.user_items.columns['user_id']
|
||||||
|
c_item_id = item.models.user_items.columns['item_id']
|
||||||
|
l_list_id = user.models.list_items.columns['list_id']
|
||||||
|
l_item_id = user.models.list_items.columns['item_id']
|
||||||
|
q = item.models.user_items.select().where(c_user_id.is_(self.id))
|
||||||
|
current = set([r[1] for r in state.db.session.execute(q)])
|
||||||
|
library = set(self.library.keys())
|
||||||
|
remove = list(current - library)
|
||||||
|
logger.debug('remove %s', len(remove))
|
||||||
|
u = user.models.User.get(self.id)
|
||||||
|
listid = u.library.id
|
||||||
|
lists = [l.id for l in u.lists.all()]
|
||||||
|
getpreview = []
|
||||||
|
t0 = time.time()
|
||||||
|
if remove:
|
||||||
|
q = item.models.user_items.delete().where(c_user_id.is_(self.id)).where(c_item_id.in_(remove))
|
||||||
|
state.db.session.execute(q)
|
||||||
|
q = user.models.list_items.delete().where(l_list_id.in_(lists)).where(l_item_id.in_(remove))
|
||||||
|
state.db.session.execute(q)
|
||||||
|
add = list(library - current)
|
||||||
|
logger.debug('add %s', len(add))
|
||||||
|
listitems = {}
|
||||||
|
q = user.models.list_items.select().where(l_list_id.in_(lists))
|
||||||
|
for row in state.db.session.execute(q):
|
||||||
|
if not row['list_id'] in listitems:
|
||||||
|
listitems[row['list_id']] = set()
|
||||||
|
listitems[row['list_id']].add(row['item_id'])
|
||||||
|
t0 = maybe_commit(t0)
|
||||||
|
if add:
|
||||||
|
t0 = time.time()
|
||||||
|
q = item.models.user_items.select().where(c_user_id.is_(self.id))
|
||||||
|
useritems = {r['item_id'] for r in state.db.session.execute(q)}
|
||||||
|
for itemid in add:
|
||||||
|
i = item.models.Item.get(itemid)
|
||||||
|
if not i:
|
||||||
|
i = item.models.Item(id=itemid)
|
||||||
|
m = self.library.get(itemid, {})
|
||||||
|
i.info.update(m.get('info', {}))
|
||||||
|
i.meta = m.get('meta', {})
|
||||||
|
state.db.session.add(i)
|
||||||
|
getpreview.append(itemid)
|
||||||
|
if itemid not in useritems:
|
||||||
|
q = item.models.user_items.insert({'item_id': itemid, 'user_id': self.id})
|
||||||
|
state.db.session.execute(q)
|
||||||
|
if itemid not in listitems.get(listid, []):
|
||||||
|
q = user.models.list_items.insert({'item_id': itemid, 'list_id': listid})
|
||||||
|
state.db.session.execute(q)
|
||||||
|
if state.shutdown:
|
||||||
|
break
|
||||||
|
t0 = maybe_commit(t0)
|
||||||
|
state.db.session.commit()
|
||||||
|
if remove:
|
||||||
|
q = item.models.user_items.select()
|
||||||
|
user_items = {i['item_id'] for i in state.db.session.execute(q)}
|
||||||
|
removed_items = set(remove)-user_items
|
||||||
|
if removed_items:
|
||||||
|
item.models.Item.remove_many(removed_items)
|
||||||
|
state.db.session.commit()
|
||||||
|
if state.shutdown:
|
||||||
|
return
|
||||||
|
for itemid in getpreview:
|
||||||
|
state.tasks.queue('getpreview', itemid)
|
||||||
|
update_items = remove + add
|
||||||
|
|
||||||
|
current_lists = set(l.name for l in u.lists.all() if l.name)
|
||||||
|
add = list(set(self.info['lists']) - current_lists)
|
||||||
|
remove = list(current_lists - set(self.info['lists']))
|
||||||
|
t0 = time.time()
|
||||||
|
for l in u.lists.all():
|
||||||
|
if l.name:
|
||||||
|
logger.debug('update list %s', l.name)
|
||||||
|
if l.name in remove:
|
||||||
|
l.remove(commit=False)
|
||||||
|
else:
|
||||||
|
if l.id in listitems:
|
||||||
|
ladd = list(set(self.info['lists'][l.name]) - set(listitems[l.id]))
|
||||||
|
lremove = list(set(listitems[l.id]) - set(self.info['lists'][l.name]))
|
||||||
|
if ladd:
|
||||||
|
l.add_items(ladd, commit=False)
|
||||||
|
update_items = list(set(update_items) - set(ladd))
|
||||||
|
if lremove:
|
||||||
|
l.remove_items(lremove, commit=False)
|
||||||
|
else:
|
||||||
|
l.add_items(self.info['lists'][l.name], commit=False)
|
||||||
|
update_items = list(set(update_items) - set(self.info['lists'][l.name]))
|
||||||
|
if state.shutdown:
|
||||||
|
break
|
||||||
|
t0 = maybe_commit(t0)
|
||||||
|
state.db.session.commit()
|
||||||
|
if state.shutdown:
|
||||||
|
return
|
||||||
|
if add:
|
||||||
|
for name in add:
|
||||||
|
logger.debug('add list %s', name)
|
||||||
|
l = user.models.List.get_or_create(self.id, name)
|
||||||
|
l.add_items(self.info['lists'][name], commit=False)
|
||||||
|
trigger_event('addlist', {'id': l.public_id, 'user': self.id})
|
||||||
|
update_items = list(set(update_items) - set(self.info['lists'][name]))
|
||||||
|
if state.shutdown:
|
||||||
|
break
|
||||||
|
logger.debug('update %s', len(update_items))
|
||||||
|
t0 = time.time()
|
||||||
|
if update_items:
|
||||||
|
for i in item.models.Item.query.filter(item.models.Item.id.in_(update_items)):
|
||||||
|
i.update(commit=False)
|
||||||
|
t0 = maybe_commit(t0)
|
||||||
|
if state.shutdown:
|
||||||
|
break
|
||||||
|
state.db.session.commit()
|
||||||
|
|
||||||
|
def sync_db():
|
||||||
|
from sqlalchemy.orm import load_only
|
||||||
|
import item.models
|
||||||
|
with db.session():
|
||||||
|
sort_ids = {i.item_id for i in item.models.Sort.query.options(load_only('item_id'))}
|
||||||
|
if sort_ids:
|
||||||
|
t0 = time.time()
|
||||||
|
commit = False
|
||||||
|
for i in item.models.Item.query.filter(item.models.Item.id.notin_(sort_ids)):
|
||||||
|
i.update(commit=False)
|
||||||
|
if i.info['mediastate'] == 'unavailable' and state.tasks:
|
||||||
|
state.tasks.queue('getpreview', i.id)
|
||||||
|
commit = True
|
||||||
|
#logger.debug('sync:%s', i)
|
||||||
|
t0 = maybe_commit(t0)
|
||||||
|
if state.shutdown:
|
||||||
|
break
|
||||||
|
if commit:
|
||||||
|
state.db.session.commit()
|
||||||
|
cleanup_lists()
|
||||||
|
|
||||||
|
def cleanup_lists():
|
||||||
|
from sqlalchemy.orm import load_only
|
||||||
|
import item.models
|
||||||
|
import user.models
|
||||||
|
with db.session():
|
||||||
|
for l in user.models.List.query.all():
|
||||||
|
if not l.user.peered and not l.user.id == settings.USER_ID:
|
||||||
|
l.remove()
|
||||||
|
|
||||||
|
peers = [u.id for u in user.models.User.query.filter_by(peered=True)] + [settings.USER_ID]
|
||||||
|
q = item.models.user_items.delete().where(item.models.user_items.columns['user_id'].notin_(peers))
|
||||||
|
state.db.session.execute(q)
|
||||||
|
|
||||||
|
lists = [l.id for l in user.models.List.query.all()]
|
||||||
|
q = user.models.list_items.delete().where(user.models.list_items.columns['list_id'].notin_(lists))
|
||||||
|
state.db.session.execute(q)
|
||||||
|
|
||||||
|
state.db.session.commit()
|
||||||
|
|
||||||
|
q = item.models.user_items.select()
|
||||||
|
user_items = {i['item_id'] for i in state.db.session.execute(q)}
|
||||||
|
ids = {i.id for i in item.models.Item.query.options(load_only('id'))}
|
||||||
|
remove = ids - user_items
|
||||||
|
if remove:
|
||||||
|
item.models.Item.remove_many(remove)
|
||||||
|
state.db.session.commit()
|
|
@ -49,17 +49,18 @@ def api_requestPeering(user_id, username, message):
|
||||||
|
|
||||||
def api_acceptPeering(user_id, username, message):
|
def api_acceptPeering(user_id, username, message):
|
||||||
user = User.get(user_id)
|
user = User.get(user_id)
|
||||||
logger.debug('incoming acceptPeering event: pending: %s', user.pending)
|
if user:
|
||||||
if user and user.pending == 'sent':
|
logger.debug('incoming acceptPeering event: pending: %s', user.pending)
|
||||||
user.info['username'] = username
|
if user.pending == 'sent':
|
||||||
user.info['message'] = message
|
user.info['username'] = username
|
||||||
user.update_name()
|
user.info['message'] = message
|
||||||
user.update_peering(True, username)
|
user.update_name()
|
||||||
state.nodes.queue('add', user.id, True)
|
user.update_peering(True, username)
|
||||||
trigger_event('peering.accept', user.json())
|
state.nodes.queue('add', user.id, True)
|
||||||
return True
|
trigger_event('peering.accept', user.json())
|
||||||
elif user and user.peered:
|
return True
|
||||||
return True
|
elif user.peered:
|
||||||
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def api_rejectPeering(user_id, message):
|
def api_rejectPeering(user_id, message):
|
||||||
|
|
40
oml/nodes.py
40
oml/nodes.py
|
@ -22,9 +22,10 @@ from changelog import Changelog
|
||||||
from websocket import trigger_event
|
from websocket import trigger_event
|
||||||
from localnodes import LocalNodes
|
from localnodes import LocalNodes
|
||||||
from tor_request import get_opener
|
from tor_request import get_opener
|
||||||
from utils import user_sort_key
|
from utils import user_sort_key, get_peer
|
||||||
import state
|
import state
|
||||||
import db
|
import db
|
||||||
|
import library
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -247,24 +248,21 @@ class Node(Thread):
|
||||||
def pullChanges(self):
|
def pullChanges(self):
|
||||||
with db.session():
|
with db.session():
|
||||||
u = user.models.User.get_or_create(self.user_id)
|
u = user.models.User.get_or_create(self.user_id)
|
||||||
if not self.online or not u.peered:
|
if not u or not self.online or not u.peered:
|
||||||
return True
|
return True
|
||||||
last = Changelog.query.filter_by(user_id=self.user_id).order_by('-revision').first()
|
peer = get_peer(self.user_id)
|
||||||
from_revision = last.revision + 1 if last else 0
|
from_revision = peer.info.get('revision', -1) + 1
|
||||||
try:
|
try:
|
||||||
changes = self.request('pullChanges', from_revision)
|
changes = self.request('pullChanges', from_revision)
|
||||||
except:
|
except:
|
||||||
self.online = False
|
self.online = False
|
||||||
logger.debug('%s went offline', u.name)
|
logger.debug('%s went offline', u.name, ext_info=True)
|
||||||
return False
|
return False
|
||||||
if not changes:
|
if not changes:
|
||||||
return False
|
return False
|
||||||
try:
|
#with open('/tmp/changelog_%s_%s.json' % (self.user_id, from_revision), 'w') as f:
|
||||||
r = Changelog.apply_changes(u, changes, first=from_revision == 0)
|
# json.dump(changes, f, ensure_ascii=False, indent=4)
|
||||||
except:
|
return peer.apply_changes(changes)
|
||||||
logger.debug('apply_changes failed', exc_info=True)
|
|
||||||
r = False
|
|
||||||
return r
|
|
||||||
|
|
||||||
def peering(self, action):
|
def peering(self, action):
|
||||||
with db.session():
|
with db.session():
|
||||||
|
@ -411,13 +409,14 @@ class Nodes(Thread):
|
||||||
del u.info['local']
|
del u.info['local']
|
||||||
u.save()
|
u.save()
|
||||||
self.queue('add', u.id)
|
self.queue('add', u.id)
|
||||||
|
state.peers[u.id] = library.Peer(u.id)
|
||||||
for u in user.models.User.query.filter_by(queued=True):
|
for u in user.models.User.query.filter_by(queued=True):
|
||||||
logger.debug('adding queued node... %s', u.id)
|
logger.debug('adding queued node... %s', u.id)
|
||||||
self.queue('add', u.id, True)
|
self.queue('add', u.id, True)
|
||||||
self._local = LocalNodes()
|
self._local = LocalNodes()
|
||||||
self._cleanup = PeriodicCallback(lambda: self.queue('cleanup'), 120000)
|
self._cleanup = PeriodicCallback(lambda: self.queue('cleanup'), 120000)
|
||||||
self._cleanup.start()
|
self._cleanup.start()
|
||||||
self._pullcb = PeriodicCallback(self.pull, 60000)
|
self._pullcb = PeriodicCallback(self.pull, settings.server['pull_interval'])
|
||||||
self._pullcb.start()
|
self._pullcb.start()
|
||||||
Thread.__init__(self)
|
Thread.__init__(self)
|
||||||
self.daemon = True
|
self.daemon = True
|
||||||
|
@ -475,6 +474,7 @@ class Nodes(Thread):
|
||||||
if state.activity and state.activity.get('activity') == 'import':
|
if state.activity and state.activity.get('activity') == 'import':
|
||||||
return
|
return
|
||||||
self._pulling = True
|
self._pulling = True
|
||||||
|
library.sync_db()
|
||||||
users = []
|
users = []
|
||||||
with db.session():
|
with db.session():
|
||||||
from user.models import User
|
from user.models import User
|
||||||
|
@ -493,6 +493,7 @@ class Nodes(Thread):
|
||||||
self._pulling = False
|
self._pulling = False
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
self.queue('pull')
|
||||||
while not state.shutdown:
|
while not state.shutdown:
|
||||||
args = self._q.get()
|
args = self._q.get()
|
||||||
if args:
|
if args:
|
||||||
|
@ -519,7 +520,6 @@ def publish_node():
|
||||||
state.check_nodes.start()
|
state.check_nodes.start()
|
||||||
state._online = PeriodicCallback(update_online, 60000)
|
state._online = PeriodicCallback(update_online, 60000)
|
||||||
state._online.start()
|
state._online.start()
|
||||||
state.nodes.pull()
|
|
||||||
|
|
||||||
def update_online():
|
def update_online():
|
||||||
online = state.tor and state.tor.is_online()
|
online = state.tor and state.tor.is_online()
|
||||||
|
|
|
@ -74,11 +74,16 @@ def shutdown():
|
||||||
if state.tasks:
|
if state.tasks:
|
||||||
logger.debug('shutdown tasks')
|
logger.debug('shutdown tasks')
|
||||||
state.tasks.join()
|
state.tasks.join()
|
||||||
|
if state.update:
|
||||||
|
logger.debug('shutdown updates')
|
||||||
|
state.update.join()
|
||||||
if state.node:
|
if state.node:
|
||||||
state.node.stop()
|
state.node.stop()
|
||||||
if state.tor:
|
if state.tor:
|
||||||
logger.debug('shutdown tor')
|
logger.debug('shutdown tor')
|
||||||
state.tor.shutdown()
|
state.tor.shutdown()
|
||||||
|
for peer in state.peers:
|
||||||
|
state.peers[peer].join()
|
||||||
if state.PID and os.path.exists(state.PID):
|
if state.PID and os.path.exists(state.PID):
|
||||||
logger.debug('remove %s', state.PID)
|
logger.debug('remove %s', state.PID)
|
||||||
os.unlink(state.PID)
|
os.unlink(state.PID)
|
||||||
|
@ -163,7 +168,7 @@ def run():
|
||||||
state.tasks = tasks.Tasks()
|
state.tasks = tasks.Tasks()
|
||||||
state.main.add_callback(start_node)
|
state.main.add_callback(start_node)
|
||||||
else:
|
else:
|
||||||
state.tasks = update.Update()
|
state.update = update.Update()
|
||||||
|
|
||||||
if ':' in settings.server['address']:
|
if ':' in settings.server['address']:
|
||||||
host = '[%s]' % settings.server['address']
|
host = '[%s]' % settings.server['address']
|
||||||
|
|
|
@ -48,6 +48,7 @@ server_defaults = {
|
||||||
'extract_text': True,
|
'extract_text': True,
|
||||||
'localnode_discovery': True,
|
'localnode_discovery': True,
|
||||||
'release_url': 'http://downloads.openmedialibrary.com/release.json',
|
'release_url': 'http://downloads.openmedialibrary.com/release.json',
|
||||||
|
'pull_interval': 60000
|
||||||
}
|
}
|
||||||
|
|
||||||
for key in server_defaults:
|
for key in server_defaults:
|
||||||
|
@ -99,4 +100,4 @@ USER_AGENT = 'OpenMediaLibrary/%s' % VERSION
|
||||||
|
|
||||||
DEBUG_HTTP = server.get('debug_http', False)
|
DEBUG_HTTP = server.get('debug_http', False)
|
||||||
|
|
||||||
DB_VERSION = 10
|
DB_VERSION = 11
|
||||||
|
|
|
@ -10,6 +10,7 @@ tor = False
|
||||||
update = False
|
update = False
|
||||||
shutdown = False
|
shutdown = False
|
||||||
websockets = []
|
websockets = []
|
||||||
|
peers = {}
|
||||||
|
|
||||||
activity = {}
|
activity = {}
|
||||||
removepeer = {}
|
removepeer = {}
|
||||||
|
|
|
@ -342,6 +342,8 @@ class Update(Thread):
|
||||||
db_version = migrate_8()
|
db_version = migrate_8()
|
||||||
if db_version < 10:
|
if db_version < 10:
|
||||||
db_version = migrate_10()
|
db_version = migrate_10()
|
||||||
|
if db_version < 11:
|
||||||
|
db_version = migrate_11()
|
||||||
settings.server['db_version'] = settings.DB_VERSION
|
settings.server['db_version'] = settings.DB_VERSION
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
@ -513,3 +515,27 @@ def migrate_10():
|
||||||
Find.query.filter_by(key=key, value=value).update({'sortvalue': updates[key][value]})
|
Find.query.filter_by(key=key, value=value).update({'sortvalue': updates[key][value]})
|
||||||
session.commit()
|
session.commit()
|
||||||
return 10
|
return 10
|
||||||
|
|
||||||
|
def migrate_11():
|
||||||
|
with db.session() as session:
|
||||||
|
from user.models import User, Metadata
|
||||||
|
from changelog import Changelog
|
||||||
|
import utils
|
||||||
|
for u in User.query.filter_by(peered=True):
|
||||||
|
peer = utils.get_peer(u.id)
|
||||||
|
last = Changelog.query.filter_by(user_id=u.id).order_by('-revision').first()
|
||||||
|
if last:
|
||||||
|
peer.info['revision'] = last.revision
|
||||||
|
for m in Metadata.query.filter_by(user_id=u.id):
|
||||||
|
peer.library[m.item_id] = {
|
||||||
|
'meta': dict(m.data),
|
||||||
|
'meta_hash': m.data_hash,
|
||||||
|
'modified': m.modified,
|
||||||
|
}
|
||||||
|
peer.library.commit()
|
||||||
|
peer.sync_info()
|
||||||
|
peer.sync_db()
|
||||||
|
Changelog.query.filter_by(user_id=u.id).delete()
|
||||||
|
Metadata.query.filter_by(user_id=u.id).delete()
|
||||||
|
session.commit()
|
||||||
|
return 11
|
||||||
|
|
|
@ -93,6 +93,11 @@ class User(db.Model):
|
||||||
if self.id == settings.USER_ID:
|
if self.id == settings.USER_ID:
|
||||||
j['username'] = settings.preferences['username']
|
j['username'] = settings.preferences['username']
|
||||||
j['contact'] = settings.preferences['contact']
|
j['contact'] = settings.preferences['contact']
|
||||||
|
elif self.id in state.peers:
|
||||||
|
peer = state.peers[self.id]
|
||||||
|
for key in ('username', 'contact'):
|
||||||
|
if key in peer.info:
|
||||||
|
j[key] = peer.info[key]
|
||||||
if keys:
|
if keys:
|
||||||
for k in set(j) - set(keys):
|
for k in set(j) - set(keys):
|
||||||
del j[k]
|
del j[k]
|
||||||
|
@ -161,6 +166,9 @@ class User(db.Model):
|
||||||
Changelog.query.filter_by(user_id=self.id).delete()
|
Changelog.query.filter_by(user_id=self.id).delete()
|
||||||
Metadata.query.filter_by(user_id=self.id).delete()
|
Metadata.query.filter_by(user_id=self.id).delete()
|
||||||
self.save()
|
self.save()
|
||||||
|
if self.id in state.peers:
|
||||||
|
state.peers[self.id].remove()
|
||||||
|
del state.peers[self.id]
|
||||||
|
|
||||||
def update_name(self):
|
def update_name(self):
|
||||||
if self.id == settings.USER_ID:
|
if self.id == settings.USER_ID:
|
||||||
|
@ -297,7 +305,7 @@ class List(db.Model):
|
||||||
if update_conditions(l._query.get('conditions', [])):
|
if update_conditions(l._query.get('conditions', [])):
|
||||||
l.save()
|
l.save()
|
||||||
|
|
||||||
def add_items(self, items):
|
def add_items(self, items, commit=True):
|
||||||
from item.models import Item
|
from item.models import Item
|
||||||
for item_id in items:
|
for item_id in items:
|
||||||
i = Item.get(item_id)
|
i = Item.get(item_id)
|
||||||
|
@ -305,9 +313,10 @@ class List(db.Model):
|
||||||
self.items.append(i)
|
self.items.append(i)
|
||||||
if self.user_id == settings.USER_ID:
|
if self.user_id == settings.USER_ID:
|
||||||
i.queue_download()
|
i.queue_download()
|
||||||
i.update()
|
i.update(commit=commit)
|
||||||
state.db.session.add(self)
|
state.db.session.add(self)
|
||||||
state.db.session.commit()
|
if commit:
|
||||||
|
state.db.session.commit()
|
||||||
if self.user_id == settings.USER_ID and self.name != '':
|
if self.user_id == settings.USER_ID and self.name != '':
|
||||||
Changelog.record(self.user, 'addlistitems', self.name, items)
|
Changelog.record(self.user, 'addlistitems', self.name, items)
|
||||||
|
|
||||||
|
@ -318,28 +327,30 @@ class List(db.Model):
|
||||||
else:
|
else:
|
||||||
return self.user.items.join(Item.lists, aliased=True).filter(List.id == self.id)
|
return self.user.items.join(Item.lists, aliased=True).filter(List.id == self.id)
|
||||||
|
|
||||||
def remove_items(self, items):
|
def remove_items(self, items, commit=True):
|
||||||
from item.models import Item
|
from item.models import Item
|
||||||
for item_id in items:
|
for item_id in items:
|
||||||
i = Item.get(item_id)
|
i = Item.get(item_id)
|
||||||
if i:
|
if i:
|
||||||
if i in self.items:
|
if i in self.items:
|
||||||
self.items.remove(i)
|
self.items.remove(i)
|
||||||
i.update()
|
i.update(commit=commit)
|
||||||
state.db.session.add(self)
|
state.db.session.add(self)
|
||||||
state.db.session.commit()
|
if commit:
|
||||||
|
state.db.session.commit()
|
||||||
if self.user_id == settings.USER_ID and self.name != '':
|
if self.user_id == settings.USER_ID and self.name != '':
|
||||||
Changelog.record(self.user, 'removelistitems', self.name, items)
|
Changelog.record(self.user, 'removelistitems', self.name, items)
|
||||||
|
|
||||||
def remove(self):
|
def remove(self, commit=True):
|
||||||
if not self._query:
|
if not self._query:
|
||||||
for i in self.items:
|
q = list_items.delete().where(list_items.columns['list_id'].is_(self.id))
|
||||||
self.items.remove(i)
|
state.db.session.execute(q)
|
||||||
if not self._query:
|
if not self._query:
|
||||||
if self.user_id == settings.USER_ID and self.name != '':
|
if self.user_id == settings.USER_ID and self.name != '':
|
||||||
Changelog.record(self.user, 'removelist', self.name)
|
Changelog.record(self.user, 'removelist', self.name)
|
||||||
state.db.session.delete(self)
|
state.db.session.delete(self)
|
||||||
state.db.session.commit()
|
if commit:
|
||||||
|
state.db.session.commit()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def public_id(self):
|
def public_id(self):
|
||||||
|
@ -445,7 +456,7 @@ class Metadata(db.Model):
|
||||||
return m
|
return m
|
||||||
|
|
||||||
def get_hash(self):
|
def get_hash(self):
|
||||||
return utils.get_meta_hash(self.data.copy())
|
return utils.get_meta_hash(self.data)
|
||||||
|
|
||||||
def save(self, commit=True, modified=None):
|
def save(self, commit=True, modified=None):
|
||||||
if modified is None:
|
if modified is None:
|
||||||
|
|
|
@ -409,6 +409,7 @@ def get_ratio(data):
|
||||||
|
|
||||||
|
|
||||||
def get_meta_hash(data):
|
def get_meta_hash(data):
|
||||||
|
data = data.copy()
|
||||||
if 'sharemetadata' in data:
|
if 'sharemetadata' in data:
|
||||||
del data['sharemetadata']
|
del data['sharemetadata']
|
||||||
for key in list(data):
|
for key in list(data):
|
||||||
|
@ -469,3 +470,10 @@ def ctl(*args):
|
||||||
|
|
||||||
def user_sort_key(u):
|
def user_sort_key(u):
|
||||||
return ox.sort_string(str(u.get('index', '')) + 'Z' + (u.get('name') or ''))
|
return ox.sort_string(str(u.get('index', '')) + 'Z' + (u.get('name') or ''))
|
||||||
|
|
||||||
|
def get_peer(peerid):
|
||||||
|
import state
|
||||||
|
import library
|
||||||
|
if peerid not in state.peers:
|
||||||
|
state.peers[peerid] = library.Peer(peerid)
|
||||||
|
return state.peers[peerid]
|
||||||
|
|
|
@ -8,3 +8,4 @@ python-stdnum==0.9
|
||||||
PyPDF2==1.23
|
PyPDF2==1.23
|
||||||
pysocks
|
pysocks
|
||||||
stem
|
stem
|
||||||
|
sqlitedict==1.4.0
|
||||||
|
|
Loading…
Reference in a new issue