# Let's start with Google and IMDb.
# -*- Mode: Python; -*-
# vi:si:et:sw=2:sts=2:ts=2
# encoding: utf-8
# Version string of this package.
__version__ = '0.1.0'
from net import *
# -*- Mode: Python; -*-
# -*- coding: utf-8 -*-
# vi:si:et:sw=2:sts=2:ts=2
import re
import time
import urllib
import urllib2
import weakref
import threading
import Queue
import oxutils
from oxutils import stripTags
# FIXME: this module should be replaced by a more minimal find function.
#
# Usage:
#   import google
#   for result in google.find(query): result
#
# Each result is a (title, url, description) tuple.
# The number of results can be limited: google.find(query, max_results)
def getUrl(url, data=None, headers=oxutils.net.DEFAULT_HEADERS):
  """
  Fetch url through oxutils.cache.getUrl.

  url, data and headers are handed over unchanged; google_timeout is
  always supplied as the fourth positional argument.
  NOTE(review): google_timeout is not defined in the visible part of this
  file - confirm it exists at module level.
  """
  cached_fetch = oxutils.cache.getUrl
  return cached_fetch(url, data, headers, google_timeout)
def quote_plus(s):
  """Encode s as UTF-8 and return it quoted by urllib.quote_plus."""
  utf8_bytes = s.encode('utf-8')
  return urllib.quote_plus(utf8_bytes)
def get_search_page_links(page, results_per_page, begin, end, link_re):
  """
  Given str contents of search result page, return list of links.

  Returns list of (name, url, desc) str tuples. See make_searcher()
  for a description of results_per_page and link_re.

  If begin is not None and occurs in page, everything before its first
  occurrence is discarded; if end is not None and occurs in the remaining
  text, everything from its first occurrence on is discarded.
  results_per_page is accepted for interface symmetry but not used here.
  """
  if begin is not None and begin in page:
    page = page[page.index(begin):]
  if end is not None and end in page:
    page = page[:page.index(end)]
  # Compile once, outside the loop; DOTALL lets '.' span line breaks.
  pattern = re.compile(link_re, re.DOTALL)
  # name and desc are stripped of markup, the url is passed through as found.
  return [(stripTags(m.group('name')), m.group('url'), stripTags(m.group('desc')))
          for m in pattern.finditer(page)]
def nonblocking(f, blocking_return=None, sleep_time=0.01):
  """
  Wrap a callable which returns an iter so that it no longer blocks.

  The wrapped iterator returns blocking_return while the iterator
  returned by f is blocking. That iterator is advanced in a background
  thread. If the wrapped iterator is deleted, then the iterator returned
  by f is deleted also and the background thread is terminated.

  sleep_time is the number of seconds the background thread sleeps while
  a produced value is still waiting to be consumed.

  Any exception raised by the underlying iterator (including
  StopIteration at its end) is re-raised by the wrapped iterator's next().
  """
  def g(*args, **kwargs):
    f_iter = f(*args, **kwargs)
    # Rebound below to a weak reference to the Iter object; run() reads it
    # through the closure, so the thread never keeps the wrapper alive.
    g_iter = None

    def run():
      while True:
        g_obj = g_iter()
        if g_obj is None:
          # The wrapper was garbage collected: stop the thread.
          return
        if g_obj.q.qsize() == 0:
          try:
            # next() builtin (Python >= 2.6) instead of f_iter.next().
            f_next = next(f_iter)
          except Exception as e:
            # Hand the error (or StopIteration) over to the consumer.
            g_obj.exc = e
            return
          g_obj.q.put(f_next)
        else:
          # Drop the strong reference before sleeping so the wrapper can
          # be collected while a value is waiting in the queue.
          del g_obj
          time.sleep(sleep_time)

    class Iter:
      def __init__(self):
        self.q = Queue.Queue()
        self.exc = None
        self.thread = threading.Thread(target=run)
        # Daemon thread: a blocked source must not keep the interpreter alive.
        self.thread.daemon = True

      def next(self):
        if self.exc is not None:
          raise self.exc
        try:
          return self.q.get_nowait()
        except Queue.Empty:
          return blocking_return

      # Python 3 spelling of the iterator protocol.
      __next__ = next

      def __iter__(self):
        return self

    obj = Iter()
    g_iter = weakref.ref(obj)
    obj.thread.start()
    try:
      return obj
    finally:
      del obj
  return g
def make_searcher(query_url, results_per_page, page_url, page_mode,
                  begin, end, link_re):
  """
  Return a search function for the given search engine.

  Here query_url is the URL for the initial search, with %(q)s for
  the query string, results_per_page is the number of search results
  per page, page_url is the URL for the 2nd and subsequent pages of
  search results, with %(q)s for the query string and %(n)s for the
  page "number." Here page_mode controls the actual value for the
  page "number:"

   - page_mode='page0':   Use 0-based index of the page.
   - page_mode='page1':   Use 1-based index of the page.
   - page_mode='offset0': Use 0-based index of the search result,
                          which is a multiple of results_per_page.
   - page_mode='offset1': Use 1-based index of the search result
                          (one plus a multiple of results_per_page).

  If begin is not None, then only text after the first occurrence of
  begin will be used in the search results page. If end is not None,
  then only text before the first occurrence of end will be used.

  Finally, link_re is a regex string (see module re) which matches
  three named groups: 'name', 'url', and 'desc'. These correspond to
  the name, URL and description of each search result. The regex is
  applied in re.DOTALL mode.

  Returns a search() function which has the same interface as
  described in the module docstring. An unknown page_mode raises
  ValueError when the second result page is requested.
  """
  def search_blocking(query, max_results):
    last_links = None
    page_num = 0
    q = Queue.Queue()
    for i in range(max_results):
      if q.qsize() == 0:
        # Buffer is empty: fetch the next page of results.
        if page_num == 0:
          page = getUrl(query_url % {'q': quote_plus(query)})
        else:
          if page_mode == 'page0':
            n = page_num
          elif page_mode == 'page1':
            n = page_num + 1
          elif page_mode == 'offset0':
            n = page_num * results_per_page
          elif page_mode == 'offset1':
            n = page_num * results_per_page + 1
          else:
            raise ValueError('unknown page mode')
          page = getUrl(page_url % {'n': n, 'q': quote_plus(query)})
        page_num += 1
        links = get_search_page_links(page, results_per_page, begin, end, link_re)
        # Stop on an empty page or when the engine repeats the last page.
        if len(links) == 0 or links == last_links:
          break
        last_links = links
        for link in links:
          q.put(link)
      yield q.get()

  search_nonblocking = nonblocking(search_blocking)

  # NOTE(review): DEFAULT_MAX_RESULTS is not defined in the visible part of
  # this file - confirm it exists at module level.
  def search(query, max_results=DEFAULT_MAX_RESULTS, blocking=True):
    """
    Search for query and iterate over (name, url, desc) results.

    With blocking=False the non-blocking wrapper of the same search is
    returned instead.
    """
    if blocking:
      return search_blocking(query, max_results)
    return search_nonblocking(query, max_results)

  return search
# Google web search built with make_searcher(): 10 results per page, follow-up
# pages addressed through the "start" parameter as a 0-based result offset
# ('offset0'), no begin/end markers, and a link regex capturing the named
# groups url, name and desc.
find = make_searcher('http://www.google.com/search?q=%(q)s', 10,
'http://www.google.com/search?start=%(n)d&q=%(q)s', 'offset0',
None, None,
r'<a href="(?P<url>[^"]*?)" class=l.*?>(?P<name>.*?)</a>' +
r'.*?(?:<br>|<table.*?>)' +
r'(?P<desc>.*?)' + '(?:<font color=#008000>|<a)')
# -*- Mode: Python; -*-
# -*- coding: utf-8 -*-
# vi:si:et:sw=2:sts=2:ts=2
from oxutils import *
import urllib2
from urllib import quote
import re, time
import os
import time
from BeautifulSoup import BeautifulSoup
import chardet
import oxutils
from oxutils import stripTags, htmldecode
from oxutils.cache import getUrl, getUrlUnicode
from oxutils.normalize import normalizeTitle
import google
def _get_data(url):
  """
  Return the result of getUrl(url), or None when fetching fails.

  Failures are reported on stdout and otherwise swallowed (best effort).
  """
  data = None
  try:
    data = getUrl(url)
  except Exception:
    # Exception instead of a bare except so Ctrl-C still interrupts.
    print("error reading data from %s" % url)
  return data
def get_image(url):
  """Return whatever getUrl yields for the given image URL."""
  image_data = getUrl(url)
  return image_data
def _castList(data, regexp):
  """
  Return the distinct person names linked inside the first regexp match.

  regexp is searched in data; its first findall() result is parsed with
  BeautifulSoup and every <a> whose href matches '/name/nm' and that has a
  string contributes one name (markup stripped). Order of first
  appearance is kept. Returns [] when regexp does not match.
  """
  found = re.compile(regexp).findall(data)
  if not found:
    return []
  soup = BeautifulSoup(found[0])
  names = []
  seen = set()
  for link in soup('a', {'href': re.compile('/name/nm')}):
    if link.string:
      cast = stripTags(link.string)
      if cast not in seen:
        seen.add(cast)
        names.append(cast)
  return names
def _getTerm(data, regexp):
  """
  Return group 1 of the first case-insensitive match of regexp in data.

  The match has its markup stripped and is whitespace-trimmed; the result
  is encoded as UTF-8. An empty string is returned when nothing matches
  or when matching fails.
  """
  term = ''
  # NOTE(review): the try/except was reconstructed from the orphaned warning
  # line in the source - confirm against the original file.
  try:
    reg = re.compile(regexp, re.IGNORECASE)
    m = reg.search(data)
    if m:
      term = stripTags(m.group(1)).strip()
  except Exception:
    print("waring, parsing failed for %s" % regexp)
  return term.encode('utf8')
class IMDb:
  """
  Scraper for the IMDb pages of one title.

  Every sub-page (credits, plot, trivia, ...) has a <name>Url attribute
  and a <name>Source attribute. The source is downloaded with
  getUrlUnicode on first use and kept on the instance afterwards; pass
  forcereload=True to a get*() method to download it again.
  """
  def __init__(self, imdb):
    # imdb is interpolated after "tt" into the title URL.
    self.imdb = imdb
    self.pageSource = None
    self.pageUrl = "http://www.imdb.com/title/tt%s/" % self.imdb

    self.businessSource = None
    self.businessUrl = "%sbusiness" % self.pageUrl
    self.connectionsSource = None
    self.connectionsUrl = "%smovieconnections" % self.pageUrl
    self.creditsSource = None
    self.creditsUrl = "%sfullcredits" % self.pageUrl
    self.episodesSource = None
    self.episodesUrl = "%sepisodes" % self.pageUrl
    self.keywordSource = None
    self.keywordUrl = "%skeywords" % self.pageUrl
    self.plotSource = None
    self.plotUrl = "%splotsummary" % self.pageUrl
    self.releaseinfoSource = None
    self.releaseinfoUrl = "%sreleaseinfo" % self.pageUrl
    self.triviaSource = None
    self.triviaUrl = "%strivia" % self.pageUrl
    self.locationSource = None
    self.locationUrl = "%slocations" % self.pageUrl
    self.externalreviewsSource = None
    self.externalreviewsUrl = "%sexternalreviews" % self.pageUrl
    self.trailerSource = None
    self.trailerUrl = "%strailers" % self.pageUrl

  def _cachedSource(self, name, forcereload):
    # Return self.<name>Source, downloading self.<name>Url first when the
    # cache is empty or a reload is forced.
    attr = '%sSource' % name
    if forcereload or not getattr(self, attr):
      setattr(self, attr, getUrlUnicode(getattr(self, '%sUrl' % name)))
    return getattr(self, attr)

  def _titleMarkup(self, data):
    # Markup of the title element: div#tn15title if present, else <title>;
    # '' when the page has neither.
    soup = BeautifulSoup(data)
    found = soup('div', {'id': 'tn15title'})
    if not found:
      found = soup('title')
    if found:
      return str(found[0])
    return ''

  def getPage(self, forcereload = False):
    """Return the (cached) main title page."""
    return self._cachedSource('page', forcereload)

  def parse_raw_value(self, key, value):
    """
    Convert the raw markup of one info box into its parsed value.

    runtime -> int, country/language/genre -> list of strings,
    tagline/plot_outline -> cleaned text, tv_series -> first id found in a
    /title/tt link, also_known_as -> English title; any other key is
    printed and returned unchanged.
    """
    if key in ('runtime', 'language', 'genre', 'country', 'tagline', 'plot_outline'):
      value = unicode(value, 'utf-8')
      value = stripTags(value).strip()
    if key == 'runtime':
      # A "min" figure is multiplied by 60, a "sec" figure is used as is.
      parsed_value = _getTerm(value, '(.*?) min')
      parsed_value = _getTerm(parsed_value, '([0-9]+)')
      if not parsed_value:
        parsed_value = _getTerm(value, '(.*?) sec')
        parsed_value = _getTerm(parsed_value, '([0-9]+)')
        if not parsed_value:
          parsed_value = 0
        else:
          parsed_value = int(parsed_value)
      else:
        parsed_value = int(parsed_value) * 60
    elif key in ('country', 'language'):
      parsed_value = value.split(' / ')
    elif key == 'genre':
      parsed_value = value.replace('more', '').strip().split(' / ')
    elif key == 'tagline':
      parsed_value = value.replace('more', '').strip()
    elif key == 'plot_outline':
      parsed_value = value.replace('(view trailer)', '').strip()
      if parsed_value.endswith('more'):
        parsed_value = parsed_value[:-4].strip()
    elif key == 'tv_series':
      m = re.compile('<a href="/title/tt(.*?)/">(.*?)</a>').findall(value)
      if m:
        parsed_value = m[0][0]
      else:
        parsed_value = ''
    elif key == 'also_known_as':
      parsed_value = ''
      m = re.compile(r'(.*) \(International: English title').findall(value)
      if m:
        parsed_value = m[0]
      else:
        # NOTE(review): reconstructed as a fallback to the USA title only
        # when no international English title exists - confirm.
        m = re.compile(r'(.*) \(USA').findall(value)
        if m:
          parsed_value = m[0]
      parsed_value = parsed_value.split('<br />')[-1].split('(')[0]
      # Drop a leading "<director>'s" from the title.
      director = self.parseCredits().get('director', None)
      if director:
        director = director[0]
        parsed_value = parsed_value.replace(director, '')
        if parsed_value.startswith("'s"):
          parsed_value = parsed_value[2:].strip()
      parsed_value = parsed_value.strip()
    else:
      print(value)
      parsed_value = value
    return parsed_value

  def parseTitle(self):
    """Return the normalized title taken from the main page."""
    title = ''
    data = self.getPage()
    html_title = self._titleMarkup(data)
    if html_title:
      # Collapse the double space left behind by replacing <br />.
      html_title = html_title.replace('<br />', ' ').replace('  ', ' ')
      title = stripTags(html_title)
      # Remove the "(1999)" / "(1999/I)" year markers.
      title = re.sub(r'\(\d\d\d\d\)', '', title)
      title = re.sub(r'\(\d\d\d\d/I*\)', '', title)
      for t in ('TV series', 'TV-Series', 'TV mini-series', '(mini)', '(VG)', '(V)', '(TV)'):
        title = title.replace(t, '')
      if title.find(u'\xa0') > -1:
        title = title[:title.find(u'\xa0')]
      title = normalizeTitle(title.strip())
      if title.startswith('"') and title.endswith('"'):
        title = normalizeTitle(title[1:-1])
      elif title.startswith('"') and title.find('"',1) > 0 and \
          title.find('"',1) == title.rfind('"'):
        # '"Show" Episode' form: quoted show name followed by episode title.
        closing = title.rfind('"')
        se = re.compile(r"Season (\d*), Episode (\d*)\)").findall(data)
        if se:
          se = se[0]
          se = ' (S%02dE%02d)' % (int(se[0]), int(se[1]))
          title = normalizeTitle(title[1:closing]) + se + title[closing+1:]
        else:
          title = normalizeTitle(title[1:closing]) + ':' + title[closing+1:]
    return normalizeTitle(title)

  def parseYear(self):
    """Return the four-digit year from the title markup, or ''."""
    year = ''
    html_title = self._titleMarkup(self.getPage())
    if html_title:
      html_title = stripTags(html_title)
      year = re.compile(r'\((\d\d\d\d)\)').findall(html_title)
      if not year:
        year = re.compile(r'\((\d\d\d\d)/').findall(html_title)
      if year:
        year = year[0]
      else:
        year = ''
    return year

  def parse(self):
    """
    Parse the title and all of its sub-pages into one dict.

    The dict is also stored as self.IMDbDict. For an episode, empty
    country and language values are filled in from the parent series.
    """
    data = self.getPage()
    IMDbDict = {}
    IMDbDict['poster'] = _getTerm(data, 'name="poster".*?<img .*?src="(.*?)"')
    if not IMDbDict['poster']:
      IMDbDict['poster'] = 'http://i.imdb.com/Heads/npa.gif'
    #Title, Year
    IMDbDict['year'] = self.parseYear()
    IMDbDict['title'] = self.parseTitle()

    # Rating is stored as an int, rating * 1000; -1 when not found.
    m = re.compile('<b>(.*?)/10</b>', re.IGNORECASE).search(data)
    if m:
      IMDbDict['rating'] = int(float(m.group(1)) * 1000)
    else:
      IMDbDict['rating'] = -1
    # Votes; -1 when not found.
    m = re.compile(r'<small>\(<a href="ratings">(.*?) votes</a>\)</small>', re.IGNORECASE).findall(data)
    if m:
      IMDbDict['votes'] = int(m[0].replace(',', ''))
    else:
      IMDbDict['votes'] = -1

    data = data.replace('\n',' ')
    #some values
    keys = ('runtime', 'language', 'genre', 'country', 'tagline', 'plot_outline', 'tv_series', 'also_known_as')
    for key in keys:
      IMDbDict[key] = ''
    IMDbDict['runtime'] = 0
    soup = BeautifulSoup(data)
    for info in soup('div', {'class': 'info'}):
      # The <h5> heading minus its last character names the field.
      key = str(info).split('</h5>')[0].split('<h5>')
      if len(key) > 1:
        raw_value = str(info).split('</h5>')[1]
        key = key[1][:-1].lower().replace(' ', '_')
        if key in keys:
          IMDbDict[key] = self.parse_raw_value(key, raw_value)
    # 'also_known_as' is always present (defaulted to '' above), so a pop()
    # default could never apply; fall back to the title explicitly.
    IMDbDict['title_english'] = IMDbDict.pop('also_known_as') or IMDbDict['title']
    #is episode
    IMDbDict['episode_of'] = IMDbDict.pop('tv_series', '')

    IMDbDict['episodes'] = self.parseEpisodes()
    IMDbDict['tvshow'] = bool(IMDbDict['episodes'])
    IMDbDict['credits'] = self.parseCredits()
    IMDbDict['plot'] = self.parsePlot()
    IMDbDict['keywords'] = self.parseKeywords()

    IMDbDict['trivia'] = self.parseTrivia()
    IMDbDict['connections'] = self.parseConnections()
    IMDbDict['locations'] = self.parseLocations()
    IMDbDict['release_date'] = self.parseReleaseinfo()
    IMDbDict['business'] = self.parseBusiness()
    IMDbDict['reviews'] = self.parseExternalreviews()
    IMDbDict['stills'] = getMovieStills(self.imdb)
    #IMDbDict['trailer'] = self.parseTrailer()
    self.IMDbDict = IMDbDict

    if IMDbDict['episode_of']:
      episode_of = IMDb(IMDbDict['episode_of']).parse()
      for key in ('country', 'language'):
        if not IMDbDict[key]:
          IMDbDict[key] = episode_of[key]
    return self.IMDbDict

  def getCredits(self, forcereload = False):
    """Return the (cached) full credits page."""
    return self._cachedSource('credits', forcereload)

  def parseCredits(self):
    """
    Return a dict with 'director', 'writer', 'producer' (lists of names)
    and 'cast' (list of (name, role) tuples); also stored as self.credits.
    """
    data = self.getCredits()
    credits = {}
    credits['director'] = _castList(data, 'Directed by.*?(<tr>.*?)</table>')
    credits['writer'] = _castList(data, 'Writing credits.*?(<tr>.*?)</table>')
    credits['producer'] = _castList(data, 'Produced by.*?(<tr>.*?)</table>')
    credits['cast'] = []
    soup = BeautifulSoup(data)
    cast = soup('table', {'class': 'cast'})
    if cast:
      # unicode() rather than str(): replacing u'\xa0' in a byte string
      # would force an implicit ASCII decode of the whole table.
      cast = unicode(cast[0]).replace(u'\xa0', ' ')
      names = re.compile('<a href="/name/nm.*?/">(.*?)</a>.*?</td><td class="char">(.*?)</td></tr>').findall(cast)
      for name in names:
        real_name = name[0]
        role_name = name[1]
        if role_name:
          role_name = role_name.split('(')[0].replace('/ ...','')
        credits['cast'].append((stripTags(real_name), stripTags(role_name)))
    self.credits = credits
    return self.credits

  def getPlot(self, forcereload = False):
    """Return the (cached) plot summary page."""
    return self._cachedSource('plot', forcereload)

  def parsePlot(self):
    """Return the text of the first p.plotpar up to its <i> part, or u''."""
    soup = BeautifulSoup(self.getPlot())
    plot = soup('p', {'class':'plotpar'})
    if plot:
      plot = unicode(plot[0]).split('<i>')[0]
    else:
      plot = u''
    plot = stripTags(plot).strip()
    self.plot = plot
    return plot

  def getEpisodes(self, forcereload = False):
    """Return the (cached) episodes page."""
    return self._cachedSource('episodes', forcereload)

  def parseEpisodes(self):
    """
    Return a dict mapping 'SxxEyy' to a dict with the keys imdb, title,
    description and date ('YYYY-MM-DD' or ''); also stored as
    self.episodes. Entries that cannot be parsed are reported and skipped.
    """
    episodes = {}
    cdata = self.getEpisodes().replace('\r\n', ' ')
    regexp = r'''<h4>Season (.*?), Episode (.*?): <a href="/title/tt(.*?)/">(.*?)</a></h4>(.*?)</b><br>(.*?)<br/>'''
    reg = re.compile(regexp, re.IGNORECASE)
    for match in reg.findall(cdata):
      try:
        season = int(match[0])
        episode = "S%02dE%02d" % (season, int(match[1]))
        title = match[3].strip()
        # A placeholder title such as "Episode #1.2" counts as no title.
        if title.startswith('Episode #%d' % season):
          title = u''
        description = htmldecode(match[5])
        description = stripTags(description.split('Next US airings:')[0])
        date = ''
        try:
          d = stripTags(match[4])
          d = d.replace('Original Air Date: ', '')
          date = time.strftime("%Y-%m-%d", time.strptime(d, '%d %B %Y'))
        except ValueError:
          # Missing or differently formatted air date: leave it empty.
          pass
        # Store the entry only once it is complete.
        episodes[episode] = {
          'imdb': match[2],
          'title': title,
          'description': description,
          'date': date,
        }
      except Exception:
        # NOTE(review): the original error handling was partly lost; a bad
        # entry is reported and the remaining matches are still processed.
        import traceback
        traceback.print_exc()
    self.episodes = episodes
    return self.episodes

  def getLocations(self, forcereload = False):
    """Return the (cached) locations page."""
    # Uses locationSource; the old code stored the page in keywordSource and
    # thereby overwrote the cached keywords page.
    return self._cachedSource('location', forcereload)

  def parseLocations(self):
    """Return the list of locations; also stored as self.locations."""
    soup = BeautifulSoup(self.getLocations())
    locations = []
    for key in soup('a', {'href': re.compile('^/List')}):
      # NOTE(review): loop body reconstructed (line missing in source).
      locations.append(htmldecode(key.string))
    self.locations = locations
    return self.locations

  def getKeywords(self, forcereload = False):
    """Return the (cached) keywords page."""
    return self._cachedSource('keyword', forcereload)

  def parseKeywords(self):
    """Return the list of keywords; also stored as self.keywords."""
    soup = BeautifulSoup(self.getKeywords())
    keywords = []
    for key in soup('a', {'href': re.compile('^/keyword/')}):
      k = htmldecode(key.string)
      k = k.replace(u'\xa0', ' ')
      # NOTE(review): append reconstructed (line missing in source).
      keywords.append(k)
    self.keywords = keywords
    return self.keywords

  def getTrivia(self, forcereload = False):
    """Return the (cached) trivia page."""
    return self._cachedSource('trivia', forcereload)

  def parseTrivia(self):
    """Return the list of trivia items (inner markup of each <li>)."""
    trivia = []
    soup = BeautifulSoup(self.getTrivia())
    for i in soup('ul', {'class': "trivia"}):
      for t in i('li'):
        t = str(t).replace('<br />', '').strip()
        if t.startswith('<li>') and t.endswith('</li>'):
          t = t[4:-5].strip()
          # NOTE(review): append reconstructed (line missing in source);
          # placed inside the if so only well-formed items are kept.
          trivia.append(t)
    self.trivia = trivia
    return self.trivia

  def getConnections(self, forcereload = False):
    """Return the (cached) movie connections page."""
    return self._cachedSource('connections', forcereload)

  def parseConnections(self):
    """Return a dict mapping each relation heading to a list of imdb ids."""
    connections = {}
    soup = BeautifulSoup(self.getConnections())
    content = soup('div', {'id': 'tn15content'})
    if not content:
      # Page without a content div: nothing to report.
      return connections
    blocks = str(content[0]).split('<h5>')[1:]
    for c in blocks:
      connection = c.split('</h5>')[0]
      cs = BeautifulSoup(c)
      if connection:
        #relation -> list of imdb ids
        connections[connection] = [a.get('href')[-8:-1] for a in cs('a', {'href': re.compile('/title/tt')})]
    return connections

  def getReleaseinfo(self, forcereload = False):
    """Return the (cached) release info page."""
    return self._cachedSource('releaseinfo', forcereload)

  def parseReleaseinfo(self):
    """Return the first parseable release date as 'YYYY-MM-DD', else None."""
    soup = BeautifulSoup(self.getReleaseinfo())
    info = soup('table',{'border': '0', 'cellpadding':'2'})
    if info:
      for row in info[0]('tr'):
        d = row('td', {'align':'right'})
        if d:
          try:
            possible_date = stripTags(str(d[0])).strip()
            rdate = time.strptime(possible_date, "%d %B %Y")
            return time.strftime('%Y-%m-%d', rdate)
          except ValueError:
            # Not a full "day month year" date: try the next row.
            pass
    return None

  def getBusiness(self, forcereload = False):
    """Return the (cached) business page."""
    return self._cachedSource('business', forcereload)

  def parseBusiness(self):
    """
    Return {'budget': ..., 'gross': ..., 'profit': ...}.

    Budget and gross are the largest dollar amounts listed under the
    respective heading (0 when absent); profit is gross - budget when both
    are known, else 0.
    """
    soup = BeautifulSoup(self.getBusiness())
    business = {'budget': 0, 'gross': 0, 'profit': 0}
    content = soup('div', {'id': 'tn15content'})
    if not content:
      return business
    blocks = str(content[0]).split('<h5>')[1:]
    amount_re = re.compile(r'\$(.*?) ')
    for c in blocks:
      parts = c.split('</h5>')
      if len(parts) < 2:
        # No closing heading tag: not a heading/value block.
        continue
      title = parts[0]
      line = parts[1]
      if title in ['Budget', 'Gross']:
        values = [value.replace(',','') for value in amount_re.findall(line)]
        # Skip amounts that are not plain integers instead of crashing.
        values = [int(value) for value in values if value.isdigit()]
        if values:
          business[title.lower()] = max(values)
    if business['budget'] and business['gross']:
      business['profit'] = business['gross'] - business['budget']
    return business

  def getExternalreviews(self, forcereload = False):
    """Return the (cached) external reviews page."""
    return self._cachedSource('externalreviews', forcereload)

  def parseExternalreviews(self):
    """Return a dict mapping review URL to link text from the first <ol>."""
    soup = BeautifulSoup(self.getExternalreviews())
    ol = soup('ol')
    if not ol:
      return {}
    ret = {}
    for li in ol[0]('li'):
      links = li('a')
      # Skip list items without a link or without link text.
      if not links or not links[0].contents:
        continue
      a = links[0]
      ret[a.get('href')] = a.contents[0]
    return ret

  def getTrailer(self, forcereload = False):
    """Return the (cached) trailers page."""
    return self._cachedSource('trailer', forcereload)

  def parseTrailer(self):
    """Return a dict mapping absolute trailer URLs to their link text."""
    ret = {}
    soup = BeautifulSoup(self.getTrailer())
    for p in soup('p'):
      if p('a') and p.firstText():
        a = p('a')[0]
        href = a.get('href')
        title = a.string
        # Only absolute links that carry plain link text are kept.
        if href and href.startswith('http') and title:
          ret[href] = title.replace('www.', '')
    return ret
def guess(title, director=''):
  """
  Guess the IMDb id for a title.

  The title is cut at the first '-', '(' and '.' and then looked up, in
  this order: a Google search restricted to imdb.com (optionally including
  director), the IMDb find page, and the IMDb find page including
  alternative titles. Returns the 7-character id or None.
  """
  #FIXME: proper file -> title
  title = title.split('-')[0]
  title = title.split('(')[0]
  title = title.split('.')[0]
  title = title.strip()
  imdb_url = 'http://www.imdb.com/find?q=%s' % quote(title.encode('utf-8'))
  return_url = ''

  # let's first try google
  #i.e. site:imdb.com Michael Stevens Sin
  if director:
    search = 'site:imdb.com %s "%s"' % (director, title)
  else:
    search = 'site:imdb.com "%s"' % title
  for (name, url, desc) in google.find(search, 2):
    if url.startswith('http://www.imdb.com/title/tt'):
      # Characters 28..34 are the id following ".../title/tt".
      return url[28:35]

  try:
    req = urllib2.Request(imdb_url, None, oxutils.net.DEFAULT_HEADERS)
    u = urllib2.urlopen(req)
    try:
      data = u.read()
      return_url = u.url
    finally:
      # Always release the connection.
      u.close()
  except Exception:
    return None
  # A unique hit redirects straight to the title page.
  if return_url.startswith('http://www.imdb.com/title/tt'):
    return return_url[28:35]
  if data:
    imdb_id = _getTerm(data.replace('\n', ' '), 'Popular Results.*?<ol><li>.*?<a href="/title/tt(.......)')
    if imdb_id:
      return imdb_id

  # Last resort: search including alternative titles. Errors of this
  # request propagate to the caller, as before.
  imdb_url = 'http://www.imdb.com/find?q=%s;s=tt;site=aka' % quote(title.encode('utf-8'))
  req = urllib2.Request(imdb_url, None, oxutils.net.DEFAULT_HEADERS)
  u = urllib2.urlopen(req)
  try:
    data = u.read()
    return_url = u.url
  finally:
    u.close()
  if return_url.startswith('http://www.imdb.com/title/tt'):
    return return_url[28:35]
  return None
def getEpisodeData(title, episode, show_url = None):
  """
  Collect information about an episode.

  title is the show title, episode the key used in the parsed episodes
  dict, show_url an optional IMDb URL of the show; without it the show is
  looked up with guess(title).

  Returns dict with title, show, description and episode, plus imdb when
  the episode was found.
  """
  episodeData = {
    'title': u'',
    'show': title,
    'description': u'',
    'episode': episode,
  }
  if not show_url:
    imdbid = guess(title)
  else:
    # Take the id from show_url (the old code read the undefined name "link").
    imdbid = "%07d" % int(re.compile(r'title/tt(\d*)').findall(show_url)[0])
  if imdbid:
    i = IMDb(imdbid).parse()
    # An unknown episode leaves the defaults in place instead of raising.
    found = i['episodes'].get(episode)
    if found:
      episodeData['title'] = found['title']
      episodeData['description'] = found['description']
      episodeData['imdb'] = found['imdb']
  return episodeData
def getMovieStills(id):
  """
  Return the URLs of the landscape stills listed on the title's gallery page.

  Thumbnails under i.imdb.com/Photos/Ss/<id>/ are tried first; if none of
  them is wider than high, any image whose URL ends in "p.jpg" is
  considered and its "f.jpg" variant is returned.
  """
  page = getUrl("http://imdb.com/gallery/ss/%s" % id)

  thumb_re = re.compile(r'''<img width="(\d*?)" height="(\d*?)" src="http://i.imdb.com/Photos/Ss/%s/th-(.*?).jpg"''' % id)
  # Keep only images that are wider than they are high.
  stills = ["http://i.imdb.com/Photos/Ss/%s/%s.jpg" % (id, hit[2])
            for hit in thumb_re.findall(page)
            if int(hit[0]) > int(hit[1])]
  if stills:
    return stills

  generic_re = re.compile(r'''<img width="(\d*?)" height="(\d*?)" src="http://(.*?)p.jpg"''')
  return ["http://%sf.jpg" % hit[2]
          for hit in generic_re.findall(page)
          if int(hit[0]) > int(hit[1])]
if __name__ == '__main__':
  # Script entry point: print the id guessed for the first argument.
  import sys
  #print parse(sys.argv[1])
  guessed = guess(sys.argv[1])
  print("imdb: %s" % guessed)
#!/usr/bin/env python
# vi:si:et:sw=2:sts=2:ts=2
# encoding: utf-8
from setuptools import setup, find_packages
import os
# uncomment the following lines if you fill them out in release.py
description="collection of scrapers for various websites",
keywords = [
classifiers = [
'Development Status :: 3 - Alpha',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Topic :: Software Development :: Libraries :: Python Modules',
