This commit is contained in:
j 2014-08-28 14:52:31 +02:00
parent 536a99ed22
commit d8f5ce5675
8 changed files with 382 additions and 105 deletions

38
example/link.py Normal file
View file

@ -0,0 +1,38 @@
import json
import urllib2
PEERLINK='http://[::1]:8842/'
NAME = 'chat'
def urlopen(url, data=None, headers=None):
if data and not isinstance(data, str):
data = json.dumps(data)
if not headers:
headers = {
'Content-Type': 'application/json',
'User-Agent': 'ChatServer/0.0'
}
opener = urllib2.build_opener()
print 'urlopen', url, data
req = urllib2.Request(url, data=data, headers=headers)
response = opener.open(req)
return response.read()
def post(action, data=None):
url = PEERLINK + action
return json.loads(urlopen(url, data))
def add(name, url):
global NAME
NAME = name
return post('add', {'name': name, 'url': url})
def remote(peer, action, data):
url = PEERLINK + '%s/%s/%s' % (peer, NAME, action)
if not data:
data = {'test': True}
print 'REMOTE', url
return urlopen(url, data)
def remote_json(peer, action, data):
return json.loads(remote(peer, action, data))

204
example/server.py Normal file → Executable file
View file

@ -1,104 +1,140 @@
#!/usr/bin/python #!/usr/bin/env python
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
from __future__ import division, print_function
import json import json
import os import os
import shutil import signal
import SimpleHTTPServer
import socket
import SocketServer
import sys import sys
import urllib2 import mimetypes
peers = {} from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.web import Application
import tornado
NETBASE='http://[::1]:8842/' import websocket
NAME = 'chat' from websocket import trigger_event
import state
from tasks import Tasks
from utils import json_dumps
import link
def remote(peer, action, data): import logging
url = NETBASE + '%s/%s/%s' % (peer, NAME, action) logger = logging.getLogger('server')
if data and not isinstance(data, str):
data = json.dumps(data)
opener = urllib2.build_opener()
req = urllib2.Request(url, data=data, headers={
'Content-Type': 'application/json',
'User-Agent': 'ChatServer/0.0'
})
response = opener.open(req)
return response.read()
root_dir = os.path.normpath(os.path.abspath(os.path.dirname(__file__)))
STATIC_PATH = os.path.join(root_dir, 'static')
def add_service(name, url): class BaseHandler(tornado.web.RequestHandler):
add = NETBASE + 'add'
urllib2.urlopen(add, json.dumps({'name': name, 'url': url}))
class Handler(SimpleHTTPServer.SimpleHTTPRequestHandler): def serve_static(self, path, mimetype=None, include_body=True):
if not mimetype:
def do_GET(self): mimetype = mimetypes.guess_type(path)[0]
print 'GET', self.path logging.debug('serve %s', path)
path = os.path.join('static', self.path[1:] if self.path != '/' else 'index.html')
if os.path.exists(path): if os.path.exists(path):
self.set_header('Content-Type', mimetype)
self.set_header('Content-Length', str(os.stat(path).st_size))
if include_body:
with open(path) as fd: with open(path) as fd:
shutil.copyfileobj(fd, self.wfile) self.write(fd.read())
else:
self.set_status(404)
return
def _remote_request(self, action, data): def render_json(self, response):
response = {} response = json_dumps(response)
if action == 'message': self.set_header('Content-Type', 'application/json')
pass self.set_header('Content-Length', str(len(response)))
elif action == 'ping': self.write(response)
print self.headers self.finish()
response['userid'] = self.headers.getheader('From')
response['remote ping'] = True
response['data'] = data
return response
def _request(self, action, data): class RemoteHandler(BaseHandler):
def post(self, action):
data = json.loads(self.request.body)
response = {} response = {}
if action == 'test': print ('GOT remote request', action, data)
response['test'] = 'ok' if action == 'info':
response['data'] = data response = state.info
elif action in ('ping', 'pong'): elif action == 'message':
data['from'] = self.request.headers['From']
trigger_event(action, data)
else:
response = {'error': 'unknown action'}
return self.render_json(response)
class MainHandler(BaseHandler):
def get(self, path):
path = path[1:]
if not path:
path = 'index.html'
path = os.path.join(STATIC_PATH, path)
self.serve_static(path)
def post(self, path):
action = path.split('/')[1]
data = json.loads(self.request.body)
response = {}
if action in ('info', 'ping'):
if 'id' in data:
id = data['id'] id = data['id']
del data['id'] del data['id']
response = remote(id, action, data) response = link.remote_json(id, action, data)
return response return self.render_json(response)
def do_POST(self):
print 'POST', self.path
length = int(self.headers.getheader('content-length'))
body = self.rfile.read(length)
data = json.loads(body)
if self.path.startswith('/remote'):
action = self.path.split('/')[2]
response = self._remote_request(action, data)
else:
action = self.path.split('/')[1]
response = self._request(action, data)
response = json.dumps(response, indent=2)
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Content-Length', str(len(response)))
self.end_headers()
self.wfile.write(response)
class Server(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
'''
IPv4/IPv6 Dual Stack
'''
address_family = socket.AF_INET6
allow_reuse_address = True
def server_bind(self):
self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
SocketServer.TCPServer.server_bind(self)
if __name__ == '__main__': if __name__ == '__main__':
if len(sys.argv) == 2: address = ''
port = int(sys.argv[1])
else:
port = 8000 port = 8000
print "listening on port", port
url = 'http://127.0.0.1:%s/remote/' % port if len(sys.argv) > 1:
add_service(NAME, url) port = int(sys.argv[1])
httpd = Server(("", port), Handler) link.PEERLINK=sys.argv[2]
httpd.serve_forever()
logging.basicConfig(level=logging.DEBUG)
options = {
'debug': True,
}
handlers = [
(r'/ws', websocket.Handler),
(r"/remote/(.*)", RemoteHandler),
(r"(.*)", MainHandler),
]
http_server = HTTPServer(Application(handlers, **options))
http_server.listen(port, address)
state.tasks = Tasks()
state.main = IOLoop.instance()
state._status = PeriodicCallback(lambda: state.info.update(link.post('info')), 60000)
state._status.start()
if ':' in address:
host = '[%s]' % address
elif not address:
host = '[::1]'
else:
host = address
url = 'http://%s:%s/' % (host, port)
link.add('chat', url + 'remote/')
state.info = link.post('info')
print('listening at %s' % url)
def shutdown():
state.tasks.join()
http_server.stop()
signal.signal(signal.SIGTERM, shutdown)
try:
state.main.start()
except:
print('shutting down...')
shutdown()

4
example/state.py Normal file
View file

@ -0,0 +1,4 @@
websockets = []
tasks = None
info = {}
peers = []

90
example/static/chat.js Normal file
View file

@ -0,0 +1,90 @@
var names = {};
Ox.load(function() {
var app = window.app = {};
app.triggerEvent = function(action, data) {
if (action == 'info') {
Ox.$('#id').html(data.id);
Ox.$('#nick').val(data.nick || '');
var peers = Ox.$('#peers');
peers.empty()
data.local.forEach(function(peer) {
console.log('add', peer);
if (names[peer]) {
Ox.$('<div>')
.html(peer + '(' + names[peer] +')')
.appendTo(peers);
} else {
app.getNick(peer, function(nick) {
var html = peer;
if(nick) {
html = peer + '(' + nick +')';
}
Ox.$('<div>')
.html(html)
.appendTo(peers);
});
}
});
} else if (action == 'message') {
var html = (names[data.from] || data.from) + ': ' + data.message;
Ox.$('<div>').html(html).appendTo(Ox.$('#messages'));
}
Ox.print(action, data);
};
app.socket = new WebSocket('ws://' + document.location.host + '/ws');
app.socket.onopen = function(event) {
app.triggerEvent('open', event);
};
app.socket.onmessage = function(event) {
var data = JSON.parse(event.data);
app.triggerEvent(data[0], data[1]);
};
app.socket.onerror = function(event) {
app.triggerEvent('error', event);
app.socket.close();
};
app.socket.onclose = function(event) {
app.triggerEvent('close', event);
setTimeout(connectSocket, 1000);
};
app.request = function(action, data, callback) {
var xhr = new XMLHttpRequest();
xhr.onload = function() {
try {
var response = JSON.parse(this.response);
callback && callback(response);
} catch(e) {
Ox.print('FAIL', this.response);
}
}
xhr.open('POST', '/' + action);
data = JSON.stringify(data);
xhr.send(data);
};
app.api = function(action, data) {
console.log('api', action, data);
app.socket.send(JSON.stringify([action, data]));
};
Ox.$('#send').on({
click: function() {
var value = Ox.$('#message').val();
if (value) {
app.api('sendMessage', {
message: value
});
}
}
})
app.getNick = function(peer, callback) {
app.request('info', {id: peer}, function(response) {
names[peer] = response.nick;
callback(response.nick);
});
}
Ox.$('#nick').on({
change: function() {
app.api('nick', this.value);
}
});
});

View file

@ -4,29 +4,21 @@
<meta charset="utf-8" /> <meta charset="utf-8" />
<title></title> <title></title>
<style> <style>
</style> #messages {
<script src="https://oxjs.org/build/Ox.js"></script>
<script>
Ox.load(function() {
var app = window.app = {};
app.request = function(action, data, callback) {
data = JSON.stringify(data);
var xhr = new XMLHttpRequest();
xhr.open('POST', '/' + action, true);
xhr.onload = function() {
var response = JSON.parse(this.response);
callback(response)
};
xhr.onerror = function(error) {
callback(null, error);
} }
xhr.setRequestHeader('Content-Type', 'application/json'); </style>
xhr.send(data); <script src="http://localhost/oxjs/build/Ox.js"></script>
}; <script src="/chat.js"></script>
});
</script>
</head> </head>
<body> <body>
Nick: <input id="nick" type="text"></input>(<span id="id"></span>)<br>
Peers:
<div id="peers"></div>
<textarea id="message" type="text"></textarea><br/>
<input id="send" type="button" value="Send"></input>
<br>
Message:<br/>
<div id="messages"></div>
</body> </body>
</html> </html>

49
example/tasks.py Normal file
View file

@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
from __future__ import division
from Queue import Queue
from threading import Thread
from websocket import trigger_event
import state
import link
import logging
logger = logging.getLogger('websocket')
class Tasks(Thread):
_active = True
def __init__(self):
self.q = Queue()
Thread.__init__(self)
self.daemon = True
self.start()
def run(self):
while self._active:
m = self.q.get()
if m:
action, data = m
logger.debug('process task: %s data: %s', action, data)
if action == 'sendMessage':
for peer in state.info.get('nodes', []):
link.remote(peer, 'message', data)
elif action == 'nick':
state.info['nick'] = data
for peer in state.info.get('nodes', []):
link.remote(peer, 'nick', data)
self.q.task_done()
def join(self):
self._active = False
self.q.put(None)
self.q.join()
return Thread.join(self)
def queue(self, action, data=None):
self.q.put((action, data))

14
example/utils.py Normal file
View file

@ -0,0 +1,14 @@
import json
import datetime
def _to_json(python_object):
if isinstance(python_object, datetime.datetime):
if python_object.year < 1900:
tt = python_object.timetuple()
return '%d-%02d-%02dT%02d:%02d%02dZ' % tuple(list(tt)[:6])
return python_object.strftime('%Y-%m-%dT%H:%M:%SZ')
raise TypeError(u'%s %s is not JSON serializable' % (repr(python_object), type(python_object)))
def json_dumps(obj):
indent = 2
return json.dumps(obj, indent=indent, default=_to_json, ensure_ascii=False).encode('utf-8')

54
example/websocket.py Normal file
View file

@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
from __future__ import division
from tornado.websocket import WebSocketHandler
from tornado.ioloop import IOLoop
import json
import state
import link
from utils import json_dumps
import logging
logger = logging.getLogger('websocket')
class Handler(WebSocketHandler):
def check_origin(self, origin):
# allow access to websocket from site, installer and loader (local file)
return self.request.host in origin \
or origin == 'null'
def open(self):
if self.request.headers['origin'] not in ('null',) \
and self.request.host not in self.request.headers['origin']:
logger.debug('reject cross site attempt to open websocket %s', self.request)
self.close()
if self not in state.websockets:
state.websockets.append(self)
state.info.update(link.post('info'))
self.post('info', state.info)
#websocket calls
def on_message(self, message):
action, data = json.loads(message)
if state.tasks:
state.tasks.queue(action, data)
def on_close(self):
if self in state.websockets:
state.websockets.remove(self)
def post(self, event, data):
message = json_dumps([event, data])
main = IOLoop.instance()
main.add_callback(lambda: self.write_message(message))
def trigger_event(event, data):
if len(state.websockets):
logger.debug('trigger event %s %s %s', event, data, len(state.websockets))
for ws in state.websockets:
try:
ws.post(event, data)
except:
logger.debug('failed to send to ws %s %s %s', ws, event, data, exc_info=1)