diff --git a/oml/item/api.py b/oml/item/api.py index ca671e9..b19fed3 100644 --- a/oml/item/api.py +++ b/oml/item/api.py @@ -241,7 +241,7 @@ actions.register(cancelDownloads, cache=False) def scan(data): - state.main.add_callback(state.websockets[0].put, json.dumps(['scan', {}])) + state.tasks.queue('scan', {}) return {} actions.register(scan, cache=False) @@ -255,7 +255,7 @@ def _import(data): } ''' logger.debug('api.import %s', data) - state.main.add_callback(state.websockets[0].put, json.dumps(['import', data])) + state.tasks.queue('import', data) return {} actions.register(_import, 'import', cache=False) diff --git a/oml/server.py b/oml/server.py index f4efad2..bf37f03 100644 --- a/oml/server.py +++ b/oml/server.py @@ -64,6 +64,8 @@ def run(): import user import downloads import nodes + import tasks + state.tasks = tasks.Tasks(app) state.node = node.server.start(app) state.nodes = nodes.Nodes(app) state.downloads = downloads.Downloads(app) diff --git a/oml/state.py b/oml/state.py index 4f1e96b..fcb54bd 100644 --- a/oml/state.py +++ b/oml/state.py @@ -1,5 +1,6 @@ websockets = [] nodes = False +tasks = False main = None online = False host = None diff --git a/oml/tasks.py b/oml/tasks.py new file mode 100644 index 0000000..660122b --- /dev/null +++ b/oml/tasks.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# vi:si:et:sw=4:sts=4:ts=4 +from __future__ import division + +from Queue import Queue +from threading import Thread + +from websocket import trigger_event + + +import logging +logger = logging.getLogger('oml.websocket') + +class Tasks(Thread): + + def __init__(self, app): + self.q = Queue() + self.connected = True + self._app = app + Thread.__init__(self) + self.daemon = True + self.start() + + def run(self): + import item.scan + while self.connected: + m = self.q.get() + if m: + print m + action, data = m + if action == 'ping': + trigger_event('pong', data) + elif action == 'import': + item.scan.run_import(data) + elif action == 'scan': + item.scan.run_scan() + elif action == 'update': + trigger_event('error', {'error': 'not implemented'}) + else: + trigger_event('error', {'error': 'unknown action'}) + self.q.task_done() + + def join(self): + self.connected = False + self.put(None) + self.q.join() + return Thread.join(self) + + def queue(self, action, data): + self.q.put((action, data)) + diff --git a/oml/user/models.py b/oml/user/models.py index 70bb42d..1373592 100644 --- a/oml/user/models.py +++ b/oml/user/models.py @@ -43,6 +43,7 @@ class User(db.Model): if not user: user = cls(id=id, peered=False, online=False) user.info = {} + user.update_name() user.save() return user diff --git a/oml/websocket.py b/oml/websocket.py index 31ed993..94c0b94 100644 --- a/oml/websocket.py +++ b/oml/websocket.py @@ -4,9 +4,7 @@ from __future__ import division from tornado.websocket import WebSocketHandler from tornado.ioloop import IOLoop -from Queue import Queue import json -from threading import Thread from oxtornado import json_dumps @@ -15,70 +13,37 @@ import state import logging logger = logging.getLogger('oml.websocket') -class Background: - - def __init__(self, handler): - self.handler = handler - self.q = Queue() - self.connected = True - self.main = IOLoop.instance() - - def worker(self): - while self.connected: - message = self.q.get() - action, data = json.loads(message) - import item.scan - if action == 'ping': - self.post(['pong', data]) - elif action == 'import': - item.scan.run_import(data) - elif action == 'scan': - item.scan.run_scan() - elif action == 'update': - self.post(['error', {'error': 'not implemented'}]) - else: - self.post(['error', {'error': 'unknown action'}]) - self.q.task_done() - - def join(self): - self.q.join() - - def put(self, data): - self.q.put(data) - - def post(self, data): - if not isinstance(data, basestring): - data = json_dumps(data) - self.main.add_callback(lambda: self.handler.write_message(data)) - class Handler(WebSocketHandler): - background = None def open(self): if self.request.host not in self.request.headers['origin']: logger.debug('reject cross site attempt to open websocket %s', self.request) self.close() - self.background = Background(self) - state.websockets.append(self.background) - self.t = Thread(target=self.background.worker) - self.t.daemon = True - self.t.start() + if self not in state.websockets: + state.websockets.append(self) + #websocket calls def on_message(self, message): - self.background.put(message) + action, data = json.load(message) + if state.tasks: + state.tasks.queue(action, data) def on_close(self): - if self.background: - state.websockets.remove(self.background) - self.background.connected = False + if self in state.websockets: + state.websockets.remove(self) + + def post(self, event, data): + message = json_dumps([event, data]) + main = IOLoop.instance() + main.add_callback(lambda: self.write_message(message)) def trigger_event(event, data): if len(state.websockets): logger.debug('trigger event %s %s %s', event, data, len(state.websockets)) for ws in state.websockets: try: - ws.post([event, data]) + ws.post(event, data) except: logger.debug('failed to send to ws %s %s %s', ws, event, data, exc_info=1)