185 lines
6.2 KiB
Python
185 lines
6.2 KiB
Python
import logging
|
|
import json
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth import get_user_model
|
|
from django.db import models
|
|
import ox
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
# User model class as returned by Django's ``get_user_model()``.
# NOTE(review): not referenced by the code visible in this file section.
User = get_user_model()
|
|
|
|
# Layer name used when a Text's ``data`` has no (or an empty) 'layer' entry.
DEFAULT_LAYER = 'keywords'
|
|
|
|
class Page(models.Model):
    """A content page identified by an optional, unique slug.

    A page with an empty slug resolves to the location given by
    ``settings.URL_PREFIX`` itself (see ``get_absolute_url``).
    """

    # Timestamps maintained automatically by Django on insert / update.
    created = models.DateTimeField(auto_now_add=True)
    modified = models.DateTimeField(auto_now=True)

    # The slug may be left blank, but values must be unique across pages.
    slug = models.SlugField(blank=True, unique=True)
    public = models.BooleanField(default=False)

    title = models.TextField(blank=True)
    body = models.TextField(blank=True)
    # Free-form JSON payload; defaults to an empty dict.
    data = models.JSONField(default=dict, blank=True)

    def __str__(self):
        """Return the title followed by the slug in parentheses."""
        return f'{self.title} ({self.slug})'

    def get_absolute_url(self):
        """Return the site-relative URL of this page.

        With a slug the result is ``'/' + URL_PREFIX + slug``. Without one,
        the last character of ``URL_PREFIX`` is dropped (presumably a
        trailing slash -- confirm against the project settings).
        """
        prefix = settings.URL_PREFIX
        if not self.slug:
            return '/' + prefix[:-1]
        return '/' + prefix + self.slug
|
|
|
|
|
|
# (stored value, display label) pairs used as ``choices`` for Text.language;
# each label is simply the upper-cased language code.
LANGUAGE_CHOICES = tuple((code, code.upper()) for code in ('en', 'zh'))
|
|
|
|
class Text(models.Model):
    """A text whose annotations are pulled from a remote API on every save.

    ``data`` drives where annotations come from:

    * ``data['item']`` -- annotations of a single item, or
    * ``data['edit']`` -- annotations of all clips of an edit.

    Optional ``data`` entries read by this class: ``layer`` (falls back to
    ``DEFAULT_LAYER``), ``user`` (keep only that user's annotations),
    ``only_e`` (keep only values starting with ``'E:'``, prefix removed;
    otherwise those values are excluded) and ``related`` (used by ``json``).
    """

    created = models.DateTimeField(auto_now_add=True)
    modified = models.DateTimeField(auto_now=True)
    language = models.CharField(choices=LANGUAGE_CHOICES, max_length=8, default='en')
    slug = models.SlugField()
    public = models.BooleanField(default=False)
    position = models.IntegerField(default=0)

    title = models.TextField()
    byline = models.TextField(default="", blank=True)
    body = models.TextField(default="", blank=True)

    data = models.JSONField(default=dict)
    # Refreshed by ``save()``; not editable through forms/admin.
    annotations = models.JSONField(default=dict, blank=True, editable=False)

    def __str__(self):
        """Return the title of the text."""
        return self.title

    def get_absolute_url(self):
        """Return the site-relative URL below the ``assemblies/`` path."""
        return '/' + settings.URL_PREFIX + 'assemblies/' + self.slug

    def get_annotations(self):
        """Fetch the annotations selected by ``self.data``.

        Returns:
            The list produced by ``get_item_annotations`` when ``data`` has
            an ``'item'`` key, by ``get_edit_annotations`` when it has an
            ``'edit'`` key, and ``[]`` otherwise.
        """
        has_item = 'item' in self.data
        if not has_item and 'edit' not in self.data:
            # Nothing to fetch: do not sign in to the remote API at all.
            return []
        api = ox.api.signin(settings.DEFAULT_PANDORA_API)
        if has_item:
            return self.get_item_annotations(api)
        return self.get_edit_annotations(api)

    def _select_annotations(self, annotations):
        """Apply the ``user`` and ``only_e`` settings from ``self.data``.

        Shared by the item and edit code paths so both filter identically.
        A missing or empty ``user`` means "no user filter".

        Returns:
            A new list. With ``only_e`` set, the kept annotations have the
            leading ``'E:'`` removed from ``value`` (modified in place).
        """
        user = self.data.get('user')
        if user:
            annotations = [a for a in annotations if a['user'] == user]
        if self.data.get('only_e'):
            selected = [a for a in annotations if a['value'].startswith('E:')]
            for annotation in selected:
                annotation['value'] = annotation['value'][2:].strip()
            return selected
        return [a for a in annotations if not a['value'].startswith('E:')]

    def get_item_annotations(self, api):
        """Return the filtered, colored annotations of ``data['item']``.

        Args:
            api: signed-in API object as returned by ``ox.api.signin``.
        """
        layer = self.data.get('layer') or DEFAULT_LAYER
        response = api.get(id=self.data['item'], keys=['layers'])
        annotations = response['data']['layers'][layer]
        return self.get_clips(api, self._select_annotations(annotations))

    def get_edit_annotations(self, api):
        """Return the filtered, colored annotations of ``data['edit']``.

        Side effect: stores one ``{'title', 'id'[, 'director']}`` entry per
        item of the edit in ``self.data['cited']``.

        Args:
            api: signed-in API object as returned by ``ox.api.signin``.

        Returns:
            ``[]`` when the response has no ``'data'`` key, otherwise the
            annotation list processed by ``get_clips``.
        """
        response = api.getEdit(id=self.data['edit'], keys=[])
        if 'data' not in response:
            return []
        layer = self.data.get('layer') or DEFAULT_LAYER
        annotations = []
        cited = {}
        for clip in response['data']['clips']:
            item_id = clip['item']
            entry = {
                'title': clip['title'],
                'id': item_id,
            }
            if 'director' in clip:
                entry['director'] = clip['director']
            cited[item_id] = entry
            for annotation in clip['layers'][layer]:
                # Carry the clip's metadata along with each annotation.
                for key in ('title', 'director', 'date'):
                    if key in clip:
                        annotation[key] = clip[key]
                annotations.append(annotation)
        self.data['cited'] = list(cited.values())
        return self.get_clips(api, self._select_annotations(annotations))

    def get_clips(self, api, annotations):
        """Attach ``color1`` / ``color2`` to every annotation.

        ``color1`` is the ``findClips`` result whose id equals
        ``<item id>/<in>-<out>`` (times formatted with three decimals).
        ``color2`` is the ``color1`` of the following annotation; the last
        annotation wraps around to the first.

        Args:
            api: signed-in API object as returned by ``ox.api.signin``.
            annotations: list of annotation dicts with ``id``, ``in``, ``out``.

        Returns:
            The same list, with its dicts updated in place.

        Raises:
            KeyError: when no clip with the computed id was returned.
        """
        if not annotations:
            # Avoid a remote query with an empty id list.
            return annotations
        item_ids = list({annot['id'].split('/')[0] for annot in annotations})
        query = {
            'itemsQuery': {
                'conditions': [{
                    'key': 'id',
                    'operator': '&',
                    'value': item_ids
                }]
            },
            'range': [0, 10000],
            'keys': [
                'id',
                'hue',
                'saturation',
                'lightness'
            ]
        }
        items = api.findClips(**query)['data']['items']
        colors = {item['id']: item for item in items}
        for annot in annotations:
            item_id = annot['id'].split('/')[0]
            clip_id = '{}/{:.3f}-{:.3f}'.format(item_id, annot['in'], annot['out'])
            annot['color1'] = colors[clip_id]
        # Pair each annotation with its successor, wrapping at the end.
        successors = annotations[1:] + annotations[:1]
        for annot, following in zip(annotations, successors):
            annot['color2'] = following['color1']
        return annotations

    def save(self, *args, **kwargs):
        """Refresh ``annotations`` via ``get_annotations()``, then save."""
        self.annotations = self.get_annotations()
        super().save(*args, **kwargs)

    def json(self):
        """Serialize this text to a JSON string.

        Contains title, byline, body and language; ``item_*`` fields of the
        Film matching ``data['related']`` (or ``data['item']``) when one
        exists; ``annotations`` when it is a non-empty list. Finally all of
        ``self.data`` is merged in, overriding same-named keys.
        """
        # Imported here rather than at module level, as in the original.
        from ..video.models import Film

        data = {
            'title': self.title,
            'byline': self.byline,
            'body': self.body,
            'language': self.language,
        }
        item_id = self.data.get('related') or self.data.get('item')
        if item_id:
            item = Film.objects.filter(pandora_id=item_id).first()
            if item:
                # NOTE(review): raises KeyError if the film's data lacks one
                # of these keys -- confirm they are always present.
                for key in ('title', 'title_zh', 'director'):
                    data['item_' + key] = item.data[key]
                data['item_url'] = item.get_absolute_url()
        if isinstance(self.annotations, list) and self.annotations:
            data['annotations'] = self.annotations
        data.update(self.data)
        return json.dumps(data)
|