add server

This commit is contained in:
j 2017-11-01 22:59:57 +00:00
parent 5620b9caa2
commit 8c2c090794
4 changed files with 139 additions and 118 deletions

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
json/*
__pycache__/

118
re.py
View file

@ -1,118 +0,0 @@
'''
Reccomendation Engine Example
'''
import json
import os
import random
import ox
class Pandora:
def __init__(self, url, username, password):
self.api = ox.API(url)
self.api.signin(username=username, password=password)
def find_annotations(self, query, keys):
# print('FIND ANNOTATIONS', query, keys)
return self.api.findAnnotations({
'keys': keys,
'query': query,
'range': [0, 1000000]
})['data']['items']
def find_entities(self, query, keys):
# print('FIND ENTITIES', query, keys)
return self.api.findEntities({
'keys': keys,
'query': query,
'range': [0, 1000000]
})['data']['items']
def get(self, id, keys):
# print('GET', id, keys)
return self.api.get({
'id': id,
'keys': keys
})['data']
class Engine:
def __init__(self, path):
self.path = path
self.pandora = Pandora(
url='http://pandora.dmp/api/',
username='dd.re',
password='dd.re'
)
def _shift_clips(self, clips):
index = random.randrange(len(clips))
return clips[index:] + clips[:index - 1]
def get_videos(self, user):
products = []
for event in user['events']:
if 'product' in event['data']:
products.append(event['data']['product'])
def update(self):
# Get all storylines with tags
storylines = [{
'name': entity['name'],
'tags': entity['tags']
} for entity in self.pandora.find_entities({
'conditions': [
{'key': 'type', 'operator': '==', 'value': 'storylines'},
],
'operator': '&'
}, ['id', 'name', 'tags']) if entity.get('tags', [])]
# Get list of storyline names
names = list(set([storyline['name'] for storyline in storylines]))
# Get all clips annotated with storyline references
clips = [clip for clip in self.pandora.find_annotations({
'conditions': [
{'key': 'layer', 'operator': '==', 'value': 'storylines'}
],
'operator': '&'
}, ['id', 'in', 'out', 'value']) if clip['value'] in names]
# Get list of ids for videos with clips
ids = list(set([clip['id'].split('/')[0] for clip in clips]))
# Get (and cache) order (and code + name) for each video
filename = os.path.join(self.path, 'videos.json')
if os.path.exists(filename):
with open(filename) as f:
videos_ = json.loads(f.read())
ids_ = [video['id'] for video in videos_]
else:
videos_, ids_ = [], []
videos = sorted(videos_ + [
self.pandora.get(id, ['code', 'id', 'order', 'title'])
for id in ids if not id in ids_
], key=lambda video: video['order'])
with open(filename, 'w') as f:
f.write(json.dumps(videos, indent=4, sort_keys=True))
order = {video['id']: video['order'] for video in videos}
# Sort clips
clips = sorted(
clips,
key=lambda clip: order[clip['id'].split('/')[0]] * 1000000 + clip['in']
)
# Get playlists
playlists = [playlist for playlist in [{
'name': storyline['name'],
'tags': storyline['tags'],
'clips': [
'{}_{:.3f}-{:.3f}'.format(
clip['id'].split('/')[0], clip['in'], clip['out']
) for clip in clips if clip['value'] == storyline['name']
]
} for storyline in storylines] if playlist['clips']]
with open(os.path.join(self.path, 'playlists.json'), 'w') as f:
f.write(json.dumps(playlists, indent=4, sort_keys=True))
if __name__ == '__main__':
engine = Engine('json')
engine.update()

113
server.py Executable file
View file

@ -0,0 +1,113 @@
#!/usr/bin/python3
import json
import logging
import os
import time
from urllib.parse import unquote
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.web import StaticFileHandler, Application, HTTPError
import tornado.gen
import tornado.web
from recommendation_engine import Engine
from utils import json_dumps, run_async
BANNER_PUBLIC = 'DD recommondation engine'
BANNER = '%s - use POST at / to access JSON-RPC 2.0 endpoint' % BANNER_PUBLIC
logger = logging.getLogger(__name__)
@run_async
def api_task(request, engine, callback):
try:
if request['method'] == 'getVideos':
result = engine.get_videos(request['params'])
else:
result = {}
response = {
'result': result
}
except:
logger.error('api failed: %s', request, exc_info=True)
response = {'error': {'code': -32000, 'message': 'Server error'}}
callback(response)
class RPCHandler(tornado.web.RequestHandler):
def initialize(self, engine):
self.engine = engine
def get(self):
self.write(BANNER)
@tornado.gen.coroutine
def post(self):
error = None
request = None
try:
request = json.loads(self.request.body.decode())
if request['method'] not in ('getVideos', ):
raise Exception('unknown method')
except:
error = {'error': {'code': -32700, 'message': 'Parse error'}}
if not error:
try:
response = yield tornado.gen.Task(api_task, request, self.engine)
except:
logger.error("ERROR: %s", request, exc_info=True)
error = {'error': {'code': -32000, 'message': 'Server error'}}
if error:
response = error
if request and 'id' in request:
response['id'] = request['id']
response['jsonrpc'] = '2.0'
response = json_dumps(response)
self.write(response)
def main(prefix='json/'):
settings = {
'debug': False,
'port': 8081,
'address': ''
}
engine = Engine(prefix)
handlers = [
(r'/', RPCHandler, dict(engine=engine)),
]
options = {
'debug': settings['debug'],
'gzip': True,
}
if settings['debug']:
log_format = '%(asctime)s:%(levelname)s:%(name)s:%(message)s'
logging.basicConfig(level=logging.DEBUG, format=log_format)
app = Application(handlers, **options)
app.listen(settings['port'], settings['address'])
main = IOLoop.instance()
@run_async
def update():
engine.update()
update_cb = PeriodicCallback(update, 60000)
#main.spawn_callback(update, engine)
#fixme run periodically
try:
main.start()
except:
print('shutting down...')
if __name__ == '__main__':
main()

24
utils.py Normal file
View file

@ -0,0 +1,24 @@
import json
from functools import wraps
from threading import Thread
def json_dumps(obj):
return json.dumps(obj, indent=4, default=_to_json, ensure_ascii=False, sort_keys=True).encode()
def _to_json(python_object):
if isinstance(python_object, datetime.datetime):
if python_object.year < 1900:
tt = python_object.timetuple()
return '%d-%02d-%02dT%02d:%02d%02dZ' % tuple(list(tt)[:6])
return python_object.strftime('%Y-%m-%dT%H:%M:%SZ')
raise TypeError('%s %s is not JSON serializable' % (repr(python_object), type(python_object)))
def run_async(func):
@wraps(func)
def async_func(*args, **kwargs):
func_hl = Thread(target=func, args=args, kwargs=kwargs)
func_hl.start()
return func_hl
return async_func