diff --git a/example/link.py b/example/link.py new file mode 100644 index 0000000..38337c5 --- /dev/null +++ b/example/link.py @@ -0,0 +1,38 @@ +import json +import urllib2 + +PEERLINK='http://[::1]:8842/' +NAME = 'chat' + +def urlopen(url, data=None, headers=None): + if data and not isinstance(data, str): + data = json.dumps(data) + if not headers: + headers = { + 'Content-Type': 'application/json', + 'User-Agent': 'ChatServer/0.0' + } + opener = urllib2.build_opener() + print 'urlopen', url, data + req = urllib2.Request(url, data=data, headers=headers) + response = opener.open(req) + return response.read() + +def post(action, data=None): + url = PEERLINK + action + return json.loads(urlopen(url, data)) + +def add(name, url): + global NAME + NAME = name + return post('add', {'name': name, 'url': url}) + +def remote(peer, action, data): + url = PEERLINK + '%s/%s/%s' % (peer, NAME, action) + if not data: + data = {'test': True} + print 'REMOTE', url + return urlopen(url, data) + +def remote_json(peer, action, data): + return json.loads(remote(peer, action, data)) diff --git a/example/server.py b/example/server.py old mode 100644 new mode 100755 index 794411e..ac3ec43 --- a/example/server.py +++ b/example/server.py @@ -1,104 +1,140 @@ -#!/usr/bin/python +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# vi:si:et:sw=4:sts=4:ts=4 +from __future__ import division, print_function import json import os -import shutil -import SimpleHTTPServer -import socket -import SocketServer +import signal import sys -import urllib2 +import mimetypes -peers = {} +from tornado.httpserver import HTTPServer +from tornado.ioloop import IOLoop, PeriodicCallback +from tornado.web import Application +import tornado -NETBASE='http://[::1]:8842/' -NAME = 'chat' +import websocket +from websocket import trigger_event +import state +from tasks import Tasks +from utils import json_dumps +import link -def remote(peer, action, data): - url = NETBASE + '%s/%s/%s' % (peer, NAME, action) - if data and not isinstance(data, str): - data = json.dumps(data) - opener = urllib2.build_opener() - req = urllib2.Request(url, data=data, headers={ - 'Content-Type': 'application/json', - 'User-Agent': 'ChatServer/0.0' - }) - response = opener.open(req) - return response.read() +import logging +logger = logging.getLogger('server') +root_dir = os.path.normpath(os.path.abspath(os.path.dirname(__file__))) +STATIC_PATH = os.path.join(root_dir, 'static') -def add_service(name, url): - add = NETBASE + 'add' - urllib2.urlopen(add, json.dumps({'name': name, 'url': url})) +class BaseHandler(tornado.web.RequestHandler): -class Handler(SimpleHTTPServer.SimpleHTTPRequestHandler): - - def do_GET(self): - print 'GET', self.path - path = os.path.join('static', self.path[1:] if self.path != '/' else 'index.html') + def serve_static(self, path, mimetype=None, include_body=True): + if not mimetype: + mimetype = mimetypes.guess_type(path)[0] + logging.debug('serve %s', path) if os.path.exists(path): - with open(path) as fd: - shutil.copyfileobj(fd, self.wfile) - - def _remote_request(self, action, data): - response = {} - if action == 'message': - pass - elif action == 'ping': - print self.headers - response['userid'] = self.headers.getheader('From') - response['remote ping'] = True - response['data'] = data - return response - - def _request(self, action, data): - response = {} - if action == 'test': - response['test'] = 'ok' - response['data'] = data - elif action in ('ping', 'pong'): - id = data['id'] - del data['id'] - response = remote(id, action, data) - return response - - def do_POST(self): - print 'POST', self.path - length = int(self.headers.getheader('content-length')) - body = self.rfile.read(length) - data = json.loads(body) - if self.path.startswith('/remote'): - action = self.path.split('/')[2] - response = self._remote_request(action, data) + self.set_header('Content-Type', mimetype) + self.set_header('Content-Length', str(os.stat(path).st_size)) + if include_body: + with open(path) as fd: + self.write(fd.read()) else: - action = self.path.split('/')[1] - response = self._request(action, data) + self.set_status(404) + return - response = json.dumps(response, indent=2) - self.send_response(200) - self.send_header('Content-Type', 'application/json') - self.send_header('Content-Length', str(len(response))) - self.end_headers() - self.wfile.write(response) + def render_json(self, response): + response = json_dumps(response) + self.set_header('Content-Type', 'application/json') + self.set_header('Content-Length', str(len(response))) + self.write(response) + self.finish() -class Server(SocketServer.ThreadingMixIn, SocketServer.TCPServer): - ''' - IPv4/IPv6 Dual Stack - ''' - address_family = socket.AF_INET6 - allow_reuse_address = True +class RemoteHandler(BaseHandler): - def server_bind(self): - self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False) - SocketServer.TCPServer.server_bind(self) + def post(self, action): + data = json.loads(self.request.body) + response = {} + print ('GOT remote request', action, data) + if action == 'info': + response = state.info + elif action == 'message': + data['from'] = self.request.headers['From'] + trigger_event(action, data) + else: + response = {'error': 'unknown action'} + return self.render_json(response) + +class MainHandler(BaseHandler): + + def get(self, path): + path = path[1:] + if not path: + path = 'index.html' + path = os.path.join(STATIC_PATH, path) + self.serve_static(path) + + def post(self, path): + action = path.split('/')[1] + data = json.loads(self.request.body) + response = {} + if action in ('info', 'ping'): + if 'id' in data: + id = data['id'] + del data['id'] + response = link.remote_json(id, action, data) + return self.render_json(response) if __name__ == '__main__': - if len(sys.argv) == 2: + address = '' + port = 8000 + + if len(sys.argv) > 1: port = int(sys.argv[1]) + link.PEERLINK=sys.argv[2] + + logging.basicConfig(level=logging.DEBUG) + + options = { + 'debug': True, + } + handlers = [ + (r'/ws', websocket.Handler), + (r"/remote/(.*)", RemoteHandler), + (r"(.*)", MainHandler), + ] + + http_server = HTTPServer(Application(handlers, **options)) + + http_server.listen(port, address) + + state.tasks = Tasks() + state.main = IOLoop.instance() + + state._status = PeriodicCallback(lambda: state.info.update(link.post('info')), 60000) + state._status.start() + + if ':' in address: + host = '[%s]' % address + elif not address: + host = '[::1]' else: - port = 8000 - print "listening on port", port - url = 'http://127.0.0.1:%s/remote/' % port - add_service(NAME, url) - httpd = Server(("", port), Handler) - httpd.serve_forever() + host = address + url = 'http://%s:%s/' % (host, port) + + link.add('chat', url + 'remote/') + state.info = link.post('info') + + print('listening at %s' % url) + + def shutdown(): + state.tasks.join() + http_server.stop() + + signal.signal(signal.SIGTERM, shutdown) + + try: + state.main.start() + except: + print('shutting down...') + shutdown() diff --git a/example/state.py b/example/state.py new file mode 100644 index 0000000..66e672c --- /dev/null +++ b/example/state.py @@ -0,0 +1,4 @@ +websockets = [] +tasks = None +info = {} +peers = [] diff --git a/example/static/chat.js b/example/static/chat.js new file mode 100644 index 0000000..411af0a --- /dev/null +++ b/example/static/chat.js @@ -0,0 +1,90 @@ +var names = {}; +Ox.load(function() { + var app = window.app = {}; + app.triggerEvent = function(action, data) { + if (action == 'info') { + Ox.$('#id').html(data.id); + Ox.$('#nick').val(data.nick || ''); + var peers = Ox.$('#peers'); + peers.empty() + data.local.forEach(function(peer) { + console.log('add', peer); + if (names[peer]) { + Ox.$('
') + .html(peer + '(' + names[peer] +')') + .appendTo(peers); + } else { + app.getNick(peer, function(nick) { + var html = peer; + if(nick) { + html = peer + '(' + nick +')'; + } + Ox.$('
') + .html(html) + .appendTo(peers); + }); + } + }); + } else if (action == 'message') { + var html = (names[data.from] || data.from) + ': ' + data.message; + Ox.$('
').html(html).appendTo(Ox.$('#messages')); + } + Ox.print(action, data); + + }; + app.socket = new WebSocket('ws://' + document.location.host + '/ws'); + app.socket.onopen = function(event) { + app.triggerEvent('open', event); + }; + app.socket.onmessage = function(event) { + var data = JSON.parse(event.data); + app.triggerEvent(data[0], data[1]); + }; + app.socket.onerror = function(event) { + app.triggerEvent('error', event); + app.socket.close(); + }; + app.socket.onclose = function(event) { + app.triggerEvent('close', event); + setTimeout(connectSocket, 1000); + }; + app.request = function(action, data, callback) { + var xhr = new XMLHttpRequest(); + xhr.onload = function() { + try { + var response = JSON.parse(this.response); + callback && callback(response); + } catch(e) { + Ox.print('FAIL', this.response); + } + } + xhr.open('POST', '/' + action); + data = JSON.stringify(data); + xhr.send(data); + }; + app.api = function(action, data) { + console.log('api', action, data); + app.socket.send(JSON.stringify([action, data])); + }; + Ox.$('#send').on({ + click: function() { + var value = Ox.$('#message').val(); + if (value) { + app.api('sendMessage', { + message: value + }); + } + } + }) + app.getNick = function(peer, callback) { + app.request('info', {id: peer}, function(response) { + names[peer] = response.nick; + callback(response.nick); + }); + } + Ox.$('#nick').on({ + change: function() { + app.api('nick', this.value); + } + }); +}); diff --git a/example/static/index.html b/example/static/index.html index b063a6c..3b13a1b 100644 --- a/example/static/index.html +++ b/example/static/index.html @@ -4,29 +4,21 @@ - - + + +Nick: ()
+Peers: +
+
+ +
+Message:
+
diff --git a/example/tasks.py b/example/tasks.py new file mode 100644 index 0000000..377735c --- /dev/null +++ b/example/tasks.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- +# vi:si:et:sw=4:sts=4:ts=4 + +from __future__ import division + +from Queue import Queue +from threading import Thread + +from websocket import trigger_event +import state +import link + +import logging +logger = logging.getLogger('websocket') + +class Tasks(Thread): + + _active = True + + def __init__(self): + self.q = Queue() + Thread.__init__(self) + self.daemon = True + self.start() + + def run(self): + while self._active: + m = self.q.get() + if m: + action, data = m + logger.debug('process task: %s data: %s', action, data) + if action == 'sendMessage': + for peer in state.info.get('nodes', []): + link.remote(peer, 'message', data) + elif action == 'nick': + state.info['nick'] = data + for peer in state.info.get('nodes', []): + link.remote(peer, 'nick', data) + self.q.task_done() + + def join(self): + self._active = False + self.q.put(None) + self.q.join() + return Thread.join(self) + + def queue(self, action, data=None): + self.q.put((action, data)) + diff --git a/example/utils.py b/example/utils.py new file mode 100644 index 0000000..060fa74 --- /dev/null +++ b/example/utils.py @@ -0,0 +1,14 @@ +import json +import datetime + +def _to_json(python_object): + if isinstance(python_object, datetime.datetime): + if python_object.year < 1900: + tt = python_object.timetuple() + return '%d-%02d-%02dT%02d:%02d%02dZ' % tuple(list(tt)[:6]) + return python_object.strftime('%Y-%m-%dT%H:%M:%SZ') + raise TypeError(u'%s %s is not JSON serializable' % (repr(python_object), type(python_object))) + +def json_dumps(obj): + indent = 2 + return json.dumps(obj, indent=indent, default=_to_json, ensure_ascii=False).encode('utf-8') diff --git a/example/websocket.py b/example/websocket.py new file mode 100644 index 0000000..99406f8 --- /dev/null +++ b/example/websocket.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +# vi:si:et:sw=4:sts=4:ts=4 +from __future__ import division + +from tornado.websocket import WebSocketHandler +from tornado.ioloop import IOLoop +import json + +import state +import link +from utils import json_dumps + +import logging +logger = logging.getLogger('websocket') + +class Handler(WebSocketHandler): + + def check_origin(self, origin): + # allow access to websocket from site, installer and loader (local file) + return self.request.host in origin \ + or origin == 'null' + + def open(self): + if self.request.headers['origin'] not in ('null',) \ + and self.request.host not in self.request.headers['origin']: + logger.debug('reject cross site attempt to open websocket %s', self.request) + self.close() + if self not in state.websockets: + state.websockets.append(self) + state.info.update(link.post('info')) + self.post('info', state.info) + #websocket calls + def on_message(self, message): + action, data = json.loads(message) + if state.tasks: + state.tasks.queue(action, data) + + def on_close(self): + if self in state.websockets: + state.websockets.remove(self) + + def post(self, event, data): + message = json_dumps([event, data]) + main = IOLoop.instance() + main.add_callback(lambda: self.write_message(message)) + +def trigger_event(event, data): + if len(state.websockets): + logger.debug('trigger event %s %s %s', event, data, len(state.websockets)) + for ws in state.websockets: + try: + ws.post(event, data) + except: + logger.debug('failed to send to ws %s %s %s', ws, event, data, exc_info=1)