commit 79b265dd613db708776baa94f2b8098b84a6210b
Author: j <0x006A@0x2620.org>
Date:   Wed Jun 5 12:26:29 2013 +0200

    mini distributed scraper

diff --git a/README b/README
new file mode 100644
index 0000000..b90a41b
--- /dev/null
+++ b/README
@@ -0,0 +1,19 @@
+distributed scraper, consists of 3 parts:
+server.py, client.py and add_url.py
+
+run the server, passing the path to the database as the first argument:
+
+    python server.py urls.sqlite
+
+run the client, passing the server url, a client name and a job type:
+
+    python client.py http://127.0.0.1:2623 client01 maps.google.com
+
+now you can add urls by running:
+    python add_url.py http://127.0.0.1:2623 maps.google.com "http://mt1.google.com/vt/lyrs=h@218000000&hl=en&src=app&x=3&y=6&z=4&s=Galileo"
+
+REQUIREMENTS:
+    you need:
+        apt-get install python-ox python-requests python-twisted
+    or:
+        sudo easy_install ox requests twisted
diff --git a/add_url.py b/add_url.py
new file mode 100755
index 0000000..fbcbbbc
--- /dev/null
+++ b/add_url.py
@@ -0,0 +1,18 @@
+#!/usr/bin/python
+# encoding: utf-8
+# vi:si:et:sw=4:sts=4:ts=4
+import sys
+
+import requests
+
+if __name__ == '__main__':
+    server = sys.argv[1]
+    type = sys.argv[2]
+    url = sys.argv[3]
+    # POST the url to the server's /add endpoint, tagged with its job type
+    add_url = '%s/add' % server
+    r = requests.post(add_url, {
+        'url': url,
+        'type': type
+    })
+    print r
diff --git a/client.py b/client.py
new file mode 100755
index 0000000..c065f57
--- /dev/null
+++ b/client.py
@@ -0,0 +1,54 @@
+#!/usr/bin/python
+# encoding: utf-8
+# vi:si:et:sw=4:sts=4:ts=4
+import ox
+import requests
+import time
+import json
+from urllib import quote
+
+class Client:
+
+    def __init__(self, url, name, type):
+        self.url = url
+        self.name = name
+        self.type = type
+
+    def next(self):
+        url = '%s/get' % self.url
+        r = requests.post(url, {
+            'type': self.type,
+            'client': self.name
+        })
+        data = json.loads(r.content)
+        if 'url' in data:
+            print data['url']
+            #result = ox.net.read_url(data['url'])
+            result = ox.cache.read_url(data['url'])
+            put_url = '%s/save?url=%s&client=%s&type=%s' % (self.url, quote(data['url']), quote(self.name), quote(self.type))
+            # upload the fetched data (not the job dict) back to the server
+            r = requests.put(put_url, result)
+            assert r.status_code == 200
+            return True
+        return False
+
+    def run(self):
+        delay = 10
+        new = True
+        while True:
+            if not self.next():
+                if new:
+                    new = False
+                    print "currently no more urls to fetch, polling again every %d seconds" % delay
+                time.sleep(delay)
+            else:
+                new = True
+
+if __name__ == '__main__':
+    import sys
+    url = sys.argv[1]
+    name = sys.argv[2]
+    type = sys.argv[3]
+    print 'processing "%s" urls as "%s"' % (type, name)
+    c = Client(url, name, type)
+    c.run()
diff --git a/server.py b/server.py
new file mode 100755
index 0000000..e9cb1d5
--- /dev/null
+++ b/server.py
@@ -0,0 +1,231 @@
+#!/usr/bin/python
+# encoding: utf-8
+# vi:si:et:sw=4:sts=4:ts=4
+import os
+import json
+import shutil
+import time
+from Queue import Queue
+from threading import Thread
+import sqlite3
+import hashlib
+
+from twisted.web.resource import Resource
+from twisted.web.static import File
+from twisted.web.server import Site
+from twisted.internet import reactor
+
+#maximum number of urls a single client may complete per hour, per job type
+LIMIT = {
+    'maps.google.com': 100,
+    'google.com': 100,
+}
+
+class ProcessThread(Thread):
+
+    def __init__(self, server):
+        Thread.__init__(self)
+        self.server = server
+
+    def process(self, type, url, path):
+        print 'FIXME', 'do something here'
+        print type, url
+        print path
+        return True
+
+    def run(self):
+        while True:
+            type, url, path = self.server.queue.get()
+            if self.process(type, url, path):
+                os.unlink(path)
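+                # only on success is the upload file removed (above) and the
+                # url marked DONE (below); failed urls stay ACTIVE until
+                # get_url() requeues them after its one hour timeout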
+                self.server.db.done(url, type)
+
+PENDING = 0
+ACTIVE = 1
+DONE = 2
+FAILED = -1
+
+class DB(object):
+
+    def __init__(self, db):
+        self._db = db
+        conn, c = self._conn()
+
+        c.execute('''CREATE TABLE IF NOT EXISTS setting (key varchar(1024) unique, value text)''')
+
+        if int(self.get('version', 0)) < 1:
+            self.set('version', 1)
+            db = [
+                '''CREATE TABLE IF NOT EXISTS url (
+                    urlhash varchar(42),
+                    url text,
+                    type varchar(255),
+                    status INT,
+                    started INT,
+                    client varchar(255))''',
+                '''CREATE INDEX IF NOT EXISTS urlhash_idx ON url (urlhash)''',
+                '''CREATE INDEX IF NOT EXISTS status_idx ON url (status, started)''',
+                '''CREATE INDEX IF NOT EXISTS client_idx ON url (client)''',
+            ]
+            for i in db:
+                c.execute(i)
+            conn.commit()
+
+    def _conn(self):
+        db_conn = os.path.expanduser(self._db)
+        if not os.path.exists(os.path.dirname(db_conn)):
+            os.makedirs(os.path.dirname(db_conn))
+        conn = sqlite3.connect(db_conn, timeout=10)
+        conn.text_factory = sqlite3.OptimizedUnicode
+        return conn, conn.cursor()
+
+    def get(self, key, default=None):
+        conn, c = self._conn()
+        c.execute('SELECT value FROM setting WHERE key = ?', (key, ))
+        for row in c:
+            return row[0]
+        return default
+
+    def set(self, key, value):
+        conn, c = self._conn()
+        c.execute(u'INSERT OR REPLACE INTO setting values (?, ?)', (key, str(value)))
+        conn.commit()
+
+    def add_url(self, url, type):
+        conn, c = self._conn()
+        status = PENDING
+        started = 0
+        urlhash = hashlib.sha1(url).hexdigest()
+        c.execute(u'INSERT OR REPLACE INTO url values (?, ?, ?, ?, ?, ?)', (urlhash, url, type, status, started, ''))
+        conn.commit()
+
+    def get_url(self, client, type):
+        conn, c = self._conn()
+        now = time.mktime(time.localtime())
+        timeout = 60*60 #one hour
+        #requeue urls that were claimed but not delivered within an hour
+        c.execute('UPDATE url SET status = ? WHERE status = ? AND started < ?', (PENDING, ACTIVE, now-timeout))
+        conn.commit()
+        c.execute('SELECT urlhash, url FROM url WHERE type = ? AND status = ? LIMIT 1', (type, PENDING))
+        row = c.fetchone()
+        if row:
+            urlhash, url = row
+            c.execute(u'UPDATE url SET status = ?, started = ?, client = ? WHERE urlhash = ? AND type = ?', (ACTIVE, now, client, urlhash, type))
+            conn.commit()
+            return url
+
+    def done(self, url, type):
+        conn, c = self._conn()
+        urlhash = hashlib.sha1(url).hexdigest()
+        c.execute(u'UPDATE url SET status = ? WHERE urlhash = ? AND type = ?', (DONE, urlhash, type))
+        conn.commit()
+
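+    # sketch (not wired up anywhere yet): FAILED is defined above but nothing
+    # sets it; a transition into that state could look like this:
+    def fail(self, url, type):
+        conn, c = self._conn()
+        urlhash = hashlib.sha1(url).hexdigest()
+        c.execute(u'UPDATE url SET status = ? WHERE urlhash = ? AND type = ?', (FAILED, urlhash, type))
+        conn.commit()
+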
+    def client_limit(self, client, type):
+        conn, c = self._conn()
+        limit = time.mktime(time.localtime()) - 60*60
+        c.execute('SELECT COUNT(*) FROM url WHERE type = ? AND status = ? AND client = ? AND started > ?', (type, DONE, client, limit))
+        for row in c:
+            return int(row[0])
+        return 0
+
+class Server(Resource):
+    urls = {}
+    clients = {}
+
+    def __init__(self, db):
+        self.db = DB(db)
+        self.queue = Queue()
+        Resource.__init__(self)
+        t = ProcessThread(self)
+        t.setDaemon(True)
+        t.start()
+
+    def render_json(self, request, response):
+        request.setHeader('Content-Type', 'application/json')
+        return json.dumps(response, indent=2)
+
+    def getChild(self, name, request):
+        '''
+        #make source media available via oshash
+        if request.path.startswith('/get/'):
+            oshash = request.path.split('/')[-1]
+            for path in self.client.path(oshash):
+                if os.path.exists(path):
+                    f = File(path, 'application/octet-stream')
+                    f.isLeaf = True
+                    return f
+        '''
+        return self
+
+    def render_PUT(self, request):
+        if request.path.startswith('/save'):
+            url = request.args['url'][0]
+            type = request.args['type'][0]
+            path = '/tmp/%s.data' % hashlib.sha1(url).hexdigest()
+            with open(path, 'wb') as f:
+                shutil.copyfileobj(request.content, f)
+            #put in queue so it can be picked up by process thread
+            self.queue.put((type, url, path))
+            return self.render_json(request, {
+                'status': 'ok'
+            })
+        request.setResponseCode(404)
+        return '404 unknown location'
+
+    def render_POST(self, request):
+        if request.path.startswith('/add'):
+            '''
+            call with url and type
+            '''
+            type = request.args['type'][0]
+            url = request.args['url'][0]
+            self.db.add_url(url, type)
+            return self.render_json(request, {})
+        elif request.path.startswith('/get'):
+            '''
+            call with type and client
+            '''
+            client = request.args['client'][0]
+            type = request.args['type'][0]
+            if type not in LIMIT or self.db.client_limit(client, type) < LIMIT[type]:
+                url = self.db.get_url(client, type)
+                if url:
+                    return self.render_json(request, {
+                        'url': url
+                    })
+            return self.render_json(request, {})
+        request.setResponseCode(404)
+        return '404 unknown location'
+
+    def render_GET(self, request):
+        if request.path.startswith('/status'):
+            #fixme list queued urls and stuff
+            return self.render_json(request, {})
+        request.setHeader('Content-Type', 'text/html')
+        data = 'ox distributed read_url'
+        return data
+
+def main(db):
+    root = Server(db)
+    site = Site(root)
+    port = 2623
+    interface = '0.0.0.0'
+    reactor.listenTCP(port, site, interface=interface)
+    print 'listening on http://%s:%s' % (interface, port)
+    reactor.run()
+
+if __name__ == '__main__':
+    import sys
+    db = os.path.abspath(sys.argv[1])
+    main(db)
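
A minimal scripted round-trip against a running server (a sketch; the server
address, client name and payload url below are illustrative, not part of the
commit):

    #!/usr/bin/python
    # drive the /add -> /get cycle that add_url.py and client.py automate
    import json
    import requests

    server = 'http://127.0.0.1:2623'
    job_type = 'maps.google.com'

    # queue a url (what add_url.py does)
    requests.post('%s/add' % server, {'url': 'http://example.com/tile.png', 'type': job_type})

    # claim it as client "test" (what client.py does before fetching the url
    # and PUTing the data to /save)
    r = requests.post('%s/get' % server, {'type': job_type, 'client': 'test'})
    print json.loads(r.content).get('url')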