one tasks queue instead of one per websocket
This commit is contained in:
parent
e210d9329c
commit
5b33721c87
6 changed files with 71 additions and 51 deletions
|
@ -241,7 +241,7 @@ actions.register(cancelDownloads, cache=False)
|
||||||
|
|
||||||
|
|
||||||
def scan(data):
|
def scan(data):
|
||||||
state.main.add_callback(state.websockets[0].put, json.dumps(['scan', {}]))
|
state.tasks.queue('scan', {})
|
||||||
return {}
|
return {}
|
||||||
actions.register(scan, cache=False)
|
actions.register(scan, cache=False)
|
||||||
|
|
||||||
|
@ -255,7 +255,7 @@ def _import(data):
|
||||||
}
|
}
|
||||||
'''
|
'''
|
||||||
logger.debug('api.import %s', data)
|
logger.debug('api.import %s', data)
|
||||||
state.main.add_callback(state.websockets[0].put, json.dumps(['import', data]))
|
state.tasks.queue('import', data)
|
||||||
return {}
|
return {}
|
||||||
actions.register(_import, 'import', cache=False)
|
actions.register(_import, 'import', cache=False)
|
||||||
|
|
||||||
|
|
|
@ -64,6 +64,8 @@ def run():
|
||||||
import user
|
import user
|
||||||
import downloads
|
import downloads
|
||||||
import nodes
|
import nodes
|
||||||
|
import tasks
|
||||||
|
state.tasks = tasks.Tasks(app)
|
||||||
state.node = node.server.start(app)
|
state.node = node.server.start(app)
|
||||||
state.nodes = nodes.Nodes(app)
|
state.nodes = nodes.Nodes(app)
|
||||||
state.downloads = downloads.Downloads(app)
|
state.downloads = downloads.Downloads(app)
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
websockets = []
|
websockets = []
|
||||||
nodes = False
|
nodes = False
|
||||||
|
tasks = False
|
||||||
main = None
|
main = None
|
||||||
online = False
|
online = False
|
||||||
host = None
|
host = None
|
||||||
|
|
51
oml/tasks.py
Normal file
51
oml/tasks.py
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# vi:si:et:sw=4:sts=4:ts=4
|
||||||
|
from __future__ import division
|
||||||
|
|
||||||
|
from Queue import Queue
|
||||||
|
from threading import Thread
|
||||||
|
|
||||||
|
from websocket import trigger_event
|
||||||
|
|
||||||
|
|
||||||
|
import logging
|
||||||
|
logger = logging.getLogger('oml.websocket')
|
||||||
|
|
||||||
|
class Tasks(Thread):
|
||||||
|
|
||||||
|
def __init__(self, app):
|
||||||
|
self.q = Queue()
|
||||||
|
self.connected = True
|
||||||
|
self._app = app
|
||||||
|
Thread.__init__(self)
|
||||||
|
self.daemon = True
|
||||||
|
self.start()
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
import item.scan
|
||||||
|
while self.connected:
|
||||||
|
m = self.q.get()
|
||||||
|
if m:
|
||||||
|
print m
|
||||||
|
action, data = m
|
||||||
|
if action == 'ping':
|
||||||
|
trigger_event('pong', data)
|
||||||
|
elif action == 'import':
|
||||||
|
item.scan.run_import(data)
|
||||||
|
elif action == 'scan':
|
||||||
|
item.scan.run_scan()
|
||||||
|
elif action == 'update':
|
||||||
|
trigger_event('error', {'error': 'not implemented'})
|
||||||
|
else:
|
||||||
|
trigger_event('error', {'error': 'unknown action'})
|
||||||
|
self.q.task_done()
|
||||||
|
|
||||||
|
def join(self):
|
||||||
|
self.connected = False
|
||||||
|
self.put(None)
|
||||||
|
self.q.join()
|
||||||
|
return Thread.join(self)
|
||||||
|
|
||||||
|
def queue(self, action, data):
|
||||||
|
self.q.put((action, data))
|
||||||
|
|
|
@ -43,6 +43,7 @@ class User(db.Model):
|
||||||
if not user:
|
if not user:
|
||||||
user = cls(id=id, peered=False, online=False)
|
user = cls(id=id, peered=False, online=False)
|
||||||
user.info = {}
|
user.info = {}
|
||||||
|
user.update_name()
|
||||||
user.save()
|
user.save()
|
||||||
return user
|
return user
|
||||||
|
|
||||||
|
|
|
@ -4,9 +4,7 @@ from __future__ import division
|
||||||
|
|
||||||
from tornado.websocket import WebSocketHandler
|
from tornado.websocket import WebSocketHandler
|
||||||
from tornado.ioloop import IOLoop
|
from tornado.ioloop import IOLoop
|
||||||
from Queue import Queue
|
|
||||||
import json
|
import json
|
||||||
from threading import Thread
|
|
||||||
|
|
||||||
from oxtornado import json_dumps
|
from oxtornado import json_dumps
|
||||||
|
|
||||||
|
@ -15,70 +13,37 @@ import state
|
||||||
import logging
|
import logging
|
||||||
logger = logging.getLogger('oml.websocket')
|
logger = logging.getLogger('oml.websocket')
|
||||||
|
|
||||||
class Background:
|
|
||||||
|
|
||||||
def __init__(self, handler):
|
|
||||||
self.handler = handler
|
|
||||||
self.q = Queue()
|
|
||||||
self.connected = True
|
|
||||||
self.main = IOLoop.instance()
|
|
||||||
|
|
||||||
def worker(self):
|
|
||||||
while self.connected:
|
|
||||||
message = self.q.get()
|
|
||||||
action, data = json.loads(message)
|
|
||||||
import item.scan
|
|
||||||
if action == 'ping':
|
|
||||||
self.post(['pong', data])
|
|
||||||
elif action == 'import':
|
|
||||||
item.scan.run_import(data)
|
|
||||||
elif action == 'scan':
|
|
||||||
item.scan.run_scan()
|
|
||||||
elif action == 'update':
|
|
||||||
self.post(['error', {'error': 'not implemented'}])
|
|
||||||
else:
|
|
||||||
self.post(['error', {'error': 'unknown action'}])
|
|
||||||
self.q.task_done()
|
|
||||||
|
|
||||||
def join(self):
|
|
||||||
self.q.join()
|
|
||||||
|
|
||||||
def put(self, data):
|
|
||||||
self.q.put(data)
|
|
||||||
|
|
||||||
def post(self, data):
|
|
||||||
if not isinstance(data, basestring):
|
|
||||||
data = json_dumps(data)
|
|
||||||
self.main.add_callback(lambda: self.handler.write_message(data))
|
|
||||||
|
|
||||||
|
|
||||||
class Handler(WebSocketHandler):
|
class Handler(WebSocketHandler):
|
||||||
background = None
|
|
||||||
|
|
||||||
def open(self):
|
def open(self):
|
||||||
if self.request.host not in self.request.headers['origin']:
|
if self.request.host not in self.request.headers['origin']:
|
||||||
logger.debug('reject cross site attempt to open websocket %s', self.request)
|
logger.debug('reject cross site attempt to open websocket %s', self.request)
|
||||||
self.close()
|
self.close()
|
||||||
self.background = Background(self)
|
if self not in state.websockets:
|
||||||
state.websockets.append(self.background)
|
state.websockets.append(self)
|
||||||
self.t = Thread(target=self.background.worker)
|
|
||||||
self.t.daemon = True
|
|
||||||
self.t.start()
|
|
||||||
|
|
||||||
#websocket calls
|
#websocket calls
|
||||||
def on_message(self, message):
|
def on_message(self, message):
|
||||||
self.background.put(message)
|
action, data = json.load(message)
|
||||||
|
if state.tasks:
|
||||||
|
state.tasks.queue(action, data)
|
||||||
|
|
||||||
def on_close(self):
|
def on_close(self):
|
||||||
if self.background:
|
if self in state.websockets:
|
||||||
state.websockets.remove(self.background)
|
state.websockets.remove(self)
|
||||||
self.background.connected = False
|
|
||||||
|
def post(self, event, data):
|
||||||
|
message = json_dumps([event, data])
|
||||||
|
main = IOLoop.instance()
|
||||||
|
main.add_callback(lambda: self.write_message(message))
|
||||||
|
|
||||||
def trigger_event(event, data):
|
def trigger_event(event, data):
|
||||||
if len(state.websockets):
|
if len(state.websockets):
|
||||||
logger.debug('trigger event %s %s %s', event, data, len(state.websockets))
|
logger.debug('trigger event %s %s %s', event, data, len(state.websockets))
|
||||||
for ws in state.websockets:
|
for ws in state.websockets:
|
||||||
try:
|
try:
|
||||||
ws.post([event, data])
|
ws.post(event, data)
|
||||||
except:
|
except:
|
||||||
logger.debug('failed to send to ws %s %s %s', ws, event, data, exc_info=1)
|
logger.debug('failed to send to ws %s %s %s', ws, event, data, exc_info=1)
|
||||||
|
|
Loading…
Reference in a new issue