diff --git a/ox/cache.py b/ox/cache.py
index e47e515..9ffb0fe 100644
--- a/ox/cache.py
+++ b/ox/cache.py
@@ -1,6 +1,8 @@
 # -*- coding: utf-8 -*-
 # vi:si:et:sw=4:sts=4:ts=4
-# GPL 2008
+# GPL 2011
+from __future__ import with_statement
+
 import gzip
 import zlib
 import hashlib
@@ -12,12 +14,12 @@ import urllib2
 import sqlite3
 import chardet
 
-from ox.utils import json
+from utils import json
+from .file import makedirs
 
 import net
 from net import DEFAULT_HEADERS, getEncoding
 
-
 cache_timeout = 30*24*60*60 # default is 30 days
 
 COMPRESS_TYPES = (
@@ -54,12 +56,10 @@ def exists(url, data=None, headers=DEFAULT_HEADERS, timeout=cache_timeout):
     return False
 
 def getHeaders(url, data=None, headers=DEFAULT_HEADERS, timeout=cache_timeout):
-    url_headers = _readUrlCache(url, data, headers, timeout, "headers")
-    if url_headers:
-        url_headers = json.loads(url_headers)
-    else:
+    url_headers = store.get(url, data, headers, timeout, "headers")
+    if not url_headers:
         url_headers = net.getHeaders(url, data, headers)
-        _saveUrlCache(url, data, -1, url_headers)
+        store.set(url, data, -1, url_headers)
     return url_headers
 
 class InvalidResult(Exception):
@@ -80,7 +80,7 @@ def readUrl(url, data=None, headers=DEFAULT_HEADERS, timeout=cache_timeout, vali
     #FIXME: send last-modified / etag from cache and only update if needed
     if isinstance(url, unicode):
         url = url.encode('utf-8')
-    result = _readUrlCache(url, data, headers, timeout)
+    result = store.get(url, data, headers, timeout)
     if not result:
         #print "get data", url
         try:
@@ -92,7 +92,7 @@ def readUrl(url, data=None, headers=DEFAULT_HEADERS, timeout=cache_timeout, vali
         if url_headers.get('content-encoding', None) == 'gzip':
             result = gzip.GzipFile(fileobj=StringIO.StringIO(result)).read()
         if not valid or valid(result, url_headers):
-            _saveUrlCache(url, data, result, url_headers)
+            store.set(url, data, result, url_headers)
         else:
             raise InvalidResult(result, url_headers)
     return result
@@ -114,172 +114,203 @@ def saveUrl(url, filename, overwrite=False):
         f.write(data)
         f.close()
 
-def _getCacheBase():
-    'cache base is eather ~/.ox/cache or can set via env variable oxCACHE'
+def cache_path():
     return os.environ.get('oxCACHE', os.path.expanduser('~/.ox/cache'))
 
-def _getCacheDB():
-    path = _getCacheBase()
-    if not os.path.exists(path):
-        os.makedirs(path)
-    return os.path.join(path, "cache.sqlite")
+class Cache:
+    def __init__(self):
+        pass
 
-def _connectDb():
-    conn = sqlite3.connect(_getCacheDB(), timeout=10)
-    conn.text_factory = str
-    return conn
+    def get(self, url, data, headers=DEFAULT_HEADERS, timeout=-1, value="data"):
+        '''
+        if value == 'data' return data of url if it's in the cache, else None
+        if value == 'headers' return headers for url
+        '''
+        pass
 
-def _getSetting(c, key, default=None):
-    c.execute('SELECT value FROM setting WHERE key = ?', (key, ))
-    for row in c:
-        return row[0]
-    return default
+    def set(self, url, post_data, data, headers):
+        pass
 
-def _setSetting(c, key, value):
-    c.execute(u'INSERT OR REPLACE INTO setting values (?, ?)', (key, str(value)))
+class SQLiteCache(Cache):
+    def __init__(self):
+        path = cache_path()
+        if not os.path.exists(path):
+            os.makedirs(path)
+        self.db = os.path.join(path, "cache.sqlite")
 
-def _createDb(conn, c):
-    # Create table and indexes
-    c.execute('''CREATE TABLE IF NOT EXISTS cache (url_hash varchar(42) unique, domain text, url text,
-                  post_data text, headers text, created int, data blob, only_headers int)''')
-    c.execute('''CREATE INDEX IF NOT EXISTS cache_domain ON cache (domain)''')
-    c.execute('''CREATE INDEX IF NOT EXISTS cache_url ON cache (url)''')
-    c.execute('''CREATE INDEX IF NOT EXISTS cache_url_hash ON cache (url_hash)''')
+    def connect(self):
+        self.conn = sqlite3.connect(self.db, timeout=10)
+        self.conn.text_factory = str
+        self.create()
 
-    c.execute('''CREATE TABLE IF NOT EXISTS setting (key varchar(1024) unique, value text)''')
-    if int(_getSetting(c, 'version', 0)) < 1:
-        _setSetting(c, 'version', 1)
-        c.execute('''ALTER TABLE cache ADD compressed INT DEFAULT 0''')
-        conn.commit()
+    def create(self):
+        c = self.conn.cursor()
+        # Create table and indexes
+        c.execute('''CREATE TABLE IF NOT EXISTS cache (url_hash varchar(42) unique, domain text, url text,
+                      post_data text, headers text, created int, data blob, only_headers int)''')
+        c.execute('''CREATE INDEX IF NOT EXISTS cache_domain ON cache (domain)''')
+        c.execute('''CREATE INDEX IF NOT EXISTS cache_url ON cache (url)''')
+        c.execute('''CREATE INDEX IF NOT EXISTS cache_url_hash ON cache (url_hash)''')
 
-def _readUrlCache(url, data, headers=DEFAULT_HEADERS, timeout=-1, value="data"):
-    r = None
-    if timeout == 0:
+        c.execute('''CREATE TABLE IF NOT EXISTS setting (key varchar(1024) unique, value text)''')
+        if int(self.get_setting(c, 'version', 0)) < 1:
+            self.set_setting(c, 'version', 1)
+            c.execute('''ALTER TABLE cache ADD compressed INT DEFAULT 0''')
+        self.conn.commit()
+
+    def get_setting(self, c, key, default=None):
+        c.execute('SELECT value FROM setting WHERE key = ?', (key, ))
+        for row in c:
+            return row[0]
+        return default
+
+    def set_setting(self, c, key, value):
+        c.execute(u'INSERT OR REPLACE INTO setting values (?, ?)', (key, str(value)))
+
+    def get(self, url, data={}, headers=DEFAULT_HEADERS, timeout=-1, value="data"):
+        r = None
+        if timeout == 0:
+            return r
+
+        if data:
+            url_hash = hashlib.sha1(url + '?' + data).hexdigest()
+        else:
+            url_hash = hashlib.sha1(url).hexdigest()
+
+        self.connect()
+        c = self.conn.cursor()
+        sql = 'SELECT %s, compressed FROM cache WHERE url_hash=?' % value
+        if timeout > 0:
+            now = time.mktime(time.localtime())
+            t = (url_hash, now-timeout)
+            sql += ' AND created > ?'
+        else:
+            t = (url_hash, )
+        if value != "headers":
+            sql += ' AND only_headers != 1 '
+        c.execute(sql, t)
+        for row in c:
+            r = row[0]
+            if value == 'headers':
+                r = json.loads(r)
+            elif value == 'data':
+                if row[1] == 1:
+                    r = zlib.decompress(r)
+                else:
+                    r = str(r)
+            break
+
+        c.close()
+        self.conn.close()
         return r
 
-    if data:
-        url_hash = hashlib.sha1(url + '?' + data).hexdigest()
-    else:
-        url_hash = hashlib.sha1(url).hexdigest()
+    def set(self, url, post_data, data, headers):
+        if post_data:
+            url_hash = hashlib.sha1(url + '?' + post_data).hexdigest()
+        else:
+            url_hash = hashlib.sha1(url).hexdigest()
 
-    conn = _connectDb()
-    c = conn.cursor()
-    _createDb(conn, c)
+        domain = ".".join(urlparse.urlparse(url)[1].split('.')[-2:])
 
-    sql = 'SELECT %s, compressed FROM cache WHERE url_hash=?' % value
-    if timeout > 0:
-        now = time.mktime(time.localtime())
-        t = (url_hash, now-timeout)
-        sql += ' AND created > ?'
-    else:
-        t = (url_hash, )
-    if value != "headers":
-        sql += ' AND only_headers != 1 '
-    c.execute(sql, t)
-    for row in c:
-        r = row[0]
-        if value == 'data':
-            if row[1] == 1:
-                r = zlib.decompress(r)
-            else:
-                r = str(r)
-        break
+        self.connect()
+        c = self.conn.cursor()
 
-    c.close()
-    conn.close()
-    return r
-
-def _saveUrlCache(url, post_data, data, headers):
-    if post_data:
-        url_hash = hashlib.sha1(url + '?' + post_data).hexdigest()
-    else:
-        url_hash = hashlib.sha1(url).hexdigest()
-
-    domain = ".".join(urlparse.urlparse(url)[1].split('.')[-2:])
-
-    conn = _connectDb()
-    c = conn.cursor()
-
-    # Create table if not exists
-    _createDb(conn, c)
-
-    # Insert a row of data
-    if not post_data: post_data=""
-    only_headers = 0
-    if data == -1:
-        only_headers = 1
-        data = ""
-    created = time.mktime(time.localtime())
-    content_type = headers.get('content-type', '').split(';')[0].strip()
-    if content_type in COMPRESS_TYPES:
-        compressed = 1
-        data = zlib.compress(data)
-    else:
-        compressed = 0
-    data = sqlite3.Binary(data)
-    t = (url_hash, domain, url, post_data, json.dumps(headers), created, data, only_headers, compressed)
-    c.execute(u"""INSERT OR REPLACE INTO cache values (?, ?, ?, ?, ?, ?, ?, ?, ?)""", t)
-
-    # Save (commit) the changes and clean up
-    conn.commit()
-    c.close()
-    conn.close()
-
-def migrate_to_db():
-    import re
-    import os
-    import sqlite3
-    import glob
-
-    conn = _connectDb()
-    c = conn.cursor()
-    _createDb(conn, c)
-
-    files = glob.glob(_getCacheBase() + "/*/*/*/*/*")
-    _files = filter(lambda x: not x.endswith(".headers"), files)
-
-    for f in _files:
-        info = re.compile("%s/(.*?)/../../../(.*)" % _getCacheBase()).findall(f)
-        domain = url = info[0][0]
-        url_hash = info[0][1]
-        post_data = ""
-        created = os.stat(f).st_ctime
-        fd = open(f, "r")
-        data = fd.read()
-        fd.close()
-        fd = open(f + ".headers", "r")
-        headers = fd.read()
-        fd.close()
-        t = (url_hash, domain, url, post_data, headers, created, sqlite3.Binary(data), 0)
-        c.execute(u"""INSERT OR REPLACE INTO cache values (?, ?, ?, ?, ?, ?, ?, ?)""", t)
-
-    conn.commit()
-    c.close()
-    conn.close()
-
-def compress_db():
-    conn = _connectDb()
-    c = conn.cursor()
-    _createDb(conn, c)
-    c.execute(u"""SELECT url_hash FROM cache WHERE compressed = 0""")
-    ids = [row[0] for row in c]
-    for url_hash in ids:
-        c.execute(u"""SELECT headers, data FROM cache WHERE url_hash = ?""", (url_hash, ))
-        headers = {}
-        for row in c:
-            headers = json.loads(row[0])
-            data = row[1]
-
+        # Insert a row of data
+        if not post_data: post_data=""
+        only_headers = 0
+        if data == -1:
+            only_headers = 1
+            data = ""
+        created = time.mktime(time.localtime())
         content_type = headers.get('content-type', '').split(';')[0].strip()
         if content_type in COMPRESS_TYPES:
+            compressed = 1
             data = zlib.compress(data)
-        t = (sqlite3.Binary(data), url_hash)
-        print url_hash, 'update'
-        c.execute('UPDATE cache SET compressed = 1, data = ? WHERE url_hash = ?', t)
+        else:
+            compressed = 0
+        data = sqlite3.Binary(data)
+        t = (url_hash, domain, url, post_data, json.dumps(headers), created,
+             data, only_headers, compressed)
+        c.execute(u"""INSERT OR REPLACE INTO cache values (?, ?, ?, ?, ?, ?, ?, ?, ?)""", t)
+
+        # Save (commit) the changes and clean up
+        self.conn.commit()
+        c.close()
+        self.conn.close()
+
+class FileCache(Cache):
+    def __init__(self):
+        f, self.root = cache_path().split(':')
+
+    def files(self, domain, h):
+        prefix = os.path.join(self.root, domain, h[:2], h[2:4], h[4:6], h[6:8])
+        i = os.path.join(prefix, '%s.json'%h)
+        f = os.path.join(prefix, '%s.dat'%h)
+        return prefix, i, f
+
+    def get(self, url, data={}, headers=DEFAULT_HEADERS, timeout=-1, value="data"):
+        r = None
+        if timeout == 0:
+            return r
+
+        if data:
+            url_hash = hashlib.sha1(url + '?' + data).hexdigest()
+        else:
+            url_hash = hashlib.sha1(url).hexdigest()
+
+        domain = ".".join(urlparse.urlparse(url)[1].split('.')[-2:])
+        prefix, i, f = self.files(domain, url_hash)
+
+        if os.path.exists(i):
+            with open(i) as _i:
+                info = json.load(_i)
+
+            now = time.mktime(time.localtime())
+            expired = now-timeout
+
+            if value != 'headers' and info['only_headers']:
+                return None
+            if timeout < 0 or info['created'] > expired:
+                if value == 'headers':
+                    r = info['headers']
+                else:
+                    with open(f) as data:
+                        r = data.read()
+                    if info['compressed']:
+                        r = zlib.decompress(r)
+        return r
+
+    def set(self, url, post_data, data, headers):
+        if post_data:
+            url_hash = hashlib.sha1(url + '?' + post_data).hexdigest()
+        else:
+            url_hash = hashlib.sha1(url).hexdigest()
+
+        domain = ".".join(urlparse.urlparse(url)[1].split('.')[-2:])
+        prefix, i, f = self.files(domain, url_hash)
+        makedirs(prefix)
+
+        created = time.mktime(time.localtime())
+        content_type = headers.get('content-type', '').split(';')[0].strip()
+
+        info = {
+            'compressed': content_type in COMPRESS_TYPES,
+            'only_headers': data == -1,
+            'created': created,
+            'headers': headers,
+        }
+        if post_data:
+            info['post_data'] = post_data
+        if not info['only_headers']:
+            if info['compressed']:
+                data = zlib.compress(data)
+            with open(f, 'w') as _f:
+                _f.write(data)
+        with open(i, 'w') as _i:
+            json.dump(info, _i)
+
+if cache_path().startswith('fs:'):
+    store = FileCache()
+else:
+    store = SQLiteCache()
-
-    conn.commit()
-    print "optimizing database"
-    c.execute('VACUUM')
-    conn.commit()
-    c.close()
-    conn.close()
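
Usage note (reviewer sketch, not part of the patch): the store backend is picked once at module load via cache_path(), so oxCACHE must carry the 'fs:' prefix before ox.cache is first imported if the new FileCache is wanted. A minimal sketch, assuming the package is importable as ox.cache and using http://example.com/ purely as an illustrative URL:

    import os
    # choose the new FileCache backend; without the 'fs:' prefix the
    # SQLiteCache backend is selected, as before this patch
    os.environ['oxCACHE'] = 'fs:' + os.path.expanduser('~/.ox/cache')

    from ox import cache

    data = cache.readUrl('http://example.com/')     # first call hits the network and fills the store
    data = cache.readUrl('http://example.com/')     # second call is answered from the store
    headers = cache.getHeaders('http://example.com/')  # cached as a headers-only entry (data == -1)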
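
Also for review, the on-disk layout FileCache.files() produces: the sha1 of the url (or of url + '?' + post_data) is sharded into four two-character directories under <root>/<domain>/, with a .json file holding the metadata and a .dat file holding the body. A short sketch of the resulting paths, with root and url as illustrative values:

    import os
    import hashlib
    import urlparse

    root = os.path.expanduser('~/.ox/cache')   # the part of oxCACHE after 'fs:'
    url = 'http://www.example.com/index.html'  # illustrative

    h = hashlib.sha1(url).hexdigest()
    domain = ".".join(urlparse.urlparse(url)[1].split('.')[-2:])  # 'example.com'
    prefix = os.path.join(root, domain, h[:2], h[2:4], h[4:6], h[6:8])
    print os.path.join(prefix, '%s.json' % h)  # headers, created, compressed, only_headers
    print os.path.join(prefix, '%s.dat' % h)   # response body (zlib-compressed for COMPRESS_TYPES)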