From e966256fa228d2859f504b26e7d4277bf4ee7114 Mon Sep 17 00:00:00 2001 From: j Date: Sat, 3 Jun 2017 22:50:14 +0200 Subject: [PATCH] simple changelog --- oml/changelog.py | 70 +++++++++++++++++++++++------------- oml/downloads.py | 5 ++- oml/item/models.py | 14 ++++---- oml/item/scan.py | 6 ++-- oml/library.py | 22 ++++++++++-- oml/localnodes.py | 4 +-- oml/node/nodeapi.py | 15 -------- oml/node/server.py | 82 ++++++++++++++++++++++++++++++++++-------- oml/nodes.py | 87 +++++++++++++++++++++++++++++++++++++-------- oml/server.py | 1 + oml/settings.py | 10 +++--- oml/state.py | 7 ++-- oml/update.py | 21 +++++++++++ oml/user/api.py | 10 +++--- oml/user/models.py | 16 +++++---- 15 files changed, 267 insertions(+), 103 deletions(-) diff --git a/oml/changelog.py b/oml/changelog.py index fb595b1..c873f61 100644 --- a/oml/changelog.py +++ b/oml/changelog.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # vi:si:et:sw=4:sts=4:ts=4 - +import os from datetime import datetime import json @@ -17,6 +17,49 @@ import state import logging logger = logging.getLogger(__name__) + +def changelog_path(): + return os.path.join(settings.data_path, 'peers', '%s.log' % settings.USER_ID) + +def next_revision(): + settings.server['revision'] = settings.server.get('revision', -1) + 1 + return settings.server['revision'] + +def add_record(action, *args, **kwargs): + if '_ts' in kwargs: + timestamp = kwargs['_ts'] + del kwargs['_ts'] + else: + timestamp = None + if not timestamp: + timestamp = datetime.utcnow() + timestamp = datetime2ts(timestamp) + revision = next_revision() + + data = [revision, timestamp, [action] + list(args)] + data = json.dumps(data, ensure_ascii=False) + + path = changelog_path() + if os.path.exists(path): + mode = 'a' + state.changelog_size = os.path.getsize(path) + else: + mode = 'w' + state.changelog_size = 0 + with open(path, mode) as fd: + fd.write(data + '\n') + state.changelog_size = os.path.getsize(path) + #logger.debug('record change: %s', data) + +def changelog_size(): + if state.changelog_size is None: + path = changelog_path() + if not os.path.exists(path): + return 0 + return os.path.getsize(path) + else: + return state.changelog_size + class Changelog(db.Model): ''' additem itemid metadata from file (info) + OLID @@ -47,29 +90,7 @@ class Changelog(db.Model): @classmethod def record(cls, user, action, *args, **kwargs): - commit = True - if '_commit' in kwargs: - commit = kwargs['_commit'] - del kwargs['_commit'] - if '_ts' in kwargs: - timestamp = kwargs['_ts'] - del kwargs['_ts'] - else: - timestamp = None - c = cls() - c.created = datetime.utcnow() - if not timestamp: - timestamp = c.created - c.timestamp = datetime2ts(timestamp) - c.user_id = user.id - c.revision = cls.query.filter_by(user_id=user.id).count() - c.data = json.dumps([action] + list(args), ensure_ascii=False) - _data = str(c.revision) + str(c.timestamp) + c.data - _data = _data.encode() - state.db.session.add(c) - if commit: - state.db.session.commit() - logger.debug('record change: %s', c.json()) + return add_record(action, *args, **kwargs) @classmethod def apply_changes(cls, user_, changes, first=False): @@ -162,7 +183,6 @@ class Changelog(db.Model): timestamp = self.timestamp or datetime2ts(self.created) return [self.revision, timestamp, json.loads(self.data)] - def action_additem(self, user, timestamp, itemid, info): from item.models import Item i = Item.get(itemid) diff --git a/oml/downloads.py b/oml/downloads.py index 22c9d25..8374f95 100644 --- a/oml/downloads.py +++ b/oml/downloads.py @@ -55,8 +55,11 @@ class Downloads(Thread): del self.transfers[itemid] continue if t.get('added') and t.get('progress', -1) < 1: - if not 'users' in t: + if 'users' not in t: i = item.models.Item.get(itemid) + if not i: + del self.transfers[itemid] + continue t['users'] = [u.id for u in i.users] for uid in t['users']: if state.shutdown: diff --git a/oml/item/models.py b/oml/item/models.py index b207208..e5dcfe4 100644 --- a/oml/item/models.py +++ b/oml/item/models.py @@ -15,7 +15,7 @@ from sqlalchemy.orm import load_only from sqlalchemy.schema import CreateTable import sqlalchemy as sa -from changelog import Changelog +from changelog import add_record from db import MutableDict import json_pickler from .icons import icons @@ -355,7 +355,7 @@ class Item(db.Model): self.update_cover() user = state.user() if record and user in self.users: - Changelog.record(user, 'edititem', self.id, record, _ts=modified) + add_record('edititem', self.id, record, _ts=modified) if 'sharemetadata' in record and not record['sharemetadata']: self.sync_metadata() @@ -419,7 +419,7 @@ class Item(db.Model): f.move() user = state.user() if record and user in self.users: - Changelog.record(user, 'edititem', self.id, record, _ts=self.modified) + add_record('edititem', self.id, record, _ts=self.modified) if 'cover' in record: if state.tasks: state.tasks.queue('getcover', self.id) @@ -553,11 +553,11 @@ class Item(db.Model): if state.downloads and self.id in state.downloads.transfers: del state.downloads.transfers[self.id] self.added = datetime.utcnow() - Changelog.record(u, 'additem', self.id, f.info) - Changelog.record(u, 'edititem', self.id, self.meta) + add_record('additem', self.id, f.info) + add_record('edititem', self.id, self.meta) for l in self.lists.filter_by(user_id=settings.USER_ID): if l.name != '': - Changelog.record(l.user, 'addlistitems', l.name, [self.id]) + add_record('addlistitems', l.name, [self.id]) self.update() f.move() self.update_icons() @@ -601,7 +601,7 @@ class Item(db.Model): if state.downloads: if self.id in state.downloads.transfers: del state.downloads.transfers[self.id] - Changelog.record(user, 'removeitem', self.id) + add_record('removeitem', self.id) class Sort(db.Model): __tablename__ = 'sort' diff --git a/oml/item/scan.py b/oml/item/scan.py index 7cd29c5..632da9c 100644 --- a/oml/item/scan.py +++ b/oml/item/scan.py @@ -10,7 +10,7 @@ import time import ox -from changelog import Changelog +from changelog import add_record from item.models import File, Item from user.models import List from utils import remove_empty_folders @@ -87,8 +87,8 @@ def add_file(id, f, prefix, from_=None, commit=True): item.added = datetime.utcnow() logger.debug('%s load metadata %s', id, path) item.load_metadata() - Changelog.record(user, 'additem', item.id, file.info) - Changelog.record(user, 'edititem', item.id, item.meta) + add_record('additem', item.id, file.info) + add_record('edititem', item.id, item.meta) logger.debug('%s extract icons %s', id, path) item.update_icons() item.modified = datetime.utcnow() diff --git a/oml/library.py b/oml/library.py index 6a8df47..3fdf8dc 100644 --- a/oml/library.py +++ b/oml/library.py @@ -29,6 +29,7 @@ class Peer(object): base = os.path.join(settings.data_path, 'peers') ox.makedirs(base) self._dbpath = os.path.join(base, '%s.db' % id) + self._logpath = os.path.join(base, '%s.log' % id) self._infopath = os.path.join(base, '%s.json' % id) self.id = id @@ -39,11 +40,28 @@ class Peer(object): self.info = json.load(f) else: self.info = {} - if not 'peers' in self.info: + if 'peers' not in self.info: self.info['peers'] = {} - if not 'lists' in self.info: + if 'lists' not in self.info: self.info['lists'] = {} + def apply_log(self): + changes = [] + if os.path.exists(self._logpath): + with open(self._logpath) as fd: + for line in fd: + if line: + try: + data = json.loads(line) + except: + logger.debug('failed to parse line: %s', line) + return + if data[0] <= self.info.get('revision', -1): + continue + changes.append(data) + if changes: + self.apply_changes(changes) + def apply_changes(self, changes): r = True for change in changes: diff --git a/oml/localnodes.py b/oml/localnodes.py index 6bf9233..ae9cbb0 100644 --- a/oml/localnodes.py +++ b/oml/localnodes.py @@ -126,7 +126,7 @@ class LocalNodes(dict): self.pop(id, None) def on_service_state_change(self, zeroconf, service_type, name, state_change): - if not '[' in name: + if '[' not in name: id = name.split('.')[0] else: id = name.split('[')[1].split(']')[0] @@ -145,7 +145,7 @@ class LocalNodes(dict): key = key.decode() self[id][key] = value.decode() logger.debug('add: %s [%s] (%s:%s)', self[id].get('username', 'anon'), id, self[id]['host'], self[id]['port']) - if state.tasks: + if state.tasks and id in self: state.tasks.queue('addlocalinfo', self[id]) elif state_change is ServiceStateChange.Removed: logger.debug('remove: %s', id) diff --git a/oml/node/nodeapi.py b/oml/node/nodeapi.py index 61569da..4d9bc8b 100644 --- a/oml/node/nodeapi.py +++ b/oml/node/nodeapi.py @@ -2,7 +2,6 @@ # vi:si:et:sw=4:sts=4:ts=4 -from changelog import Changelog from user.models import User from websocket import trigger_event import state @@ -11,20 +10,6 @@ import settings import logging logger = logging.getLogger(__name__) -def api_pullChanges(remote_id, user_id=None, from_=None, to=None): - if user_id and not from_ and not to: - from_ = user_id - user_id = None - if user_id and from_ and not to: - if isinstance(user_id, int): - to = from_ - from_ = user_id - user_id = None - from_ = from_ or 0 - if user_id: - return [] - return Changelog.aggregated_changes(from_) - def api_requestPeering(user_id, username, message): event = 'peering.request' user = User.get_or_create(user_id) diff --git a/oml/node/server.py b/oml/node/server.py index f959a6b..ee7a5b9 100644 --- a/oml/node/server.py +++ b/oml/node/server.py @@ -25,6 +25,7 @@ import db import settings import state import user +from changelog import changelog_size, changelog_path from websocket import trigger_event from . import nodeapi @@ -78,10 +79,6 @@ class NodeServer(ThreadingMixIn, TLSTCPServer): def api_call(action, user_id, args): with db.session(): u = user.models.User.get(user_id) - if u and action in ('pullChanges', ) and not u.peered and u.pending == 'sent': - u.update_peering(True) - state.nodes.queue('add', u.id, True) - trigger_event('peering.accept', u.json()) if action in ( 'requestPeering', 'acceptPeering', 'rejectPeering', 'removePeering', 'cancelPeering' @@ -172,6 +169,8 @@ class Handler(http.server.SimpleHTTPRequestHandler): self.write_with_limit(content, content_length) else: self.write_file_with_limit(path, content_length) + elif len(parts) == 2 and parts[1] == 'log': + self._changelog() else: self.send_response(200, 'OK') self.send_header('Content-type', 'text/plain') @@ -179,6 +178,53 @@ class Handler(http.server.SimpleHTTPRequestHandler): self.end_headers() self.wfile.write('Open Media Library\n'.encode()) + def _denied(self): + self.send_response(403, 'denied') + self.end_headers() + + def _changelog(self): + x509 = self.connection.get_peer_certificate() + user_id = get_service_id(x509.get_pubkey()) if x509 else None + with db.session(): + u = user.models.User.get(user_id) + if not u: + return self._denied() + if u.pending: + logger.debug('ignore request from pending peer[%s] %s (%s)', + user_id, action, args) + return self._denied() + if not u.peered and u.pending == 'sent': + u.update_peering(True) + state.nodes.queue('add', u.id, True) + trigger_event('peering.accept', u.json()) + if not u.peered: + return self._denied() + path = changelog_path() + content_length = changelog_size() + with open(path, 'rb') as log: + request_range = self.headers.get('Range', '') + if request_range: + r = request_range.split('=')[-1].split('-') + start = int(r[0]) + end = int(r[1]) if r[1] else (content_length - 1) + if start == content_length: + content_length = 0 + else: + content_length = end - start + 1 + if content_length < 0: + content_length = os.path.getsize(path) + self.send_response(200, 'OK') + else: + log.seek(start) + self.send_response(206, 'OK') + else: + self.send_response(200, 'OK') + self.send_header('Content-type', 'text/json') + self.send_header('X-Node-Protocol', settings.NODE_PROTOCOL) + self.send_header('Content-Length', str(content_length)) + self.end_headers() + self.write_fd_with_limit(log, content_length) + def gzip_data(self, data): encoding = self.headers.get('Accept-Encoding') if encoding.find('gzip') != -1: @@ -203,7 +249,6 @@ class Handler(http.server.SimpleHTTPRequestHandler): def do_POST(self): ''' API - pullChanges [userid] from [to] requestPeering username message acceptPeering username message rejectPeering message @@ -287,17 +332,26 @@ class Handler(http.server.SimpleHTTPRequestHandler): self.wfile.write(data) position += chunk_size - def write_file_with_limit(self, path, content_length): + def write_fd_with_limit(self, f, content_length): chunk_size = self.chunk_size(content_length) + position = 0 + while True: + data = f.read(chunk_size) + if not data: + break + self.wfile.write(data) + position += chunk_size + if position + chunk_size > content_length: + chunk_size = content_length - position + if chunk_size <= 0: + break + if state.bandwidth: + while not state.bandwidth.upload(chunk_size) and self.server._running: + time.sleep(0.1) + + def write_file_with_limit(self, path, content_length): with open(path, 'rb') as f: - while True: - data = f.read(chunk_size) - if not data: - break - self.wfile.write(data) - if state.bandwidth: - while not state.bandwidth.upload(chunk_size) and self.server._running: - time.sleep(0.1) + self.write_fd_with_limit(f, content_length) class Server(Thread): http_server = None diff --git a/oml/nodes.py b/oml/nodes.py index 5db4912..ab6fc46 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -30,7 +30,7 @@ import library import logging logger = logging.getLogger(__name__) -DEBUG_NODES=False +DEBUG_NODES = False class Node(Thread): host = None @@ -239,24 +239,81 @@ class Node(Thread): self.online = self.can_connect() if not self.online or state.shutdown: return - with db.session(): - u = user.models.User.get_or_create(self.user_id) - if not u or not self.online or not u.peered: - return True + self.resolve() peer = get_peer(self.user_id) - from_revision = peer.info.get('revision', -1) + 1 + path = peer._logpath + if os.path.exists(path): + size = os.path.getsize(path) + else: + size = 0 + url = '%s/log' % self.url + if DEBUG_NODES: + logger.debug('pullChanges: %s [%s]', self.user_id, url) + headers = self.headers.copy() + if size: + headers['Range'] = '%s-' % size + self._opener.addheaders = list(zip(headers.keys(), headers.values())) try: - changes = self.request('pullChanges', from_revision) + r = self._opener.open(url, timeout=self.TIMEOUT*60) + except urllib.error.HTTPError as e: + if e.code == 403: + logger.debug('pullChanges 403: %s (%s)', url, self.user_id) + if state.tasks: + state.tasks.queue('peering', (self.user_id, False)) + del self._nodes[self.user_id] + self.online = False + else: + logger.debug('unknown http errpr %s %s (%s)', e.code, url, self.user_id) + return False + except socket.timeout: + logger.debug('timeout %s', url) + return False + except socks.GeneralProxyError: + logger.debug('openurl failed %s', url) + return False + except urllib.error.URLError as e: + logger.debug('openurl failed urllib2.URLError %s', e.reason) + return False except: - self.online = False - if DEBUG_NODES: - logger.debug('%s went offline', u.name, exc_info=True) + logger.debug('openurl failed %s', url, exc_info=True) return False - if not changes: + if r.getcode() in (200, 206): + changed = False + chunk_size = 16 * 1024 + mode = 'ab' if r.getcode() == 206 else 'wb' + content = b'' + + try: + if r.headers.get('content-encoding', None) == 'gzip': + fileobj = gzip.GzipFile(fileobj=r) + else: + fileobj = r + for chunk in iter(lambda: fileobj.read(chunk_size), b''): + content += chunk + eol = content.rfind(b'\n') + 1 + if eol > 0: + with open(path, mode) as fd: + fd.write(content[:eol]) + content = content[eol:] + mode = 'ab' + changed = True + if state.shutdown: + return False + if state.bandwidth: + while not state.bandwidth.download(chunk_size) and not state.shutdown: + time.sleep(0.1) + if content: + with open(path, mode) as fd: + fd.write(content) + changed = True + if changed: + peer.apply_log() + except: + logger.debug('download failed %s', url, exc_info=True) + return False + else: + logger.debug('FAILED %s', url) return False - #with open('/tmp/changelog_%s_%s.json' % (self.user_id, from_revision), 'w') as f: - # json.dump(changes, f, ensure_ascii=False, indent=4) - return peer.apply_changes(changes) def peering(self, action): with db.session(): @@ -417,7 +474,7 @@ class Nodes(Thread): del u.info['local'] u.save() self.queue('add', u.id) - state.peers[u.id] = library.Peer(u.id) + get_peer(u.id) for u in user.models.User.query.filter_by(queued=True): logger.debug('adding queued node... %s', u.id) self.queue('add', u.id, True) diff --git a/oml/server.py b/oml/server.py index 51d909c..89401d0 100644 --- a/oml/server.py +++ b/oml/server.py @@ -156,6 +156,7 @@ def run(): state.tor = tor.Tor() state.node = node.server.start() state.nodes = nodes.Nodes() + def publish(): if not state.tor.is_online(): state.main.call_later(10, publish) diff --git a/oml/settings.py b/oml/settings.py index d2cb3b1..a750eff 100644 --- a/oml/settings.py +++ b/oml/settings.py @@ -58,8 +58,8 @@ release = pdict(os.path.join(data_path, 'release.json')) USER_ID = get_user_id(ssl_key_path, ssl_cert_path) -OML_UPDATE_KEY='K55EZpPYbP3X+3mA66cztlw1sSaUMqGwfTDKQyP2qOU' -OML_UPDATE_CERT='''-----BEGIN CERTIFICATE----- +OML_UPDATE_KEY = 'K55EZpPYbP3X+3mA66cztlw1sSaUMqGwfTDKQyP2qOU' +OML_UPDATE_CERT = '''-----BEGIN CERTIFICATE----- MIICgzCCAeygAwIBAgIBATANBgkqhkiG9w0BAQsFADAbMRkwFwYDVQQDDBBqc2pt Z2J4ZjJvZGN6NGNxMB4XDTE1MTEyMzIwMzY1NFoXDTE1MTEyNDIwMzY1NFowGzEZ MBcGA1UEAwwQanNqbWdieGYyb2RjejRjcTCBnzANBgkqhkiG9w0BAQEFAAOBjQAw @@ -81,11 +81,11 @@ if 'modules' in release and 'openmedialibrary' in release['modules']: else: MINOR_VERSION = 'git' -NODE_PROTOCOL="0.7" -VERSION="%s.%s" % (NODE_PROTOCOL, MINOR_VERSION) +NODE_PROTOCOL = "0.8" +VERSION = "%s.%s" % (NODE_PROTOCOL, MINOR_VERSION) USER_AGENT = 'OpenMediaLibrary/%s' % VERSION DEBUG_HTTP = server.get('debug_http', False) -DB_VERSION = 12 +DB_VERSION = 13 diff --git a/oml/state.py b/oml/state.py index 2f296e6..7f63690 100644 --- a/oml/state.py +++ b/oml/state.py @@ -1,3 +1,5 @@ +from threading import local + bandwidth = None host = None main = None @@ -12,13 +14,14 @@ shutdown = False websockets = [] peers = {} +changelog_size = None + activity = {} removepeer = {} +db = local() def user(): import settings import user.models return user.models.User.get_or_create(settings.USER_ID) -from threading import local -db = local() diff --git a/oml/update.py b/oml/update.py index 29459b9..c265f63 100644 --- a/oml/update.py +++ b/oml/update.py @@ -365,6 +365,8 @@ class Update(Thread): db_version = migrate_11() if db_version < 12: db_version = migrate_12() + if db_version < 13: + db_version = migrate_13() settings.server['db_version'] = settings.DB_VERSION def run(self): @@ -589,3 +591,22 @@ def migrate_12(): 'DROP TABLE IF EXISTS transfer' ]) return 12 + +def migrate_13(): + import settings + import changelog + import os + import json + path = os.path.join(settings.data_path, 'peers', '%s.log' % settings.USER_ID) + if not os.path.exists(path): + with db.session() as session: + revision = -1 + qs = changelog.Changelog.query.filter_by(user_id=settings.USER_ID) + with open(path, 'w') as fd: + for c in qs.order_by('timestamp'): + data = json.dumps([c.revision, c.timestamp, json.loads(c.data)], ensure_ascii=False) + fd.write(data + '\n') + revision = c.revision + if revision > -1: + settings.server['revision'] = revision + return 13 diff --git a/oml/user/api.py b/oml/user/api.py index f70737d..242b5fa 100644 --- a/oml/user/api.py +++ b/oml/user/api.py @@ -8,7 +8,7 @@ import os import ox -from changelog import Changelog +from changelog import add_record from oxtornado import actions from utils import update_dict, user_sort_key from . import models @@ -79,9 +79,9 @@ def setPreferences(data): u.update_name() u.save() if change_username: - Changelog.record(u, 'editusername', data['username']) + add_record('editusername', data['username']) if change_contact: - Changelog.record(state.user(), 'editcontact', data['contact']) + add_record('editcontact', data['contact']) if change_path: state.tasks.queue('changelibrarypath', change_path) if change_autostart: @@ -227,7 +227,7 @@ def editList(data): validate_query(data['query']) l._query = data['query'] if l.type == 'static' and name != l.name: - Changelog.record(state.user(), 'editlist', name, {'name': l.name}) + add_record('editlist', name, {'name': l.name}) l.save() return l.json() actions.register(editList, cache=False) @@ -303,7 +303,7 @@ def sortLists(data): state.db.session.add(l) state.db.session.commit() if lists: - Changelog.record(state.user(), 'orderlists', lists) + add_record('orderlists', lists) return {} actions.register(sortLists, cache=False) diff --git a/oml/user/models.py b/oml/user/models.py index 03d78f1..22d76e9 100644 --- a/oml/user/models.py +++ b/oml/user/models.py @@ -9,7 +9,7 @@ import ox from sqlalchemy.orm import load_only import sqlalchemy as sa -from changelog import Changelog +from changelog import add_record from db import MutableDict import db import json_pickler @@ -159,7 +159,7 @@ class User(db.Model): self.peered = False self.save() if not was_peering: - Changelog.record(state.user(), 'addpeer', self.id, self.nickname) + add_record('addpeer', self.id, self.nickname) if 'index' not in self.info: self.info['index'] = max([ u.info.get('index', -1) for u in User.query.filter_by(peered=True) @@ -182,7 +182,7 @@ class User(db.Model): state.removepeer[self.id] = True self.cleanup() if was_peering: - Changelog.record(state.user(), 'removepeer', self.id) + add_record('removepeer', self.id) self.save() def cleanup(self): @@ -210,6 +210,8 @@ class User(db.Model): self.nickname = nickname def rebuild_changelog(self): + logger.error('no longer used') + return Changelog.query.filter_by(user_id=self.id).delete() for item in self.library.get_items().order_by('created'): Changelog.record(self, 'additem', item.id, item.info, _commit=False) @@ -300,7 +302,7 @@ class List(db.Model): state.db.session.commit() if user_id == settings.USER_ID: if l.type == 'static' and name != '': - Changelog.record(state.user(), 'addlist', l.name) + add_record('addlist', l.name) return l @classmethod @@ -338,7 +340,7 @@ class List(db.Model): if commit: state.db.session.commit() if self.user_id == settings.USER_ID and self.name != '' and available_items: - Changelog.record(self.user, 'addlistitems', self.name, available_items) + add_record('addlistitems', self.name, available_items) def get_items(self): from item.models import Item @@ -359,7 +361,7 @@ class List(db.Model): if commit: state.db.session.commit() if self.user_id == settings.USER_ID and self.name != '': - Changelog.record(self.user, 'removelistitems', self.name, items) + add_record('removelistitems', self.name, items) def remove(self, commit=True): if not self._query: @@ -367,7 +369,7 @@ class List(db.Model): state.db.session.execute(q) if not self._query: if self.user_id == settings.USER_ID and self.name != '': - Changelog.record(self.user, 'removelist', self.name) + add_record('removelist', self.name) state.db.session.delete(self) if commit: state.db.session.commit()