#!/usr/bin/python
# encoding: utf-8
# vi:si:et:sw=4:sts=4:ts=4
import os
import json
import shutil
import time
from Queue import Queue
from threading import Thread
import sqlite3
import hashlib
from twisted.web.resource import Resource
from twisted.web.static import File
from twisted.web.server import Site
from twisted.internet import reactor
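
# per-client hourly cap on completed urls, keyed by scrape type
# (enforced in render_POST via DB.client_limit)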
LIMIT = {
    'maps.google.com': 100,
    'google.com': 100,
}


class ProcessThread(Thread):
    '''
    takes (type, url, path) tuples off the server queue and processes the
    uploaded data
    '''

    def __init__(self, server):
        Thread.__init__(self)
        self.server = server

    def process(self, type, url, path):
        print 'FIXME', 'do something here'
        print type, url
        print path
        return True

    def run(self):
        while True:
            type, url, path = self.server.queue.get()
            if self.process(type, url, path):
                # processing succeeded: remove the spooled file and
                # mark the url as done in the database
                os.unlink(path)
                self.server.db.done(url, type)

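
# url lifecycle states stored in the status column of the url table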
PENDING = 0
ACTIVE = 1
DONE = 2
FAILED = -1


class DB(object):

    def __init__(self, db):
        self._db = db
        conn, c = self._conn()
        c.execute('''CREATE TABLE IF NOT EXISTS setting (key varchar(1024) unique, value text)''')
        if int(self.get('version', 0)) < 1:
            self.set('version', 1)
            db = [
                '''CREATE TABLE IF NOT EXISTS url (
                        urlhash varchar(42),
                        url text,
                        type varchar(255),
                        status INT,
                        started INT,
                        client varchar(255))''',
                '''CREATE INDEX IF NOT EXISTS urlhash_idx ON url (urlhash)''',
                '''CREATE INDEX IF NOT EXISTS status_idx ON url (status, started)''',
                '''CREATE INDEX IF NOT EXISTS client_idx ON url (client)''',
            ]
            for i in db:
                c.execute(i)
            conn.commit()

    def _conn(self):
        # open a new connection per call so the twisted reactor thread and
        # the process thread never share a sqlite connection
        db_conn = os.path.expanduser(self._db)
        if not os.path.exists(os.path.dirname(db_conn)):
            os.makedirs(os.path.dirname(db_conn))
        conn = sqlite3.connect(db_conn, timeout=10)
        conn.text_factory = sqlite3.OptimizedUnicode
        return conn, conn.cursor()

    def get(self, key, default=None):
        conn, c = self._conn()
        c.execute('SELECT value FROM setting WHERE key = ?', (key, ))
        for row in c:
            return row[0]
        return default

    def set(self, key, value):
        conn, c = self._conn()
        c.execute(u'INSERT OR REPLACE INTO setting values (?, ?)', (key, str(value)))
        conn.commit()

    def add_url(self, url, type):
        conn, c = self._conn()
        status = PENDING
        started = 0
        urlhash = hashlib.sha1(url).hexdigest()
        c.execute(u'INSERT OR REPLACE INTO url values (?, ?, ?, ?, ?, ?)',
                  (urlhash, url, type, status, started, ''))
        conn.commit()

    def get_url(self, client, type):
        conn, c = self._conn()
        now = time.mktime(time.localtime())
        timeout = 60 * 60  # one hour
        # requeue urls that a client claimed but never delivered within the timeout
        c.execute('UPDATE url SET status = ? WHERE status = ? AND started < ?',
                  (PENDING, ACTIVE, now - timeout))
        conn.commit()
        c.execute('SELECT urlhash, url FROM url WHERE type = ? AND status = ?', (type, PENDING))
        for row in c:
            urlhash = row[0]
            url = row[1]
            c.execute(u'UPDATE url SET status = ?, started = ?, client = ? WHERE urlhash = ? AND type = ?',
                      (ACTIVE, now, client, urlhash, type))
            conn.commit()
            return url

    def done(self, url, type):
        conn, c = self._conn()
        urlhash = hashlib.sha1(url).hexdigest()
        c.execute(u'UPDATE url SET status = ? WHERE urlhash = ? AND type = ?', (DONE, urlhash, type))
        conn.commit()

    def client_limit(self, client, type):
        # number of urls this client completed within the last hour
        conn, c = self._conn()
        limit = time.mktime(time.localtime()) - 60 * 60
        c.execute('SELECT COUNT(*) FROM url WHERE type = ? AND status = ? AND client = ? AND started > ?',
                  (type, DONE, client, limit))
        for row in c:
            return int(row[0])
        return 0
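

# http interface:
#   POST /add   args: url, type      queue a url for scraping
#   POST /get   args: client, type   hand a pending url to a client
#   PUT  /save?url=...&type=...      receive fetched data, queue for processing
#   GET  /status                     placeholder for queue statistics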
class Server(Resource):
    urls = {}
    clients = {}

    def __init__(self, db):
        self.db = DB(db)
        self.queue = Queue()
        Resource.__init__(self)
        t = ProcessThread(self)
        t.setDaemon(True)
        t.start()

    def render_json(self, request, response):
        request.setHeader('Content-Type', 'application/json')
        return json.dumps(response, indent=2)

    def getChild(self, name, request):
        '''
        #make source media available via oshash
        if request.path.startswith('/get/'):
            oshash = request.path.split('/')[-1]
            for path in self.client.path(oshash):
                if os.path.exists(path):
                    f = File(path, 'application/octet-stream')
                    f.isLeaf = True
                    return f
        '''
        return self
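
    # clients upload a fetched page with PUT /save?url=<url>&type=<type> and
    # the data as the request body; for PUT, twisted only fills request.args
    # from the query string, so the parameters have to go there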
    def render_PUT(self, request):
        if request.path.startswith('/save'):
            url = request.args['url'][0]
            type = request.args['type'][0]
            path = '/tmp/%s.data' % hashlib.sha1(url).hexdigest()
            with open(path, 'wb') as f:
                shutil.copyfileobj(request.content, f)
            #put in queue so it can be picked up by process thread
            self.queue.put((type, url, path))
            return self.render_json(request, {
                'status': 'ok'
            })
        request.setResponseCode(404)
        return '404 unknown location'

    def render_POST(self, request):
        if request.path.startswith('/add'):
            '''
            call with url, type
            '''
            type = request.args['type'][0]
            url = request.args['url'][0]
            self.db.add_url(url, type)
            return self.render_json(request, {})
        elif request.path.startswith('/get'):
            '''
            call with type, client
            '''
            client = request.args['client'][0]
            type = request.args['type'][0]
            # only hand out a url if the client is under its hourly limit
            if type not in LIMIT or self.db.client_limit(client, type) < LIMIT[type]:
                url = self.db.get_url(client, type)
                if url:
                    return self.render_json(request, {
                        'url': url
                    })
            return self.render_json(request, {})
        request.setResponseCode(404)
        return '404 unknown location'
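
    # example exchange (sketch; host and client name are placeholders):
    #   POST /add  with url=<url>&type=google.com        -> {}
    #   POST /get  with client=worker1&type=google.com   -> {"url": "<url>"}
    #                                                       or {} if nothing is
    #                                                       pending or the client
    #                                                       hit its limit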

    def render_GET(self, request):
        if request.path.startswith('/status'):
            #fixme list queued urls and stuff
            return self.render_json(request, {})
        request.setHeader('Content-Type', 'text/html')
        data = 'ox distributed read_url'
        return data


def main(db):
    root = Server(db)
    site = Site(root)
    port = 2623
    interface = '0.0.0.0'
    reactor.listenTCP(port, site, interface=interface)
    print 'listening on http://%s:%s' % (interface, port)
    reactor.run()

if __name__ == '__main__':
    import sys
    db = os.path.abspath(sys.argv[1])
    main(db)
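
# Example worker loop (sketch; assumes the server above runs on
# localhost:2623, python-requests is available, and 'worker1'/'google.com'
# stand in for a real client name and scrape type):
#
#   import requests
#   base = 'http://localhost:2623'
#   task = requests.post(base + '/get',
#                        data={'client': 'worker1', 'type': 'google.com'}).json()
#   if 'url' in task:
#       data = requests.get(task['url']).content
#       requests.put(base + '/save',
#                    params={'url': task['url'], 'type': 'google.com'},
#                    data=data)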