dd-re/recommendation_engine.py

458 lines
20 KiB
Python

'''
Recommendation Engine Example
1 Nov 2017, 0x2620
'''
from collections import defaultdict
import json
import logging
import os
import random
import time
import copy
import ox
from utils import run_async
# Module-level logger; Engine.pandora uses it to report failed connection attempts.
logger = logging.getLogger(__name__)
# NOTE(review): not read anywhere in this file — possibly used by importers; confirm before removing.
verbose = True
class Engine:
    """Recommend pan.do/ra storyline playlists for a grid of video cells.

    Playlists are built from pan.do/ra by ``update`` and cached in
    ``<path>/playlists.json``; engine settings (channel sizes, global keyword
    sliders, grid-change pool sizes) are kept in ``<path>/state.json``.
    """

    # Number of cells in the grid view (used by get_recommendations/get_next).
    GRID_SIZE = 16
    # Lazily created Pandora client, see the ``pandora`` property.
    _pandora = None

    def __init__(self, path, **kwargs):
        """Load cached playlists and state from the directory *path*.

        Optional keyword arguments ``pandora`` (API URL), ``username`` and
        ``password`` override the default pan.do/ra connection settings. The
        connection itself is only opened on first use of ``self.pandora``.
        """
        self.path = path
        self.pandora_args = dict(
            url=kwargs.get('pandora', 'http://pandora.dmp/api/'),
            username=kwargs.get('username', 'dd.re'),
            password=kwargs.get('password', 'dd.re')
        )
        filename = os.path.join(self.path, 'playlists.json')
        if os.path.exists(filename):
            with open(filename) as f:
                self.playlists = json.load(f)
        else:
            self.playlists = []
        filename = os.path.join(self.path, 'state.json')
        if os.path.exists(filename):
            # save_state writes non-ASCII text as UTF-8, so read it back the same way.
            with open(filename, encoding='utf-8') as f:
                self.state = json.load(f)
        else:
            # NOTE(review): the 'screenings' channel is not read anywhere in this class.
            self.state = {
                'channels': {
                    'globalKeywords': {'locked': False, 'value': 7},
                    'userKeywords': {'locked': False, 'value': 7},
                    'screenings': {'locked': True, 'value': 2}
                },
                'globalKeywords': {},
            }
        # Add default grid-change pool sizes when the loaded state lacks them.
        if 'gridChange' not in self.state:
            self.state['gridChange'] = {
                'nextClip': {'locked': True, 'value': 4},
                'nextPlaylist': {'locked': False, 'value': 4},
                'staySame': {'locked': False, 'value': 8}
            }
        self.update_keywords()

    @property
    def pandora(self):
        """Return the Pandora client, connecting on first use and retrying every 10 s."""
        while not self._pandora:
            try:
                self._pandora = Pandora(**self.pandora_args)
            except Exception:
                # Not a bare `except:` so Ctrl-C / SystemExit can still stop the retry loop.
                logger.error('failed to connect to pandora, retry in 10 seconds')
                time.sleep(10)
        return self._pandora

    def _patch_clips(self, clips):
        """Set each clip's ``out`` to the ``in`` of the next clip of the same video.

        The last clip of every video is extended to the video's ``duration``
        (fetched from pan.do/ra). *clips* is modified in place and returned.
        """
        inpoints = defaultdict(list)
        for index, clip in enumerate(clips):
            video_id = clip['id'].split('/')[0]
            inpoints[video_id].append({'index': index, 'position': clip['in']})
        for video_id, points in inpoints.items():
            # The "next" in-point must come from the sorted list, not the insertion order.
            points = sorted(points, key=lambda inpoint: inpoint['position'])
            for current, following in zip(points, points[1:]):
                clips[current['index']]['out'] = following['position']
            clips[points[-1]['index']]['out'] = self.pandora.get(video_id, ['duration'])['duration']
        return clips

    def _find_playlist(self, name):
        """Return the first cached playlist called *name*, or ``None``."""
        if name is None:
            return None
        return next((p for p in self.playlists if p.get('name') == name), None)

    def get_videos(self, user):
        """Return the next grid of playlists for *user*.

        On login, or when not every grid cell has a recorded ``grid`` event,
        a completely new grid from ``get_recommendations`` is returned.
        Otherwise the cells are shuffled into three pools sized by
        ``state['gridChange']``:

        * ``nextClip`` cells continue their playlist at the next clip (falling
          back to a new playlist when the playlist is unknown or has one clip);
        * ``nextPlaylist`` cells get a newly recommended playlist that is not
          already on the grid;
        * ``staySame`` cells are returned as ``{}``. The cell of the most
          recent video ``play`` event is placed last in the shuffle, so it
          lands in this pool whenever the pool is non-empty.

        The returned list is ordered by grid index.
        """
        # The first grid event seen per index is taken as that cell's current
        # state, i.e. events appear to be ordered newest first.
        # NOTE(review): confirm the ordering with the event producer.
        events = user.get('events') or []
        if events and events[0].get('event') == 'login':
            return self.get_recommendations(user)

        grid_change = {k: v.get('value', 0) for k, v in self.state['gridChange'].items()}
        n_next_clip = grid_change.get('nextClip', 0)
        n_next_playlist = grid_change.get('nextPlaylist', 0)
        n_stay_same = grid_change.get('staySame', 0)
        # Assumes the pool sizes add up to the number of grid cells (16); the
        # front end is expected to enforce that.
        video_num = n_next_clip + n_next_playlist + n_stay_same

        grid_events = {}
        play_index = None
        for event in events:
            kind = event.get('event')
            data = event.get('data') or {}
            if kind == 'grid' and 'index' in data and data['index'] not in grid_events:
                grid_events[data['index']] = data
            elif kind == 'play' and data.get('type') == 'video' and play_index is None:
                # `is None`, not truthiness: a play in cell 0 must not be
                # overwritten by older play events.
                play_index = data.get('index')
            if len(grid_events) == video_num and play_index is not None:
                break
        prev_grid_list = sorted(grid_events.values(), key=lambda data: data['index'])
        # Not every cell has a grid event yet: start with a fresh grid.
        if len(prev_grid_list) < video_num:
            return self.get_recommendations(user)

        if play_index is None or not 0 <= play_index < video_num:
            cells = list(range(video_num))
            random.shuffle(cells)
        else:
            # Keep the played cell out of the shuffle and put it last (staySame pool).
            cells = [i for i in range(video_num) if i != play_index]
            random.shuffle(cells)
            cells.append(play_index)
        next_clip_index = cells[:n_next_clip]
        next_playlist_index = cells[n_next_clip:n_next_clip + n_next_playlist]
        stay_same_index = cells[n_next_clip + n_next_playlist:]

        rec_list = []
        for i in next_clip_index:
            cell = prev_grid_list[i]
            playlist = self._find_playlist(cell.get('playlist'))
            # Move the cell to the nextPlaylist pool when its grid event has no
            # "playlist" (older events), the playlist no longer exists, or there
            # is no other clip to advance to.
            if playlist is None or len(playlist['clips']) <= 1:
                next_playlist_index.append(i)
                continue
            # Advance to the next clip, wrapping to the first clip at the end.
            # TODO(review): should the end of a playlist switch to a new playlist instead?
            position = cell.get('playlistPosition', 0) + 1
            if position >= len(playlist['clips']):
                position = 0
            rec_list.append((i, {
                'clips': playlist['clips'],
                'position': position,
                'name': playlist['name'],
                'tags': playlist['tags'],
            }))

        # New playlists for the nextPlaylist pool, excluding every playlist on the grid.
        vids_exclude = [cell['playlist'] for cell in prev_grid_list if cell.get('playlist') is not None]
        videos = self.get_recommendations(user, vids_exclude)
        for i in next_playlist_index:
            # With a small catalogue there may be fewer recommendations than
            # cells; keep such a cell unchanged instead of raising IndexError.
            rec_list.append((i, videos[i] if i < len(videos) else {}))

        rec_list += [(i, {}) for i in stay_same_index]
        rec_list.sort(key=lambda item: item[0])
        return [rec for _, rec in rec_list]

    @staticmethod
    def _rank(playlists, tag_weight):
        """Return *playlists* best first, scored by ``random.random() + tag_weight(tags)``.

        The random term breaks ties and adds variety between calls.
        """
        score = {}
        for playlist in playlists:
            score[playlist['name']] = random.random() + tag_weight(playlist['tags'])
        return sorted(playlists, key=lambda playlist: -score[playlist['name']])

    def get_recommendations(self, user, vids_exclude=None):
        """Return up to ``GRID_SIZE`` shuffled playlist recommendations for *user*.

        Playlists are picked in three rounds, each taking the highest scoring
        of the remaining playlists:

        1. ``channels['userKeywords']`` playlists by the user's keyword weights;
        2. ``channels['globalKeywords']`` playlists by the global keyword sliders;
        3. the rest of the grid by how often the user saw each product named
           in the playlist tags.

        Playlists whose name is in *vids_exclude* are never returned (currently
        the same playlist is not shown twice on one grid). Each entry is a dict
        with ``clips``, a random start ``position``, ``name`` and ``tags``.
        """
        exclude = set(vids_exclude or ())
        channels = {k: v.get('value', 0) for k, v in self.state['channels'].items()}
        sliders = {k: v.get('value', 0) for k, v in self.state['globalKeywords'].items()}
        n_user = channels['userKeywords']
        n_global = channels['globalKeywords']
        # Filter with a comprehension: removing from a list while iterating it
        # skips elements. Deep copies keep the cache safe from caller mutation.
        playlists = copy.deepcopy([p for p in self.playlists if p['name'] not in exclude])

        # 1. User keyword weights.
        user_keywords = user.get('keywords', {})
        playlists = self._rank(playlists, lambda tags: sum(
            user_keywords[tag] for tag in tags if tag in user_keywords))
        videos = playlists[:n_user]
        playlists = playlists[n_user:]

        # 2. Global keyword sliders.
        playlists = self._rank(playlists, lambda tags: sum(
            sliders[tag] for tag in tags if tag in sliders))
        videos += playlists[:n_global]
        playlists = playlists[n_global:]

        # 3. Products the user has seen (counted over all events).
        seen = defaultdict(int)
        for event in user.get('events') or []:
            product = (event.get('data') or {}).get('product')
            if product:
                seen[product] += 1
        playlists = self._rank(playlists, lambda tags: sum(
            seen[tag] for tag in set(tags) & set(seen)))
        # Clamp at 0: a negative slice would take "all but the last k".
        videos += playlists[:max(self.GRID_SIZE - n_user - n_global, 0)]

        # Randomise the layout and each playlist's starting clip.
        random.shuffle(videos)
        return [{
            'clips': video['clips'],
            'position': random.randrange(len(video['clips'])),
            'name': video['name'],
            'tags': video['tags'],
        } for video in videos]

    @staticmethod
    def update_user_playlists(playlists, user, watch_cutoff=0.8):
        """Trim *playlists* according to what *user* watched in full screen.

        Events are replayed oldest first (the list is reversed) and each video
        ``play`` is paired with the next video ``pause``. A pair is trusted
        only if both events name the same playlist and ``playlistPosition``,
        the pause is after the play by less than 3 hours, and the pair lies
        within the clip's (in, out) range with a 30 second margin. Then:

        * if the pause position reaches ``watch_cutoff`` (0-1) of the clip, the
          clip counts as watched and is removed from the playlist;
        * otherwise the clip's ``in`` is moved to the pause position.

        Playlists whose clips were all watched are removed. *playlists* is
        modified in place and returned. The function takes no ``self``; as a
        static method it can be called on the class or on an instance.
        """
        # Arbitrary upper bound on (pause - play) to reject bogus pairs; the
        # longest clip noted was ~10379 s ("DDLaunch: Erik Verlinde, Gravity
        # as an emergent force (1956)").
        clip_max_dur = 10800
        play = {}
        watched = []
        for event in (user.get('events') or [])[::-1]:
            kind = event.get('event')
            data = event.get('data') or {}
            if kind == 'play' and data.get('type') == 'video':
                play = event
                continue
            if kind != 'pause' or not play or data.get('type') != 'video':
                continue
            start = play['data']
            # A pause always closes the current (play, pause) pair.
            play = {}
            if 'position' not in start or 'position' not in data:
                # Skip the malformed pair (a `break` here used to drop every later event).
                continue
            if start.get('playlist') != data.get('playlist'):
                continue
            clip_index = data.get('playlistPosition')
            if clip_index is None or start.get('playlistPosition') != clip_index:
                continue
            if not 0 < data['position'] - start['position'] < clip_max_dur:
                continue
            for playlist in playlists:
                if playlist['name'] == data.get('playlist') and 0 <= clip_index < len(playlist['clips']):
                    clip = playlist['clips'][clip_index]
                    # Only trust pairs inside the clip's (in, out) segment +/- 30 s:
                    # in/out may have been edited since, or a skip may have
                    # jumped to the wrong clip.
                    if start['position'] >= max(clip['in'] - 30, 0) and data['position'] <= clip['out'] + 30:
                        cutoff = clip['in'] + (clip['out'] - clip['in']) * watch_cutoff
                        if data['position'] >= cutoff:
                            watched.append((playlist['name'], clip_index))
                        else:
                            # Resume the clip from where the user paused.
                            clip['in'] = data['position']
                    break
        watched_by_playlist = defaultdict(set)
        for name, index in watched:
            watched_by_playlist[name].add(index)
        for name, indexes in watched_by_playlist.items():
            for playlist in playlists:
                if playlist['name'] == name:
                    if len(indexes) == len(playlist['clips']):
                        playlists.remove(playlist)
                    else:
                        playlist['clips'] = [
                            clip for i, clip in enumerate(playlist['clips']) if i not in indexes
                        ]
                    # Safe: we stop iterating right after removing.
                    break
        return playlists

    def get_next(self, user, position):
        """Return one new recommendation for grid cell *position*.

        Playlists named in the most recent grid event of every cell (up to
        ``GRID_SIZE`` cells) are excluded.
        """
        grid_events = {}
        for event in user.get('events') or []:
            data = event.get('data') or {}
            if event.get('event') == 'grid' and 'index' in data and data['index'] not in grid_events:
                grid_events[data['index']] = data
                if len(grid_events) == self.GRID_SIZE:
                    break
        vids_exclude = [data['playlist'] for data in grid_events.values() if data.get('playlist') is not None]
        return self.get_recommendations(user, vids_exclude)[position]

    def update_state(self, data):
        """Merge *data* into the state, save it and return the new state.

        Existing top-level entries are updated key by key with ``dict.update``;
        unknown keys are added as-is.
        """
        for key in data:
            if key in self.state:
                self.state[key].update(data[key])
            else:
                self.state[key] = data[key]
        self.save_state()
        return self.state

    def save_state(self):
        """Write the state to ``<path>/state.json`` as UTF-8 JSON."""
        filename = os.path.join(self.path, 'state.json')
        # Explicit encoding: ensure_ascii=False emits non-ASCII characters.
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.state, f, indent=4, ensure_ascii=False, sort_keys=True)

    def update(self):
        """Rebuild ``self.playlists`` from pan.do/ra and refresh the caches.

        One playlist is created per storyline entity with tags and a
        ``nodename``. Its clips are the ``storylines``-layer annotations whose
        value is the storyline name, restricted to items on the ``dau:DD``
        list and sorted by video ``order`` then in-point. Video metadata is
        cached in ``videos.json`` (only unknown ids are fetched), playlists
        with clips are written to ``playlists.json``, and the global keyword
        sliders are re-synced.
        """
        storylines = [{
            'id': entity['id'],
            'name': entity['name'],
            'nodename': entity['nodename'],
            'tags': [t.strip() for t in entity['tags']]
        } for entity in self.pandora.find_entities({
            'conditions': [
                {'key': 'type', 'operator': '==', 'value': 'storylines'},
            ],
            'operator': '&'
        }, ['id', 'name', 'tags', 'nodename']) if entity.get('tags', []) and entity.get('nodename')]
        # Sets: used for membership tests against every annotation below.
        names = {storyline['name'] for storyline in storylines}
        items = {item['id'] for item in self.pandora.find({
            'conditions': [
                {'key': 'list', 'operator': '==', 'value': 'dau:DD'}
            ]
        }, ['id'])}
        clips = [clip for clip in self.pandora.find_annotations({
            'conditions': [
                {'key': 'layer', 'operator': '==', 'value': 'storylines'}
            ],
            'operator': '&'
        }, ['id', 'in', 'out', 'value']) if clip['value'] in names and clip['id'].split('/')[0] in items]
        ids = {clip['id'].split('/')[0] for clip in clips}

        # Get and cache video data; only ids missing from the cache are fetched.
        filename = os.path.join(self.path, 'videos.json')
        if os.path.exists(filename):
            with open(filename) as f:
                cached_videos = json.load(f)
        else:
            cached_videos = []
        cached_ids = {video['id'] for video in cached_videos}
        videos = sorted(cached_videos + [
            self.pandora.get(video_id, ['code', 'id', 'order', 'title'])
            for video_id in ids if video_id not in cached_ids
        ], key=lambda video: int(video['order']))
        with open(filename, 'w') as f:
            f.write(json.dumps(videos, indent=4, sort_keys=True))

        order = {video['id']: int(video['order']) for video in videos}
        clips = sorted(
            clips,
            key=lambda clip: (order[clip['id'].split('/')[0]], clip['in'])
        )
        # Get and cache playlists (storylines without clips are dropped).
        self.playlists = [playlist for playlist in [{
            'id': storyline['id'],
            'name': storyline['nodename'].strip(),
            'tags': storyline['tags'],
            'clips': [{
                'item': clip['id'].split('/')[0],
                'id': clip['id'],
                'in': clip['in'],
                'out': clip['out']
            } for clip in clips if clip['value'] == storyline['name']]
        } for storyline in storylines] if playlist['clips']]
        with open(os.path.join(self.path, 'playlists.json'), 'w') as f:
            f.write(json.dumps(self.playlists, indent=4, sort_keys=True))
        self.update_keywords()

    def update_keywords(self):
        """Sync ``state['globalKeywords']`` with the playlist tags.

        Every non-empty tag that is not all-uppercase gets a slider (value 0
        when new); sliders for tags no longer present are removed. The state
        is saved only when something changed.
        """
        changed = False
        if 'globalKeywords' not in self.state:
            self.state['globalKeywords'] = {}
            changed = True
        keywords = self.state['globalKeywords']
        existing_tags = set()
        for playlist in self.playlists:
            for tag in playlist.get('tags', []):
                # Skip empty tags too: they used to be added and immediately
                # deleted again, forcing a save on every call.
                if not tag or tag.isupper():
                    continue
                existing_tags.add(tag)
                if tag not in keywords:
                    keywords[tag] = {'value': 0}
                    changed = True
        for tag in set(keywords) - existing_tags:
            del keywords[tag]
            changed = True
        if changed:
            self.save_state()

    @run_async
    def update_async(self):
        """Run ``update`` via the ``run_async`` decorator (defined in ``utils``)."""
        self.update()
class Pandora:
    """Minimal pan.do/ra API client built on ``ox.API``.

    The constructor signs in; the ``find*`` helpers return the ``items`` of a
    single large page, and ``get`` returns the ``data`` of one item.
    """

    def __init__(self, url, username, password):
        self.api = ox.API(url)
        self.api.signin(username=username, password=password)

    def _query(self, method, query, keys):
        """Call the find-style API *method* for *query* and return its items."""
        payload = {
            'keys': keys,
            'query': query,
            # Ask for (practically) everything in one page.
            'range': [0, 1000000],
        }
        response = getattr(self.api, method)(payload)
        return response['data']['items']

    def find(self, query, keys):
        """Return items matching *query* with the requested *keys*."""
        return self._query('find', query, keys)

    def find_annotations(self, query, keys):
        """Return annotations matching *query* with the requested *keys*."""
        return self._query('findAnnotations', query, keys)

    def find_entities(self, query, keys):
        """Return entities matching *query* with the requested *keys*."""
        return self._query('findEntities', query, keys)

    def get(self, id, keys):
        """Return the requested *keys* of the item with the given *id*."""
        response = self.api.get({'id': id, 'keys': keys})
        return response['data']
if __name__ == '__main__':
    # Script entry point: rebuild the playlist cache in ./json from pan.do/ra.
    Engine('json').update()