cinematools/films_by_country.py
2025-01-29 21:59:24 +05:30

133 lines
4 KiB
Python
Executable file

#!/usr/bin/python3
from datetime import datetime, timedelta
from optparse import OptionParser
import json
import re
import sys
from ox.web.imdb import cache, read_url
import ox.geo
# GraphQL query template for IMDb's advancedTitleSearch endpoint.
# The three %s placeholders are: release-date range start, range end
# (both "YYYY-MM-DD"), and an ISO 3166-1 alpha-2 country code.
# `first: 1000` appears to be the endpoint's maximum page size — callers
# must detect exactly-1000 responses and compensate (see get_year below,
# which re-queries the same day in DESC order when the cap is hit).
QUERY = '''
query advancedSearch{
advancedTitleSearch(
first: 1000, sort: {sortBy: RELEASE_DATE, sortOrder: ASC}
constraints: {
releaseDateConstraint: {releaseDateRange: {start: "%s" end: "%s"}}
originCountryConstraint: {anyCountries: ["%s"]}
titleTypeConstraint: {anyTitleTypeIds: ["movie", "video", "tvMovie", "short"]}
}
) {
edges {
node{
title {
id
originalTitleText {
text
}
titleText {
text
}
titleType {
text
}
releaseYear {
year
endYear
}
countriesOfOrigin {
countries {
id
}
}
}
}
}
}
}
'''
# IMDb's caching GraphQL endpoint; queries are POSTed as JSON bodies.
url = 'https://caching.graphql.imdb.com/'
# Start from ox's default browser-like headers, then add the headers the
# IMDb GraphQL endpoint expects (Origin/Referer, locale hints, and a JSON
# content type). NOTE(review): presumably requests without these are
# rejected or served differently — verify against the endpoint if changed.
headers = cache.DEFAULT_HEADERS.copy()
headers.update({
    'Accept': 'application/graphql+json, application/json',
    'Origin': 'https://www.imdb.com',
    'Referer': 'https://www.imdb.com',
    'x-imdb-user-country': 'US',
    'x-imdb-user-language': 'en-US',
    'content-type': 'application/json',
    'Accept-Language': 'en,en-US;q=0.5'
})
def get_year(year, country):
    """Fetch all IMDb titles originating from `country` released during `year`.

    Queries IMDb's GraphQL endpoint one release day at a time, since the API
    caps results at 1000 per query and a single day rarely exceeds that.
    When a day *does* return exactly 1000 rows (the cap), the same day is
    re-queried in descending release order and the two result sets are
    merged, recovering titles that fell past the cap.

    Args:
        year: calendar year to scan.
        country: ISO 3166-1 alpha-2 country code (case-insensitive).

    Returns:
        List of dicts with keys imdbId, title, type, country (list of full
        country names) and year, de-duplicated by imdbId.
    """
    items = []
    start = datetime(year, 1, 1)
    while start.year == year:
        day = start.strftime('%Y-%m-%d')
        query = QUERY % (day, day, country.upper())
        response = json.loads(read_url(url, data=json.dumps({
            "query": query
        }), headers=headers))
        edges = response['data']['advancedTitleSearch']['edges']
        if len(edges) == 1000:
            # Hit the API's page-size cap: fetch the same day sorted the
            # other way and merge, so titles beyond the first 1000 show up.
            query = query.replace('sortOrder: ASC', 'sortOrder: DESC')
            # BUG FIX: the original built the URL as `url + '?' + params`
            # with `params` never defined (NameError on any day with 1000+
            # titles). The query travels in the POST body; the plain URL
            # is all that is needed.
            response = json.loads(read_url(url, data=json.dumps({
                "query": query
            }), headers=headers))
            print(response)
            # Set membership instead of a list scan: O(1) per lookup.
            existing = {n["node"]["title"]["id"] for n in edges}
            for edge in response['data']['advancedTitleSearch']['edges']:
                if edge["node"]["title"]["id"] not in existing:
                    edges.append(edge)
        print(start.date(), len(edges))
        for row in edges:
            title = row["node"]['title']
            if title and title.get('countriesOfOrigin') and \
                    title['countriesOfOrigin'].get('countries'):
                countries = [c['id'].upper() for c in title['countriesOfOrigin']['countries']]
            else:
                # Rows without country data cannot match and are skipped.
                print("WTF", row)
                countries = []
            if country.upper() in countries:
                items.append({
                    "imdbId": title["id"][2:],
                    "title": title["titleText"]["text"],
                    "type": title["titleType"]["text"],
                    "country": [ox.geo.get_country_name(c) for c in countries],
                    "year": year
                })
        start = start + timedelta(days=1)
    # De-duplicate by imdbId: the ASC/DESC merge (and multi-day release
    # windows) can yield the same title more than once.
    items = {item["imdbId"]: item for item in items}
    return list(items.values())
if __name__ == '__main__':
    usage = "usage: %prog [options] countrycode output.json"
    parser = OptionParser(usage=usage)
    parser.add_option('-y', '--year', dest='year', default=1880, help="start from year")
    parser.add_option('-e', '--end', dest='end', default=None, help="end at year")
    (opts, args) = parser.parse_args()
    if len(args) != 2:
        parser.print_help()
        sys.exit(1)
    country, filename = args
    country = country.upper()
    year = int(opts.year)
    # Without --end, scan up to (but not including) the current year —
    # the running year is incomplete. With --end, include that year.
    end_year = datetime.now().year
    if opts.end:
        end_year = int(opts.end) + 1
    # BUG FIX: the original also assigned `films = {}` earlier, a dead
    # store immediately shadowed by this list — removed.
    films = []
    for year in range(year, end_year):
        print('<<', year)
        more = get_year(year, country)
        print('>>', year, len(more))
        films += more
    with open(filename, "w") as fd:
        json.dump(films, fd, indent=1, ensure_ascii=False)