add inital implementation for a websocket, disabled by default for now

This commit is contained in:
j 2015-04-28 23:05:15 +05:30
commit e7f83f674e
15 changed files with 191 additions and 2 deletions

View file

@ -9,6 +9,7 @@ from django.db import models
from ox.django import fields
import ox
import websocket
import managers
'''
@ -34,6 +35,7 @@ def add_changelog(request, data, id=None):
c.changeid = id or data.get('id')
c.created = datetime.now()
c.save()
websocket.trigger_event('change', {'action': c.action, 'id': c.changeid})
class Log(models.Model):

View file

@ -69,6 +69,9 @@ STATICFILES_FINDERS = (
GEOIP_PATH = normpath(join(PROJECT_ROOT, '..', 'data', 'geo'))
WEBSOCKET = False
WEBSOCKET_PORT = 2622
WEBSOCKET_ADDRESS = '127.0.0.1'
# List of callables that know how to import templates from various sources.
TEMPLATE_LOADERS = (
@ -129,6 +132,7 @@ INSTALLED_APPS = (
'tv',
'document',
'entity',
'websocket'
)
# Log errors into db

View file

@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
from celery.execute import send_task
from django.conf import settings
key = 'websocket'
def trigger_event(event, data):
if settings.WEBSOCKET:
send_task('trigger_event', [event, data], exchange=key, routing_key=key)

View file

@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
import json
from threading import Thread
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.web import Application
from tornado.websocket import WebSocketHandler
import logging
logger = logging.getLogger('pandora.websocket')
sockets = []
class Daemon(Thread):
def __init__(self, port, address):
self.port = port
self.address = address
Thread.__init__(self)
self.daemon = True
self.start()
def join(self):
self.main.stop()
def run(self):
options = {
'debug': False,
'gzip': False
}
handlers = [
(r'/(.*)', Handler),
]
self.http_server = HTTPServer(Application(handlers, **options))
self.main = IOLoop.instance()
self.http_server.listen(self.port, self.address)
self.main.start()
class Handler(WebSocketHandler):
'''
def check_origin(self, origin):
# bypass same origin check
return True
'''
def open(self, path):
if self not in sockets:
sockets.append(self)
#websocket calls
def on_close(self):
if self in sockets:
sockets.remove(self)
def on_message(self, message):
pass
#logger.debug('got message %s', message)
def post(self, event, data):
message = json.dumps([event, data])
main = IOLoop.instance()
main.add_callback(lambda: self.write_message(message))
def trigger_event(event, data):
logger.debug('trigger event %s %s to %s clients', event, data, len(sockets))
main = IOLoop.instance()
message = json.dumps([event, data])
for ws in sockets:
try:
main.add_callback(lambda: ws.write_message(message))
except:
logger.debug('failed to send to ws %s %s %s', ws, event, data, exc_info=1)

View file

View file

@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
from __future__ import absolute_import
import os
from optparse import make_option
from django.core.management.base import BaseCommand
from django.conf import settings
from ... import daemon, worker
import logging
class Command(BaseCommand):
"""
"""
help = 'run websocket daemon'
args = ''
option_list = BaseCommand.option_list + (
make_option('--debug',
action='store_true',
dest='debug',
default=False,
help='enable debug'),
make_option("--pidfile", dest="pidfile",metavar="PIDFILE"),
)
def handle(self, **options):
socket = daemon.Daemon(settings.WEBSOCKET_PORT, settings.WEBSOCKET_ADDRESS)
if options['debug']:
logging.basicConfig(level=logging.DEBUG)
if options['pidfile']:
with open(options['pidfile'], 'w') as pid:
pid.write('%s' % os.getpid())
worker.run()
socket.join()

View file

View file

@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
from __future__ import absolute_import
from django.conf import settings
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin
from . import daemon, key
queue = Queue('websocket', Exchange(key, type='direct'), routing_key=key)
class Worker(ConsumerMixin):
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [Consumer(queues=queue,
accept=['pickle', 'json'],
callbacks=[self.process_task])]
def process_task(self, body, message):
if body['task'] == 'trigger_event':
daemon.trigger_event(*body['args'])
message.ack()
def run():
with Connection(settings.BROKER_URL) as conn:
try:
worker = Worker(conn)
worker.run()
except KeyboardInterrupt:
print('shutting down...')