rebuild changelog and redownload from peers.

j 2016-01-23 22:19:34 +05:30
parent 5bb1d4c162
commit ff5917ade0
9 changed files with 129 additions and 58 deletions
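Summary of the change set below: the node protocol is bumped from 0.6 to 0.7 and DB_VERSION from 7 to 8; migrate_8 drops the stored changelog and calls the new User.rebuild_changelog(); pull scheduling moves from per-Node timers into the Nodes thread; and Changelog.apply_changes gains a first flag so that a pull starting at revision 0 removes local items, list entries and peers that the peer's rebuilt changelog no longer references. A rough sketch of that first-pull bookkeeping follows (the helper name and structure are illustrative only, not the actual OML code), assuming the [revision, timestamp, [action, ...args]] change format visible in the diff:

# Sketch only: collect the ids a full changelog still references, as the new
# first=True branch of Changelog.apply_changes does before deleting leftovers.
# collect_referenced is a made-up name for illustration.
def collect_referenced(changes):
    items = set()   # item ids seen in 'additem'
    lists = {}      # list name -> set of item ids from 'addlist'/'addlistitems'
    peers = set()   # peer ids seen in 'addpeer'
    for revision, timestamp, data in changes:
        action, args = data[0], data[1:]
        if action == 'additem':
            items.add(args[0])
        elif action == 'addlist':
            lists.setdefault(args[0], set())
        elif action == 'addlistitems':
            lists.setdefault(args[0], set()).update(args[1])
        elif action == 'addpeer':
            peers.add(args[0])
    # anything local that is not in these sets is treated as left over
    # from a previous sync and removed
    return items, lists, peers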

View file

@@ -77,10 +77,48 @@ class Changelog(db.Model):
         logger.debug('record change: %s', c.json())
 
     @classmethod
-    def apply_changes(cls, user, changes):
+    def apply_changes(cls, user, changes, first=False):
         trigger = changes
         if trigger:
-            trigger_event('change', {});
+            trigger_event('change', {})
+        if first:
+            logger.debug('remove left over items')
+            items = set()
+            lists = {}
+            peers = set()
+            for change in changes:
+                if change[2][0] == 'additem':
+                    items.add(change[2][1])
+                if change[2][0] == 'addlist':
+                    lists[change[2][1]] = set()
+                if change[2][0] == 'addlistitems':
+                    if not change[2][1] in lists:
+                        lists[change[2][1]] = set()
+                    for i in change[2][2]:
+                        lists[change[2][1]].add(i)
+                if change[2][0] == 'addpeer':
+                    peers.add(change[2][1])
+            for i in user.library.items:
+                if i.id not in items and user in i.users:
+                    i.users.remove(user)
+                    if i.users:
+                        i.update()
+                    else:
+                        i.delete()
+            for name in lists:
+                qs = user.lists.filter_by(name=name)
+                if qs.count():
+                    l = qs[0]
+                    remove = []
+                    for i in l.get_items():
+                        if i.id not in lists[name]:
+                            remove.append(i.id)
+                            l.items.remove(i.id)
+            for peer in user.models.Users.query:
+                if user.id in peer.info.get('users', {}) and peer.id not in peers:
+                    del peer.info['users'][user.id]
+                    peer.save()
         for change in changes:
             if user.id in state.removepeer:
                 del state.removepeer[user.id]
@@ -105,10 +143,9 @@ class Changelog(db.Model):
         c.timestamp = timestamp
         c.user_id = user.id
         c.revision = revision
-        c.data = data
-        args = json.loads(data)
-        logger.debug('apply change from %s: %s(%s)', user.name, args[0], args[1:])
-        if getattr(c, 'action_' + args[0])(user, timestamp, *args[1:]):
+        c.data = json.dumps(data)
+        logger.debug('apply change from %s: %s(%s)', user.name, data[0], data[1:])
+        if getattr(c, 'action_' + data[0])(user, timestamp, *data[1:]):
             logger.debug('change applied')
             state.db.session.add(c)
             state.db.session.commit()
@@ -126,26 +163,8 @@ class Changelog(db.Model):
     def json(self):
         timestamp = self.timestamp or datetime2ts(self.created)
-        return [self.revision, timestamp, self.data]
-
-    @classmethod
-    def restore(cls, user_id, path=None):
-        from user.models import User
-        user = User.get_or_create(user_id)
-        if not path:
-            path = '/tmp/oml_changelog_%s.json' % user_id
-        with open(path, 'r') as fd:
-            for change in fd:
-                change = json.loads(change)
-                cls.apply_change(user, change, user_id == settings.USER_ID)
-
-    @classmethod
-    def export(cls, user_id, path=None):
-        if not path:
-            path = '/tmp/oml_changelog_%s.json' % user_id
-        with open(path, 'w') as fd:
-            for c in cls.query.filter_by(user_id=user_id).order_by('revision'):
-                fd.write(json.dumps(c.json(), ensure_ascii=False) + '\n')
+        return [self.revision, timestamp, json.loads(self.data)]
 
     def action_additem(self, user, timestamp, itemid, info):
         from item.models import Item
@@ -435,17 +454,17 @@ class Changelog(db.Model):
             rv = data[0]
             ts = data[1]
             data = [op, id] + data[2:]
-            _changes.append([rv, ts, json.dumps(data)])
+            _changes.append([rv, ts, data])
         _changes.sort(key=lambda change: (change[0], change[1]))
         if orderlists:
             ids = [l.name for l in List.query.filter_by(user_id=user_id,type='static').order_by('index_') if l.name]
             if len(ids) > 1:
-                _changes.append([-1, timestamp, json.dumps(['orderlists', ids])])
+                _changes.append([-1, timestamp, ['orderlists', ids]])
         userinfo = state.user().json()
         if editusername:
-            _changes.append([-1, timestamp, json.dumps(['editusername', userinfo['username']])])
+            _changes.append([-1, timestamp, ['editusername', userinfo['username']]])
         if editcontact:
-            _changes.append([-1, timestamp, json.dumps(['editcontact', userinfo['contact']])])
+            _changes.append([-1, timestamp, ['editcontact', userinfo['contact']]])
         if _changes:
             r = revision
             for c in reversed(_changes):

View file

@@ -13,7 +13,6 @@ import settings
 
 logger = logging.getLogger(__name__)
 
-base = settings.server['directory_service']
 base = 'http://hpjats6xixrleoqg.onion:25519'
 
 def get(vk):

View file

@@ -38,7 +38,7 @@ def can_connect(data):
         r = opener.open(url)
         version = r.headers.get('X-Node-Protocol', None)
         if version != settings.NODE_PROTOCOL:
-            logger.debug('version does not match local: %s remote %s', settings.NODE_PROTOCOL, version)
+            logger.debug('version does not match local: %s remote %s (%s)', settings.NODE_PROTOCOL, version, data['id'])
             return False
         c = r.read()
         return True

View file

@@ -53,11 +53,6 @@ def lookup_provider(arg):
 def lookup(key, value):
     if not isvalid_id(key, value):
         return {}
-    '''
-    if not settings.server.get('local_lookup'):
-        import oml.metaremote
-        return oml.metaremote.lookup(key, value)
-    '''
     if key == 'isbn':
         try:
             data = google.info(key, value)

View file

@@ -33,7 +33,6 @@ ENCODING='base64'
 
 class Node(Thread):
     _running = True
-    _pulling = False
     host = None
     local = None
     _online = None
@@ -49,10 +48,7 @@ class Node(Thread):
         Thread.__init__(self)
         self.daemon = True
         self.start()
-        self._pull = PeriodicCallback(self.pull, 60000)
-        self._pull.start()
         self.ping()
-        self.pull()
 
     def run(self):
         while self._running:
@@ -63,12 +59,6 @@ class Node(Thread):
                 self._send_response()
             elif action == 'ping':
                 self.online = self.can_connect()
-            elif action == 'pull':
-                self._pulling = True
-                self.online = self.can_connect()
-                if self.online:
-                    self.pullChanges()
-                self._pulling = False
             else:
                 logger.debug('unknown action %s', action)
@@ -77,9 +67,6 @@ class Node(Thread):
         self._q.put('')
         #return Thread.join(self)
 
-    def pull(self):
-        if state.online and not self._pulling:
-            self._q.put('pull')
-
     def ping(self):
         if state.online:
@@ -262,8 +249,6 @@ class Node(Thread):
         })
 
     def pullChanges(self):
-        if state.activity and state.activity.get('activity') == 'import':
-            return
         with db.session():
             u = user.models.User.get_or_create(self.user_id)
             if not self.online or not u.peered:
@@ -278,7 +263,7 @@ class Node(Thread):
                 return False
             if not changes:
                 return False
-            r = Changelog.apply_changes(u, changes)
+            r = Changelog.apply_changes(u, changes, first=from_revision == 0)
             return r
 
     def peering(self, action):
@@ -416,6 +401,7 @@ class Node(Thread):
 class Nodes(Thread):
     _nodes = {}
     _local = None
+    _pulling = False
 
     def __init__(self):
         self._q = Queue()
@@ -432,14 +418,21 @@ class Nodes(Thread):
             self._local = LocalNodes()
         self._cleanup = PeriodicCallback(lambda: self.queue('cleanup'), 120000)
         self._cleanup.start()
+        self._pullcb = PeriodicCallback(self.pull, 60000)
+        self._pullcb.start()
         Thread.__init__(self)
         self.daemon = True
         self.start()
+        self.pull()
 
     def cleanup(self):
         if self._running and self._local:
             self._local.cleanup()
 
+    def pull(self):
+        if state.online and not self._pulling:
+            self.queue('pull')
+
     def queue(self, *args):
         self._q.put(list(args))
@@ -480,6 +473,16 @@ class Nodes(Thread):
         if send_response:
             self._nodes[user_id].send_response()
 
+    def _pull(self):
+        if state.activity and state.activity.get('activity') == 'import':
+            return
+        self._pulling = True
+        for node in list(self._nodes.values()):
+            node.online = node.can_connect()
+            if node.online:
+                node.pullChanges()
+        self._pulling = False
+
     def run(self):
         while self._running:
             args = self._q.get()
@@ -488,6 +491,8 @@ class Nodes(Thread):
                 self.cleanup()
             elif args[0] == 'add':
                 self._add(*args[1:])
+            elif args[0] == 'pull':
+                self._pull()
             else:
                 self._call(*args)

View file

@@ -47,8 +47,6 @@ server_defaults = {
     'node_address': '',
     'extract_text': True,
     'localnode_discovery': True,
-    'directory_service': 'http://[2a01:4f8:120:3201::3]:25519',
-    'meta_service': 'http://meta.openmedialibrary.com/api/',
     'release_url': 'http://downloads.openmedialibrary.com/release.json',
 }
@@ -94,11 +92,11 @@ if 'modules' in release and 'openmedialibrary' in release['modules']:
 else:
     MINOR_VERSION = 'git'
 
-NODE_PROTOCOL="0.6"
+NODE_PROTOCOL="0.7"
 VERSION="%s.%s" % (NODE_PROTOCOL, MINOR_VERSION)
 
 USER_AGENT = 'OpenMediaLibrary/%s' % VERSION
 DEBUG_HTTP = server.get('debug_http', False)
 
-DB_VERSION = 7
+DB_VERSION = 8

View file

@@ -29,7 +29,7 @@ class Tasks(Thread):
             if m:
                 try:
                     action, data = m
-                    logger.debug('run task %s', action)
+                    logger.debug('%s start', action)
                     if action == 'ping':
                         trigger_event('pong', data)
                     elif action == 'import':
@@ -48,7 +48,7 @@ class Tasks(Thread):
                         sync_metadata(*data)
                     else:
                         trigger_event('error', {'error': 'unknown action'})
-                    logger.debug('finished task %s', action)
+                    logger.debug('%s done', action)
                 except:
                     logger.debug('task failed', exc_info=1)
             self.q.task_done()

View file

@@ -420,3 +420,31 @@ def migrate_7():
     db.run_sql('DROP TABLE IF EXISTS scrape')
     db.run_sql('VACUUM')
     return 7
+
+def migrate_8():
+    for key in ('directory_service', 'meta_service', 'local_lookup', 'cert'):
+        if key in settings.server:
+            del settings.server[key]
+    list_cache = os.path.join(settings.data_path, 'list_cache.json')
+    if os.path.exists(list_cache):
+        os.unlink(list_cache)
+    with db.session() as session:
+        import item.models
+        for i in item.models.Item.query:
+            delta = set(i.meta)-set(i.meta_keys)
+            if delta:
+                for key in delta:
+                    del i.meta[key]
+                session.add(i)
+        session.commit()
+        import changelog
+        import user.models
+        changelog.Changelog.query.delete()
+        u = user.models.User.get(settings.USER_ID)
+        u.rebuild_changelog()
+        for peer in user.models.User.query:
+            if peer.id != u.id:
+                if len(peer.id) != 16:
+                    session.delete(peer)
+        session.commit()
+    return 8

View file

@@ -7,6 +7,7 @@ import os
 import shutil
 
 import ox
+from sqlalchemy.orm import load_only
 import sqlalchemy as sa
 
 from changelog import Changelog
@@ -182,6 +183,32 @@ class User(db.Model):
             session.connection().execute(sql.format(oid=self.id, nid=service_id))
             session.commit()
 
+    def rebuild_changelog(self):
+        Changelog.query.filter_by(user_id=self.id).delete()
+        for item in self.library.get_items().order_by('created'):
+            Changelog.record(self, 'additem', item.id, item.info)
+            Changelog.record(self, 'edititem', item.id, item.meta)
+        lists = []
+        for l in List.query.filter_by(user_id=self.id, type='static').order_by('index_'):
+            if l.name:
+                lists.append(l.name)
+                Changelog.record(self, 'addlist', l.name)
+                items = [i.id for i in l.get_items().options(load_only('id'))]
+                if items:
+                    Changelog.record(self, 'addlistitems', l.name, items)
+        if len(lists) > 1:
+            Changelog.record(self, 'orderlists', lists)
+        for peer in User.query.filter_by(peered=True):
+            Changelog.record(self, 'addpeer', peer.id, self.nickname)
+            if peer.info.get('contact'):
+                Changelog.record(self, 'editpeer', peer.id, {
+                    'contact': peer.info.get('contact')
+                })
+        if settings.preferences.get('contact'):
+            Changelog.record(self, 'editcontact', settings.preferences.get('contact'))
+
 list_items = sa.Table('listitem', db.metadata,
     sa.Column('list_id', sa.Integer(), sa.ForeignKey('list.id')),
     sa.Column('item_id', sa.String(32), sa.ForeignKey('item.id'))