simple changelog

This commit is contained in:
j 2017-06-03 22:50:14 +02:00
parent 04f994d4b7
commit e966256fa2
15 changed files with 267 additions and 103 deletions

View file

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
import os
from datetime import datetime
import json
@ -17,6 +17,49 @@ import state
import logging
logger = logging.getLogger(__name__)
def changelog_path():
return os.path.join(settings.data_path, 'peers', '%s.log' % settings.USER_ID)
def next_revision():
settings.server['revision'] = settings.server.get('revision', -1) + 1
return settings.server['revision']
def add_record(action, *args, **kwargs):
if '_ts' in kwargs:
timestamp = kwargs['_ts']
del kwargs['_ts']
else:
timestamp = None
if not timestamp:
timestamp = datetime.utcnow()
timestamp = datetime2ts(timestamp)
revision = next_revision()
data = [revision, timestamp, [action] + list(args)]
data = json.dumps(data, ensure_ascii=False)
path = changelog_path()
if os.path.exists(path):
mode = 'a'
state.changelog_size = os.path.getsize(path)
else:
mode = 'w'
state.changelog_size = 0
with open(path, mode) as fd:
fd.write(data + '\n')
state.changelog_size = os.path.getsize(path)
#logger.debug('record change: %s', data)
def changelog_size():
if state.changelog_size is None:
path = changelog_path()
if not os.path.exists(path):
return 0
return os.path.getsize(path)
else:
return state.changelog_size
class Changelog(db.Model):
'''
additem itemid metadata from file (info) + OLID
@ -47,29 +90,7 @@ class Changelog(db.Model):
@classmethod
def record(cls, user, action, *args, **kwargs):
commit = True
if '_commit' in kwargs:
commit = kwargs['_commit']
del kwargs['_commit']
if '_ts' in kwargs:
timestamp = kwargs['_ts']
del kwargs['_ts']
else:
timestamp = None
c = cls()
c.created = datetime.utcnow()
if not timestamp:
timestamp = c.created
c.timestamp = datetime2ts(timestamp)
c.user_id = user.id
c.revision = cls.query.filter_by(user_id=user.id).count()
c.data = json.dumps([action] + list(args), ensure_ascii=False)
_data = str(c.revision) + str(c.timestamp) + c.data
_data = _data.encode()
state.db.session.add(c)
if commit:
state.db.session.commit()
logger.debug('record change: %s', c.json())
return add_record(action, *args, **kwargs)
@classmethod
def apply_changes(cls, user_, changes, first=False):
@ -162,7 +183,6 @@ class Changelog(db.Model):
timestamp = self.timestamp or datetime2ts(self.created)
return [self.revision, timestamp, json.loads(self.data)]
def action_additem(self, user, timestamp, itemid, info):
from item.models import Item
i = Item.get(itemid)

View file

@ -55,8 +55,11 @@ class Downloads(Thread):
del self.transfers[itemid]
continue
if t.get('added') and t.get('progress', -1) < 1:
if not 'users' in t:
if 'users' not in t:
i = item.models.Item.get(itemid)
if not i:
del self.transfers[itemid]
continue
t['users'] = [u.id for u in i.users]
for uid in t['users']:
if state.shutdown:

View file

@ -15,7 +15,7 @@ from sqlalchemy.orm import load_only
from sqlalchemy.schema import CreateTable
import sqlalchemy as sa
from changelog import Changelog
from changelog import add_record
from db import MutableDict
import json_pickler
from .icons import icons
@ -355,7 +355,7 @@ class Item(db.Model):
self.update_cover()
user = state.user()
if record and user in self.users:
Changelog.record(user, 'edititem', self.id, record, _ts=modified)
add_record('edititem', self.id, record, _ts=modified)
if 'sharemetadata' in record and not record['sharemetadata']:
self.sync_metadata()
@ -419,7 +419,7 @@ class Item(db.Model):
f.move()
user = state.user()
if record and user in self.users:
Changelog.record(user, 'edititem', self.id, record, _ts=self.modified)
add_record('edititem', self.id, record, _ts=self.modified)
if 'cover' in record:
if state.tasks:
state.tasks.queue('getcover', self.id)
@ -553,11 +553,11 @@ class Item(db.Model):
if state.downloads and self.id in state.downloads.transfers:
del state.downloads.transfers[self.id]
self.added = datetime.utcnow()
Changelog.record(u, 'additem', self.id, f.info)
Changelog.record(u, 'edititem', self.id, self.meta)
add_record('additem', self.id, f.info)
add_record('edititem', self.id, self.meta)
for l in self.lists.filter_by(user_id=settings.USER_ID):
if l.name != '':
Changelog.record(l.user, 'addlistitems', l.name, [self.id])
add_record('addlistitems', l.name, [self.id])
self.update()
f.move()
self.update_icons()
@ -601,7 +601,7 @@ class Item(db.Model):
if state.downloads:
if self.id in state.downloads.transfers:
del state.downloads.transfers[self.id]
Changelog.record(user, 'removeitem', self.id)
add_record('removeitem', self.id)
class Sort(db.Model):
__tablename__ = 'sort'

View file

@ -10,7 +10,7 @@ import time
import ox
from changelog import Changelog
from changelog import add_record
from item.models import File, Item
from user.models import List
from utils import remove_empty_folders
@ -87,8 +87,8 @@ def add_file(id, f, prefix, from_=None, commit=True):
item.added = datetime.utcnow()
logger.debug('%s load metadata %s', id, path)
item.load_metadata()
Changelog.record(user, 'additem', item.id, file.info)
Changelog.record(user, 'edititem', item.id, item.meta)
add_record('additem', item.id, file.info)
add_record('edititem', item.id, item.meta)
logger.debug('%s extract icons %s', id, path)
item.update_icons()
item.modified = datetime.utcnow()

View file

@ -29,6 +29,7 @@ class Peer(object):
base = os.path.join(settings.data_path, 'peers')
ox.makedirs(base)
self._dbpath = os.path.join(base, '%s.db' % id)
self._logpath = os.path.join(base, '%s.log' % id)
self._infopath = os.path.join(base, '%s.json' % id)
self.id = id
@ -39,11 +40,28 @@ class Peer(object):
self.info = json.load(f)
else:
self.info = {}
if not 'peers' in self.info:
if 'peers' not in self.info:
self.info['peers'] = {}
if not 'lists' in self.info:
if 'lists' not in self.info:
self.info['lists'] = {}
def apply_log(self):
changes = []
if os.path.exists(self._logpath):
with open(self._logpath) as fd:
for line in fd:
if line:
try:
data = json.loads(line)
except:
logger.debug('failed to parse line: %s', line)
return
if data[0] <= self.info.get('revision', -1):
continue
changes.append(data)
if changes:
self.apply_changes(changes)
def apply_changes(self, changes):
r = True
for change in changes:

View file

@ -126,7 +126,7 @@ class LocalNodes(dict):
self.pop(id, None)
def on_service_state_change(self, zeroconf, service_type, name, state_change):
if not '[' in name:
if '[' not in name:
id = name.split('.')[0]
else:
id = name.split('[')[1].split(']')[0]
@ -145,7 +145,7 @@ class LocalNodes(dict):
key = key.decode()
self[id][key] = value.decode()
logger.debug('add: %s [%s] (%s:%s)', self[id].get('username', 'anon'), id, self[id]['host'], self[id]['port'])
if state.tasks:
if state.tasks and id in self:
state.tasks.queue('addlocalinfo', self[id])
elif state_change is ServiceStateChange.Removed:
logger.debug('remove: %s', id)

View file

@ -2,7 +2,6 @@
# vi:si:et:sw=4:sts=4:ts=4
from changelog import Changelog
from user.models import User
from websocket import trigger_event
import state
@ -11,20 +10,6 @@ import settings
import logging
logger = logging.getLogger(__name__)
def api_pullChanges(remote_id, user_id=None, from_=None, to=None):
if user_id and not from_ and not to:
from_ = user_id
user_id = None
if user_id and from_ and not to:
if isinstance(user_id, int):
to = from_
from_ = user_id
user_id = None
from_ = from_ or 0
if user_id:
return []
return Changelog.aggregated_changes(from_)
def api_requestPeering(user_id, username, message):
event = 'peering.request'
user = User.get_or_create(user_id)

View file

@ -25,6 +25,7 @@ import db
import settings
import state
import user
from changelog import changelog_size, changelog_path
from websocket import trigger_event
from . import nodeapi
@ -78,10 +79,6 @@ class NodeServer(ThreadingMixIn, TLSTCPServer):
def api_call(action, user_id, args):
with db.session():
u = user.models.User.get(user_id)
if u and action in ('pullChanges', ) and not u.peered and u.pending == 'sent':
u.update_peering(True)
state.nodes.queue('add', u.id, True)
trigger_event('peering.accept', u.json())
if action in (
'requestPeering', 'acceptPeering', 'rejectPeering',
'removePeering', 'cancelPeering'
@ -172,6 +169,8 @@ class Handler(http.server.SimpleHTTPRequestHandler):
self.write_with_limit(content, content_length)
else:
self.write_file_with_limit(path, content_length)
elif len(parts) == 2 and parts[1] == 'log':
self._changelog()
else:
self.send_response(200, 'OK')
self.send_header('Content-type', 'text/plain')
@ -179,6 +178,53 @@ class Handler(http.server.SimpleHTTPRequestHandler):
self.end_headers()
self.wfile.write('Open Media Library\n'.encode())
def _denied(self):
self.send_response(403, 'denied')
self.end_headers()
def _changelog(self):
x509 = self.connection.get_peer_certificate()
user_id = get_service_id(x509.get_pubkey()) if x509 else None
with db.session():
u = user.models.User.get(user_id)
if not u:
return self._denied()
if u.pending:
logger.debug('ignore request from pending peer[%s] %s (%s)',
user_id, action, args)
return self._denied()
if not u.peered and u.pending == 'sent':
u.update_peering(True)
state.nodes.queue('add', u.id, True)
trigger_event('peering.accept', u.json())
if not u.peered:
return self._denied()
path = changelog_path()
content_length = changelog_size()
with open(path, 'rb') as log:
request_range = self.headers.get('Range', '')
if request_range:
r = request_range.split('=')[-1].split('-')
start = int(r[0])
end = int(r[1]) if r[1] else (content_length - 1)
if start == content_length:
content_length = 0
else:
content_length = end - start + 1
if content_length < 0:
content_length = os.path.getsize(path)
self.send_response(200, 'OK')
else:
log.seek(start)
self.send_response(206, 'OK')
else:
self.send_response(200, 'OK')
self.send_header('Content-type', 'text/json')
self.send_header('X-Node-Protocol', settings.NODE_PROTOCOL)
self.send_header('Content-Length', str(content_length))
self.end_headers()
self.write_fd_with_limit(log, content_length)
def gzip_data(self, data):
encoding = self.headers.get('Accept-Encoding')
if encoding.find('gzip') != -1:
@ -203,7 +249,6 @@ class Handler(http.server.SimpleHTTPRequestHandler):
def do_POST(self):
'''
API
pullChanges [userid] from [to]
requestPeering username message
acceptPeering username message
rejectPeering message
@ -287,18 +332,27 @@ class Handler(http.server.SimpleHTTPRequestHandler):
self.wfile.write(data)
position += chunk_size
def write_file_with_limit(self, path, content_length):
def write_fd_with_limit(self, f, content_length):
chunk_size = self.chunk_size(content_length)
with open(path, 'rb') as f:
position = 0
while True:
data = f.read(chunk_size)
if not data:
break
self.wfile.write(data)
position += chunk_size
if position + chunk_size > content_length:
chunk_size = content_length - position
if chunk_size <= 0:
break
if state.bandwidth:
while not state.bandwidth.upload(chunk_size) and self.server._running:
time.sleep(0.1)
def write_file_with_limit(self, path, content_length):
with open(path, 'rb') as f:
self.write_fd_with_limit(f, content_length)
class Server(Thread):
http_server = None

View file

@ -239,24 +239,81 @@ class Node(Thread):
self.online = self.can_connect()
if not self.online or state.shutdown:
return
with db.session():
u = user.models.User.get_or_create(self.user_id)
if not u or not self.online or not u.peered:
return True
self.resolve()
peer = get_peer(self.user_id)
from_revision = peer.info.get('revision', -1) + 1
try:
changes = self.request('pullChanges', from_revision)
except:
self.online = False
path = peer._logpath
if os.path.exists(path):
size = os.path.getsize(path)
else:
size = 0
url = '%s/log' % self.url
if DEBUG_NODES:
logger.debug('%s went offline', u.name, exc_info=True)
logger.debug('pullChanges: %s [%s]', self.user_id, url)
headers = self.headers.copy()
if size:
headers['Range'] = '%s-' % size
self._opener.addheaders = list(zip(headers.keys(), headers.values()))
try:
r = self._opener.open(url, timeout=self.TIMEOUT*60)
except urllib.error.HTTPError as e:
if e.code == 403:
logger.debug('pullChanges 403: %s (%s)', url, self.user_id)
if state.tasks:
state.tasks.queue('peering', (self.user_id, False))
del self._nodes[self.user_id]
self.online = False
else:
logger.debug('unknown http errpr %s %s (%s)', e.code, url, self.user_id)
return False
if not changes:
except socket.timeout:
logger.debug('timeout %s', url)
return False
except socks.GeneralProxyError:
logger.debug('openurl failed %s', url)
return False
except urllib.error.URLError as e:
logger.debug('openurl failed urllib2.URLError %s', e.reason)
return False
except:
logger.debug('openurl failed %s', url, exc_info=True)
return False
if r.getcode() in (200, 206):
changed = False
chunk_size = 16 * 1024
mode = 'ab' if r.getcode() == 206 else 'wb'
content = b''
try:
if r.headers.get('content-encoding', None) == 'gzip':
fileobj = gzip.GzipFile(fileobj=r)
else:
fileobj = r
for chunk in iter(lambda: fileobj.read(chunk_size), b''):
content += chunk
eol = content.rfind(b'\n') + 1
if eol > 0:
with open(path, mode) as fd:
fd.write(content[:eol])
content = content[eol:]
mode = 'ab'
changed = True
if state.shutdown:
return False
if state.bandwidth:
while not state.bandwidth.download(chunk_size) and not state.shutdown:
time.sleep(0.1)
if content:
with open(path, mode) as fd:
fd.write(content)
changed = True
if changed:
peer.apply_log()
except:
logger.debug('download failed %s', url, exc_info=True)
return False
else:
logger.debug('FAILED %s', url)
return False
#with open('/tmp/changelog_%s_%s.json' % (self.user_id, from_revision), 'w') as f:
# json.dump(changes, f, ensure_ascii=False, indent=4)
return peer.apply_changes(changes)
def peering(self, action):
with db.session():
@ -417,7 +474,7 @@ class Nodes(Thread):
del u.info['local']
u.save()
self.queue('add', u.id)
state.peers[u.id] = library.Peer(u.id)
get_peer(u.id)
for u in user.models.User.query.filter_by(queued=True):
logger.debug('adding queued node... %s', u.id)
self.queue('add', u.id, True)

View file

@ -156,6 +156,7 @@ def run():
state.tor = tor.Tor()
state.node = node.server.start()
state.nodes = nodes.Nodes()
def publish():
if not state.tor.is_online():
state.main.call_later(10, publish)

View file

@ -81,11 +81,11 @@ if 'modules' in release and 'openmedialibrary' in release['modules']:
else:
MINOR_VERSION = 'git'
NODE_PROTOCOL="0.7"
NODE_PROTOCOL = "0.8"
VERSION = "%s.%s" % (NODE_PROTOCOL, MINOR_VERSION)
USER_AGENT = 'OpenMediaLibrary/%s' % VERSION
DEBUG_HTTP = server.get('debug_http', False)
DB_VERSION = 12
DB_VERSION = 13

View file

@ -1,3 +1,5 @@
from threading import local
bandwidth = None
host = None
main = None
@ -12,13 +14,14 @@ shutdown = False
websockets = []
peers = {}
changelog_size = None
activity = {}
removepeer = {}
db = local()
def user():
import settings
import user.models
return user.models.User.get_or_create(settings.USER_ID)
from threading import local
db = local()

View file

@ -365,6 +365,8 @@ class Update(Thread):
db_version = migrate_11()
if db_version < 12:
db_version = migrate_12()
if db_version < 13:
db_version = migrate_13()
settings.server['db_version'] = settings.DB_VERSION
def run(self):
@ -589,3 +591,22 @@ def migrate_12():
'DROP TABLE IF EXISTS transfer'
])
return 12
def migrate_13():
import settings
import changelog
import os
import json
path = os.path.join(settings.data_path, 'peers', '%s.log' % settings.USER_ID)
if not os.path.exists(path):
with db.session() as session:
revision = -1
qs = changelog.Changelog.query.filter_by(user_id=settings.USER_ID)
with open(path, 'w') as fd:
for c in qs.order_by('timestamp'):
data = json.dumps([c.revision, c.timestamp, json.loads(c.data)], ensure_ascii=False)
fd.write(data + '\n')
revision = c.revision
if revision > -1:
settings.server['revision'] = revision
return 13

View file

@ -8,7 +8,7 @@ import os
import ox
from changelog import Changelog
from changelog import add_record
from oxtornado import actions
from utils import update_dict, user_sort_key
from . import models
@ -79,9 +79,9 @@ def setPreferences(data):
u.update_name()
u.save()
if change_username:
Changelog.record(u, 'editusername', data['username'])
add_record('editusername', data['username'])
if change_contact:
Changelog.record(state.user(), 'editcontact', data['contact'])
add_record('editcontact', data['contact'])
if change_path:
state.tasks.queue('changelibrarypath', change_path)
if change_autostart:
@ -227,7 +227,7 @@ def editList(data):
validate_query(data['query'])
l._query = data['query']
if l.type == 'static' and name != l.name:
Changelog.record(state.user(), 'editlist', name, {'name': l.name})
add_record('editlist', name, {'name': l.name})
l.save()
return l.json()
actions.register(editList, cache=False)
@ -303,7 +303,7 @@ def sortLists(data):
state.db.session.add(l)
state.db.session.commit()
if lists:
Changelog.record(state.user(), 'orderlists', lists)
add_record('orderlists', lists)
return {}
actions.register(sortLists, cache=False)

View file

@ -9,7 +9,7 @@ import ox
from sqlalchemy.orm import load_only
import sqlalchemy as sa
from changelog import Changelog
from changelog import add_record
from db import MutableDict
import db
import json_pickler
@ -159,7 +159,7 @@ class User(db.Model):
self.peered = False
self.save()
if not was_peering:
Changelog.record(state.user(), 'addpeer', self.id, self.nickname)
add_record('addpeer', self.id, self.nickname)
if 'index' not in self.info:
self.info['index'] = max([
u.info.get('index', -1) for u in User.query.filter_by(peered=True)
@ -182,7 +182,7 @@ class User(db.Model):
state.removepeer[self.id] = True
self.cleanup()
if was_peering:
Changelog.record(state.user(), 'removepeer', self.id)
add_record('removepeer', self.id)
self.save()
def cleanup(self):
@ -210,6 +210,8 @@ class User(db.Model):
self.nickname = nickname
def rebuild_changelog(self):
logger.error('no longer used')
return
Changelog.query.filter_by(user_id=self.id).delete()
for item in self.library.get_items().order_by('created'):
Changelog.record(self, 'additem', item.id, item.info, _commit=False)
@ -300,7 +302,7 @@ class List(db.Model):
state.db.session.commit()
if user_id == settings.USER_ID:
if l.type == 'static' and name != '':
Changelog.record(state.user(), 'addlist', l.name)
add_record('addlist', l.name)
return l
@classmethod
@ -338,7 +340,7 @@ class List(db.Model):
if commit:
state.db.session.commit()
if self.user_id == settings.USER_ID and self.name != '' and available_items:
Changelog.record(self.user, 'addlistitems', self.name, available_items)
add_record('addlistitems', self.name, available_items)
def get_items(self):
from item.models import Item
@ -359,7 +361,7 @@ class List(db.Model):
if commit:
state.db.session.commit()
if self.user_id == settings.USER_ID and self.name != '':
Changelog.record(self.user, 'removelistitems', self.name, items)
add_record('removelistitems', self.name, items)
def remove(self, commit=True):
if not self._query:
@ -367,7 +369,7 @@ class List(db.Model):
state.db.session.execute(q)
if not self._query:
if self.user_id == settings.USER_ID and self.name != '':
Changelog.record(self.user, 'removelist', self.name)
add_record('removelist', self.name)
state.db.session.delete(self)
if commit:
state.db.session.commit()