From 8c2c090794b928218c8464821c42e23c921fb9c6 Mon Sep 17 00:00:00 2001 From: j Date: Wed, 1 Nov 2017 22:59:57 +0000 Subject: [PATCH] add server --- .gitignore | 2 + re.py | 118 ----------------------------------------------------- server.py | 113 ++++++++++++++++++++++++++++++++++++++++++++++++++ utils.py | 24 +++++++++++ 4 files changed, 139 insertions(+), 118 deletions(-) create mode 100644 .gitignore delete mode 100644 re.py create mode 100755 server.py create mode 100644 utils.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6b9f39e --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +json/* +__pycache__/ diff --git a/re.py b/re.py deleted file mode 100644 index 1dff752..0000000 --- a/re.py +++ /dev/null @@ -1,118 +0,0 @@ -''' -Reccomendation Engine Example -''' - -import json -import os -import random - -import ox - -class Pandora: - - def __init__(self, url, username, password): - self.api = ox.API(url) - self.api.signin(username=username, password=password) - - def find_annotations(self, query, keys): - # print('FIND ANNOTATIONS', query, keys) - return self.api.findAnnotations({ - 'keys': keys, - 'query': query, - 'range': [0, 1000000] - })['data']['items'] - - def find_entities(self, query, keys): - # print('FIND ENTITIES', query, keys) - return self.api.findEntities({ - 'keys': keys, - 'query': query, - 'range': [0, 1000000] - })['data']['items'] - - def get(self, id, keys): - # print('GET', id, keys) - return self.api.get({ - 'id': id, - 'keys': keys - })['data'] - -class Engine: - - def __init__(self, path): - self.path = path - self.pandora = Pandora( - url='http://pandora.dmp/api/', - username='dd.re', - password='dd.re' - ) - - def _shift_clips(self, clips): - index = random.randrange(len(clips)) - return clips[index:] + clips[:index - 1] - - def get_videos(self, user): - products = [] - for event in user['events']: - if 'product' in event['data']: - products.append(event['data']['product']) - - def update(self): - # Get all 
storylines with tags - storylines = [{ - 'name': entity['name'], - 'tags': entity['tags'] - } for entity in self.pandora.find_entities({ - 'conditions': [ - {'key': 'type', 'operator': '==', 'value': 'storylines'}, - ], - 'operator': '&' - }, ['id', 'name', 'tags']) if entity.get('tags', [])] - # Get list of storyline names - names = list(set([storyline['name'] for storyline in storylines])) - # Get all clips annotated with storyline references - clips = [clip for clip in self.pandora.find_annotations({ - 'conditions': [ - {'key': 'layer', 'operator': '==', 'value': 'storylines'} - ], - 'operator': '&' - }, ['id', 'in', 'out', 'value']) if clip['value'] in names] - # Get list of ids for videos with clips - ids = list(set([clip['id'].split('/')[0] for clip in clips])) - # Get (and cache) order (and code + name) for each video - filename = os.path.join(self.path, 'videos.json') - if os.path.exists(filename): - with open(filename) as f: - videos_ = json.loads(f.read()) - ids_ = [video['id'] for video in videos_] - else: - videos_, ids_ = [], [] - videos = sorted(videos_ + [ - self.pandora.get(id, ['code', 'id', 'order', 'title']) - for id in ids if not id in ids_ - ], key=lambda video: video['order']) - with open(filename, 'w') as f: - f.write(json.dumps(videos, indent=4, sort_keys=True)) - order = {video['id']: video['order'] for video in videos} - # Sort clips - clips = sorted( - clips, - key=lambda clip: order[clip['id'].split('/')[0]] * 1000000 + clip['in'] - ) - # Get playlists - playlists = [playlist for playlist in [{ - 'name': storyline['name'], - 'tags': storyline['tags'], - 'clips': [ - '{}_{:.3f}-{:.3f}'.format( - clip['id'].split('/')[0], clip['in'], clip['out'] - ) for clip in clips if clip['value'] == storyline['name'] - ] - } for storyline in storylines] if playlist['clips']] - with open(os.path.join(self.path, 'playlists.json'), 'w') as f: - f.write(json.dumps(playlists, indent=4, sort_keys=True)) - -if __name__ == '__main__': - engine = Engine('json') 
@run_async
def api_task(request, engine, callback):
    """Execute one already-validated JSON-RPC request and report the outcome.

    ``run_async`` (utils.py) starts this function in a new ``Thread``, so the
    engine call does not run on the thread that invoked ``api_task``.

    Args:
        request: Decoded JSON-RPC request dict; ``request['method']`` selects
            the engine call and ``request['params']`` is passed through to it.
        engine: Object exposing ``get_videos(params)``.
        callback: Called exactly once with the response dict, which holds
            either a ``'result'`` or an ``'error'`` member.
    """
    try:
        if request['method'] == 'getVideos':
            result = engine.get_videos(request['params'])
        else:
            result = {}
        response = {'result': result}
    except Exception:
        # Any engine failure (including a missing 'params' key) is logged with
        # its traceback and reported to the client as a generic server error.
        logger.error('api failed: %s', request, exc_info=True)
        response = {'error': {'code': -32000, 'message': 'Server error'}}
    # NOTE(review): this runs on the worker thread started by run_async;
    # confirm that the callback supplied by tornado.gen.Task may be invoked
    # from outside the IOLoop thread.
    callback(response)


class RPCHandler(tornado.web.RequestHandler):
    """HTTP endpoint: GET returns a banner, POST serves JSON-RPC 2.0 calls."""

    # Methods accepted by post(); anything else is answered with -32601.
    METHODS = ('getVideos',)

    def initialize(self, engine):
        """Store the engine passed in through the URL spec's kwargs."""
        self.engine = engine

    def get(self):
        """Answer plain GET requests with the usage banner."""
        self.write(BANNER)

    @staticmethod
    def _error(code, message):
        """Build a JSON-RPC error response body."""
        return {'error': {'code': code, 'message': message}}

    def _parse_request(self):
        """Decode and validate the request body.

        Returns:
            A ``(request, error)`` pair. ``error`` is ``None`` when the
            request may be dispatched; otherwise it is the error response to
            send. ``request`` is ``None`` when the body could not be decoded.
        """
        try:
            # UnicodeDecodeError and json.JSONDecodeError are both ValueError
            # subclasses, so this covers undecodable and malformed bodies.
            request = json.loads(self.request.body.decode())
        except ValueError:
            return None, self._error(-32700, 'Parse error')
        if not isinstance(request, dict) or not isinstance(request.get('method'), str):
            # Valid JSON that is not a request object (lists, i.e. batch
            # calls, are not supported by this handler).
            return request, self._error(-32600, 'Invalid Request')
        if request['method'] not in self.METHODS:
            return request, self._error(-32601, 'Method not found')
        return request, None

    @tornado.gen.coroutine
    def post(self):
        """Handle one JSON-RPC 2.0 call and write the JSON response."""
        request, response = self._parse_request()
        if response is None:
            try:
                # NOTE(review): tornado.gen.Task is unavailable in Tornado 6+;
                # confirm the Tornado version this server is deployed with.
                response = yield tornado.gen.Task(api_task, request, self.engine)
            except Exception:
                logger.error("ERROR: %s", request, exc_info=True)
                response = self._error(-32000, 'Server error')
        # Only a request object can carry an id; the isinstance check keeps a
        # payload such as ["id"] from raising on request['id'].
        if isinstance(request, dict) and 'id' in request:
            response['id'] = request['id']
        response['jsonrpc'] = '2.0'
        # json_dumps returns bytes, for which Tornado would otherwise keep its
        # default text/html content type.
        self.set_header('Content-Type', 'application/json; charset=UTF-8')
        self.write(json_dumps(response))
def json_dumps(obj):
    """Serialize *obj* to indented, key-sorted JSON and return it as bytes.

    Non-ASCII characters are written as-is (``ensure_ascii=False``) and the
    text is encoded with ``str.encode()``'s default codec. Values the ``json``
    module cannot handle natively are passed to ``_to_json``.

    Raises:
        TypeError: If *obj* contains a value ``_to_json`` does not support.
    """
    text = json.dumps(obj, indent=4, default=_to_json, ensure_ascii=False, sort_keys=True)
    return text.encode()


def _to_json(python_object):
    """``json.dumps`` ``default`` hook: render datetimes as ISO-8601 strings.

    Args:
        python_object: The value ``json.dumps`` could not serialize.

    Returns:
        ``'YYYY-MM-DDTHH:MM:SSZ'`` for ``datetime.datetime`` instances. The
        fields are emitted as stored with a literal ``Z`` suffix; no timezone
        conversion is performed.

    Raises:
        TypeError: For every other type.
    """
    # Imported here because the module-level imports never brought datetime
    # into scope, which made every call to this hook fail with NameError.
    import datetime

    if isinstance(python_object, datetime.datetime):
        # Formatting the fields directly avoids strftime, which the original
        # sidestepped for years before 1900, and yields the same text as
        # '%Y-%m-%dT%H:%M:%SZ' for four-digit years. The colon between minutes
        # and seconds was missing from the original pre-1900 format string.
        return '%d-%02d-%02dT%02d:%02d:%02dZ' % (
            python_object.year,
            python_object.month,
            python_object.day,
            python_object.hour,
            python_object.minute,
            python_object.second,
        )
    raise TypeError('%s %s is not JSON serializable' % (repr(python_object), type(python_object)))


def run_async(func):
    """Decorator: run *func* in a new thread each time it is called.

    The wrapped function starts a ``threading.Thread`` targeting *func* with
    the given arguments and returns that ``Thread`` immediately; *func*'s own
    return value is not available to the caller.
    """
    @wraps(func)
    def async_func(*args, **kwargs):
        worker = Thread(target=func, args=args, kwargs=kwargs)
        worker.start()
        return worker

    return async_func