''' Reccomendation Engine Example ''' import json import os import random import ox with open('json/pandora.json') as f: # {"url": "...", "username": "...", "password": "..."} PANDORA = json.loads(f.read()) class Pandora: def __init__( self, url=PANDORA['url'], username=PANDORA['username'], password=PANDORA['password'] ): self.api = ox.API(url) self.api.signin(username=username, password=password) def find_annotations(self, query, keys): # print('FIND ANNOTATIONS', query, keys) return self.api.findAnnotations({ 'keys': keys, 'query': query, 'range': [0, 1000000] })['data']['items'] def find_entities(self, query, keys): # print('FIND ENTITIES', query, keys) return self.api.findEntities({ 'keys': keys, 'query': query, 'range': [0, 1000000] })['data']['items'] def get(self, id, keys): # print('GET', id, keys) return self.api.get({ 'id': id, 'keys': keys })['data'] def get_playlists(): # Get all storylines with tags storylines = [{ 'name': entity['name'], 'tags': entity['tags'] } for entity in pandora.find_entities({ 'conditions': [ {'key': 'type', 'operator': '==', 'value': 'storylines'}, ], 'operator': '&' }, ['id', 'name', 'tags']) if entity.get('tags', [])] # Get list of storyline names names = list(set([storyline['name'] for storyline in storylines])) # Get all clips annotated with storyline references clips = [clip for clip in pandora.find_annotations({ 'conditions': [ {'key': 'layer', 'operator': '==', 'value': 'storylines'} ], 'operator': '&' }, ['id', 'in', 'out', 'value']) if clip['value'] in names] # Get list of ids for videos with clips ids = list(set([clip['id'].split('/')[0] for clip in clips])) # Get (and cache) order (and code + name) for each video filename = 'json/videos.json' if os.path.exists(filename): with open(filename) as f: videos_ = json.loads(f.read()) ids_ = [video['id'] for video in videos_] else: videos_, ids_ = [], [] videos = sorted(videos_ + [ pandora.get(id, ['code', 'id', 'order', 'title']) for id in ids if not id in ids_ ], key=lambda video: video['order']) with open(filename, 'w') as f: f.write(json.dumps(videos, indent=4, sort_keys=True)) order = {video['id']: video['order'] for video in videos} print(order) # Sort clips clips = sorted( clips, key=lambda clip: order[clip['id'].split('/')[0]] * 1000000 + clip['in'] ) # Return playlists return [playlist for playlist in [{ 'name': storyline['name'], 'tags': storyline['tags'], 'clips': [ '{}_{:.3f}-{:.3f}'.format( clip['id'].split('/')[0], clip['in'], clip['out'] ) for clip in clips if clip['value'] == storyline['name'] ] } for storyline in storylines] if playlist['clips']] def get_videos(user): products = [] for event in user['events']: if 'product' in event['data']: products.append(event['data']['product']) def shift_clips(clips): index = random.randrange(len(clips)) return clips[index:] + clips[:index - 1] if __name__ == '__main__': pandora = Pandora() playlists = get_playlists() with open('playlists.json', 'w') as f: f.write(json.dumps(playlists, indent=4, sort_keys=True)) print(len(playlists), 'playlists')