add file cache

j 2011-11-01 13:55:49 +01:00
parent 2a7b70c576
commit 8fe8822e09

@@ -1,6 +1,8 @@
 # -*- coding: utf-8 -*-
 # vi:si:et:sw=4:sts=4:ts=4
-# GPL 2008
+# GPL 2011
+from __future__ import with_statement
+
 import gzip
 import zlib
 import hashlib
@@ -12,12 +14,12 @@ import urllib2
 import sqlite3
 import chardet
 
-from ox.utils import json
+from utils import json
+from .file import makedirs
 
 import net
 from net import DEFAULT_HEADERS, getEncoding
 
 cache_timeout = 30*24*60*60 # default is 30 days
 
 COMPRESS_TYPES = (
@@ -54,12 +56,10 @@ def exists(url, data=None, headers=DEFAULT_HEADERS, timeout=cache_timeout):
     return False
 
 def getHeaders(url, data=None, headers=DEFAULT_HEADERS, timeout=cache_timeout):
-    url_headers = _readUrlCache(url, data, headers, timeout, "headers")
-    if url_headers:
-        url_headers = json.loads(url_headers)
-    else:
+    url_headers = store.get(url, data, headers, timeout, "headers")
+    if not url_headers:
         url_headers = net.getHeaders(url, data, headers)
-        _saveUrlCache(url, data, -1, url_headers)
+        store.set(url, data, -1, url_headers)
     return url_headers
 
 class InvalidResult(Exception):
@@ -80,7 +80,7 @@ def readUrl(url, data=None, headers=DEFAULT_HEADERS, timeout=cache_timeout, vali
     #FIXME: send last-modified / etag from cache and only update if needed
     if isinstance(url, unicode):
         url = url.encode('utf-8')
-    result = _readUrlCache(url, data, headers, timeout)
+    result = store.get(url, data, headers, timeout)
     if not result:
         #print "get data", url
         try:
@@ -92,7 +92,7 @@ def readUrl(url, data=None, headers=DEFAULT_HEADERS, timeout=cache_timeout, vali
         if url_headers.get('content-encoding', None) == 'gzip':
             result = gzip.GzipFile(fileobj=StringIO.StringIO(result)).read()
         if not valid or valid(result, url_headers):
-            _saveUrlCache(url, data, result, url_headers)
+            store.set(url, data, result, url_headers)
         else:
             raise InvalidResult(result, url_headers)
     return result
@@ -114,31 +114,37 @@ def saveUrl(url, filename, overwrite=False):
         f.write(data)
         f.close()
 
-def _getCacheBase():
-    'cache base is eather ~/.ox/cache or can set via env variable oxCACHE'
+def cache_path():
     return os.environ.get('oxCACHE', os.path.expanduser('~/.ox/cache'))
 
-def _getCacheDB():
-    path = _getCacheBase()
+class Cache:
+    def __init__(self):
+        pass
+
+    def get(self, url, data, headers=DEFAULT_HEADERS, timeout=-1, value="data"):
+        '''
+            if value == 'data' return data of url if its in the cache else None
+            if value == 'headers' return headers for url
+        '''
+        pass
+
+    def set(self, url, post_data, data, headers):
+        pass
+
+class SQLiteCache(Cache):
+    def __init__(self):
+        path = cache_path()
         if not os.path.exists(path):
             os.makedirs(path)
-    return os.path.join(path, "cache.sqlite")
+        self.db = os.path.join(path, "cache.sqlite")
 
-def _connectDb():
-    conn = sqlite3.connect(_getCacheDB(), timeout=10)
-    conn.text_factory = str
-    return conn
+    def connect(self):
+        self.conn = sqlite3.connect(self.db, timeout=10)
+        self.conn.text_factory = str
+        self.create()
 
-def _getSetting(c, key, default=None):
-    c.execute('SELECT value FROM setting WHERE key = ?', (key, ))
-    for row in c:
-        return row[0]
-    return default
-
-def _setSetting(c, key, value):
-    c.execute(u'INSERT OR REPLACE INTO setting values (?, ?)', (key, str(value)))
-
-def _createDb(conn, c):
+    def create(self):
+        c = self.conn.cursor()
         # Create table and indexes
         c.execute('''CREATE TABLE IF NOT EXISTS cache (url_hash varchar(42) unique, domain text, url text,
             post_data text, headers text, created int, data blob, only_headers int)''')
@@ -147,12 +153,21 @@ def _createDb(conn, c):
         c.execute('''CREATE INDEX IF NOT EXISTS cache_url_hash ON cache (url_hash)''')
         c.execute('''CREATE TABLE IF NOT EXISTS setting (key varchar(1024) unique, value text)''')
-    if int(_getSetting(c, 'version', 0)) < 1:
-        _setSetting(c, 'version', 1)
+        if int(self.get_setting(c, 'version', 0)) < 1:
+            self.set_setting(c, 'version', 1)
             c.execute('''ALTER TABLE cache ADD compressed INT DEFAULT 0''')
-    conn.commit()
+        self.conn.commit()
 
-def _readUrlCache(url, data, headers=DEFAULT_HEADERS, timeout=-1, value="data"):
+    def get_setting(self, c, key, default=None):
+        c.execute('SELECT value FROM setting WHERE key = ?', (key, ))
+        for row in c:
+            return row[0]
+        return default
+
+    def set_setting(self, c, key, value):
+        c.execute(u'INSERT OR REPLACE INTO setting values (?, ?)', (key, str(value)))
+
+    def get(self, url, data={}, headers=DEFAULT_HEADERS, timeout=-1, value="data"):
         r = None
         if timeout == 0:
             return r
@@ -162,10 +177,8 @@ def _readUrlCache(url, data, headers=DEFAULT_HEADERS, timeout=-1, value="data"):
         else:
             url_hash = hashlib.sha1(url).hexdigest()
 
-    conn = _connectDb()
-    c = conn.cursor()
-    _createDb(conn, c)
+        self.connect()
+        c = self.conn.cursor()
         sql = 'SELECT %s, compressed FROM cache WHERE url_hash=?' % value
         if timeout > 0:
             now = time.mktime(time.localtime())
@@ -178,7 +191,9 @@ def _readUrlCache(url, data, headers=DEFAULT_HEADERS, timeout=-1, value="data"):
         c.execute(sql, t)
         for row in c:
             r = row[0]
-        if value == 'data':
+            if value == 'headers':
+                r = json.loads(r)
+            elif value == 'data':
                 if row[1] == 1:
                     r = zlib.decompress(r)
                 else:
@@ -186,10 +201,10 @@ def _readUrlCache(url, data, headers=DEFAULT_HEADERS, timeout=-1, value="data"):
             break
         c.close()
-    conn.close()
+        self.conn.close()
         return r
 
-def _saveUrlCache(url, post_data, data, headers):
+    def set(self, url, post_data, data, headers):
         if post_data:
             url_hash = hashlib.sha1(url + '?' + post_data).hexdigest()
         else:
             url_hash = hashlib.sha1(url).hexdigest()
@@ -197,11 +212,8 @@ def _saveUrlCache(url, post_data, data, headers):
         domain = ".".join(urlparse.urlparse(url)[1].split('.')[-2:])
 
-    conn = _connectDb()
-    c = conn.cursor()
-    # Create table if not exists
-    _createDb(conn, c)
+        self.connect()
+        c = self.conn.cursor()
 
         # Insert a row of data
         if not post_data: post_data=""
@@ -217,69 +229,88 @@ def _saveUrlCache(url, post_data, data, headers):
         else:
             compressed = 0
         data = sqlite3.Binary(data)
-    t = (url_hash, domain, url, post_data, json.dumps(headers), created, data, only_headers, compressed)
+        t = (url_hash, domain, url, post_data, json.dumps(headers), created,
+             data, only_headers, compressed)
         c.execute(u"""INSERT OR REPLACE INTO cache values (?, ?, ?, ?, ?, ?, ?, ?, ?)""", t)
 
         # Save (commit) the changes and clean up
-    conn.commit()
+        self.conn.commit()
         c.close()
-    conn.close()
+        self.conn.close()
 
-def migrate_to_db():
-    import re
-    import os
-    import sqlite3
-    import glob
-
-    conn = _connectDb()
-    c = conn.cursor()
-    _createDb(conn, c)
-
-    files = glob.glob(_getCacheBase() + "/*/*/*/*/*")
-    _files = filter(lambda x: not x.endswith(".headers"), files)
-
-    for f in _files:
-        info = re.compile("%s/(.*?)/../../../(.*)" % _getCacheBase()).findall(f)
-        domain = url = info[0][0]
-        url_hash = info[0][1]
-        post_data = ""
-        created = os.stat(f).st_ctime
-        fd = open(f, "r")
-        data = fd.read()
-        fd.close()
-        fd = open(f + ".headers", "r")
-        headers = fd.read()
-        fd.close()
-        t = (url_hash, domain, url, post_data, headers, created, sqlite3.Binary(data), 0)
-        c.execute(u"""INSERT OR REPLACE INTO cache values (?, ?, ?, ?, ?, ?, ?, ?)""", t)
-
-    conn.commit()
-    c.close()
-    conn.close()
-
-def compress_db():
-    conn = _connectDb()
-    c = conn.cursor()
-    _createDb(conn, c)
-    c.execute(u"""SELECT url_hash FROM cache WHERE compressed = 0""")
-    ids = [row[0] for row in c]
-    for url_hash in ids:
-        c.execute(u"""SELECT headers, data FROM cache WHERE url_hash = ?""", (url_hash, ))
-        headers = {}
-        for row in c:
-            headers = json.loads(row[0])
-            data = row[1]
+class FileCache(Cache):
+    def __init__(self):
+        f, self.root = cache_path().split(':')
+
+    def files(self, domain, h):
+        prefix = os.path.join(self.root, domain, h[:2], h[2:4], h[4:6], h[6:8])
+        i = os.path.join(prefix, '%s.json'%h)
+        f = os.path.join(prefix, '%s.dat'%h)
+        return prefix, i, f
+
+    def get(self, url, data={}, headers=DEFAULT_HEADERS, timeout=-1, value="data"):
+        r = None
+        if timeout == 0:
+            return r
+
+        if data:
+            url_hash = hashlib.sha1(url + '?' + data).hexdigest()
+        else:
+            url_hash = hashlib.sha1(url).hexdigest()
+
+        domain = ".".join(urlparse.urlparse(url)[1].split('.')[-2:])
+        prefix, i, f = self.files(domain, url_hash)
+
+        if os.path.exists(i):
+            with open(i) as _i:
+                info = json.load(_i)
+            now = time.mktime(time.localtime())
+            expired = now-timeout
+            if value != 'headers' and info['only_headers']:
+                return None
+            if timeout < 0 or info['created'] > expired:
+                if value == 'headers':
+                    r = info['headers']
+                else:
+                    with open(f) as data:
+                        r = data.read()
+                    if info['compressed']:
+                        r = zlib.decompress(r)
+        return r
+
+    def set(self, url, post_data, data, headers):
+        if post_data:
+            url_hash = hashlib.sha1(url + '?' + post_data).hexdigest()
+        else:
+            url_hash = hashlib.sha1(url).hexdigest()
+
+        domain = ".".join(urlparse.urlparse(url)[1].split('.')[-2:])
+        prefix, i, f = self.files(domain, url_hash)
+        makedirs(prefix)
+
+        created = time.mktime(time.localtime())
         content_type = headers.get('content-type', '').split(';')[0].strip()
-        if content_type in COMPRESS_TYPES:
-            data = zlib.compress(data)
-            t = (sqlite3.Binary(data), url_hash)
-            print url_hash, 'update'
-            c.execute('UPDATE cache SET compressed = 1, data = ? WHERE url_hash = ?', t)
-    conn.commit()
-    print "optimizing database"
-    c.execute('VACUUM')
-    conn.commit()
-    c.close()
-    conn.close()
+
+        info = {
+            'compressed': content_type in COMPRESS_TYPES,
+            'only_headers': data == -1,
+            'created': created,
+            'headers': headers,
+        }
+        if post_data:
+            info['post_data'] = post_data
+        if not info['only_headers']:
+            if info['compressed']:
+                data = zlib.compress(data)
+            with open(f, 'w') as _f:
+                _f.write(data)
+        with open(i, 'w') as _i:
+            json.dump(info, _i)
+
+if cache_path().startswith('fs:'):
+    store = FileCache()
+else:
+    store = SQLiteCache()
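
For reference, a minimal usage sketch (not part of the commit) of how the new backend selection behaves: cache_path() reads the oxCACHE environment variable, and the module picks FileCache when that value starts with "fs:", otherwise it falls back to the SQLite store. The ox.cache import path, the example URL and the cache location below are assumptions for illustration; the rest follows the code added above.

    import os
    import hashlib

    # Assumption: oxCACHE has to be set before the module is imported, since
    # `store` is chosen at import time (an "fs:" prefix selects FileCache).
    os.environ['oxCACHE'] = 'fs:' + os.path.expanduser('~/.ox/cache')

    from ox import cache  # assumed import path for this module

    # readUrl() asks store.get() first and only hits the network on a miss,
    # then persists the response via store.set().
    data = cache.readUrl('http://example.com/')

    # FileCache.files() shards entries by domain and sha1 of the url:
    # <root>/<domain>/<h[:2]>/<h[2:4]>/<h[4:6]>/<h[6:8]>/<sha1>.json plus .dat
    h = hashlib.sha1('http://example.com/'.encode('utf-8')).hexdigest()
    print(os.path.join(os.path.expanduser('~/.ox/cache'), 'example.com',
                       h[:2], h[2:4], h[4:6], h[6:8], h + '.json'))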