mini distributed scraper

j 2013-06-05 12:26:29 +02:00
commit 79b265dd61
4 changed files with 310 additions and 0 deletions

README Normal file

@@ -0,0 +1,19 @@
the distributed scraper consists of 3 parts:
server.py, client.py, add_url.py
run the server, passing the path to the database as the first argument:
python server.py urls.sqlite
run the client, passing the server url, a client name and the job type:
python client.py http://127.0.0.1:2623 client01 maps.google.com
now you can add urls by running:
python add_url.py http://127.0.0.1:2623 maps.google.com "http://mt1.google.com/vt/lyrs=h@218000000&hl=en&src=app&x=3&y=6&z=4&s=Galileo"
REQUIREMENTS:
you need:
apt-get install python-ox python-requests python-twisted
or:
sudo easy_install ox requests twisted
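
you can also test the server's HTTP API directly with curl
(a sketch, assuming the server above listens on port 2623):
curl -d 'type=maps.google.com' -d 'url=http://example.com/' http://127.0.0.1:2623/add
curl -d 'type=maps.google.com' -d 'client=client01' http://127.0.0.1:2623/get
the second call returns {"url": ...} while urls are pending, {} otherwise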

add_url.py Executable file

@@ -0,0 +1,17 @@
#!/usr/bin/python
# encoding: utf-8
# vi:si:et:sw=4:sts=4:ts=4
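# usage: python add_url.py SERVER TYPE URL, e.g.
#   python add_url.py http://127.0.0.1:2623 maps.google.com "http://..."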
import sys
import requests
if __name__ == '__main__':
server = sys.argv[1]
type = sys.argv[2]
url = sys.argv[3]
add_url = '%s/add' % server
r = requests.post(add_url, {
'url': url,
'type': type
})
print r

client.py Executable file

@@ -0,0 +1,53 @@
#!/usr/bin/python
# encoding: utf-8
# vi:si:et:sw=4:sts=4:ts=4
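# polls the server for the next pending url of the given job type, fetches
# it (via ox.cache, so repeated runs are served from the local cache) and
# PUTs the downloaded data back to the server's /save endpoint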
import ox
import requests
import time
import json
from urllib import quote
class Client:
def __init__(self, url, name, type):
self.url = url
self.name = name
self.type = type
def next(self):
url = '%s/get' % self.url
r = requests.post(url, {
'type': self.type,
'client': self.name
})
data = json.loads(r.content)
if 'url' in data:
print data['url']
#result = ox.net.read_url(data['url'])
result = ox.cache.read_url(data['url'])
put_url = '%s/save?url=%s&client=%s&type=%s' % (self.url, quote(data['url']), quote(self.name), quote(self.type))
            r = requests.put(put_url, result)
assert r.status_code == 200
return True
return False
def run(self):
delay = 10
new = True
while True:
if not self.next():
if new:
new = False
print "currently no more urls to fetch, reducing pull time to %d seconds" % delay
time.sleep(delay)
else:
new = True
if __name__ == '__main__':
import sys
url = sys.argv[1]
name = sys.argv[2]
type = sys.argv[3]
print 'processing "%s" urls as "%s"' % (type, name)
c = Client(url, name, type)
c.run()

server.py Executable file

@@ -0,0 +1,221 @@
#!/usr/bin/python
# encoding: utf-8
# vi:si:et:sw=4:sts=4:ts=4
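# HTTP API (responses are JSON unless noted):
#   POST /add    url, type          - queue a url for scraping
#   POST /get    type, client       - check out the next pending url
#   PUT  /save   ?url=&client=&type= with the downloaded data as body
#   GET  /status - placeholder, currently returns {}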
import os
import json
import shutil
import time
from Queue import Queue
from threading import Thread
import sqlite3
import hashlib
from twisted.web.resource import Resource
from twisted.web.static import File
from twisted.web.server import Site
from twisted.internet import reactor
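# per job type: maximum number of urls a single client may complete per
# hour (enforced via DB.client_limit when a client asks /get for work)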
LIMIT = {
'maps.google.com': 100,
'google.com': 100,
}
class ProcessThread(Thread):
def __init__(self, server):
Thread.__init__(self)
self.server = server
def process(self, type, url, path):
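        # stub: this is where the downloaded file at `path` would be parsed
        # or archived (see the sketch at the end of this commit); returning
        # True makes run() delete the tmp file and mark the url as done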
print 'FIXME', 'do something here'
print type, url
print path
return True
def run(self):
while True:
type, url, path = self.server.queue.get()
if self.process(type, url, path):
os.unlink(path)
self.server.db.done(url, type)
PENDING = 0
ACTIVE = 1
DONE = 2
FAILED = -1
class DB(object):
def __init__(self, db):
self._db = db
conn, c = self._conn()
c.execute('''CREATE TABLE IF NOT EXISTS setting (key varchar(1024) unique, value text)''')
if int(self.get('version', 0)) < 1:
self.set('version', 1)
db = [
'''CREATE TABLE IF NOT EXISTS url (
                    urlhash varchar(42) unique,
url text,
type varchar(255),
status INT,
started INT,
client varchar(255))''',
'''CREATE INDEX IF NOT EXISTS urlhash_idx ON url (urlhash)''',
'''CREATE INDEX IF NOT EXISTS status_idx ON url (status, started)''',
'''CREATE INDEX IF NOT EXISTS client_idx ON url (client)''',
]
for i in db:
c.execute(i)
conn.commit()
def _conn(self):
db_conn = os.path.expanduser(self._db)
if not os.path.exists(os.path.dirname(db_conn)):
os.makedirs(os.path.dirname(db_conn))
conn = sqlite3.connect(db_conn, timeout=10)
conn.text_factory = sqlite3.OptimizedUnicode
return conn, conn.cursor()
def get(self, key, default=None):
conn, c = self._conn()
c.execute('SELECT value FROM setting WHERE key = ?', (key, ))
for row in c:
return row[0]
return default
def set(self, key, value):
conn, c = self._conn()
c.execute(u'INSERT OR REPLACE INTO setting values (?, ?)', (key, str(value)))
conn.commit()
def add_url(self, url, type):
conn, c = self._conn()
status = PENDING
started = 0
urlhash = hashlib.sha1(url).hexdigest()
c.execute(u'INSERT OR REPLACE INTO url values (?, ?, ?, ?, ?, ?)', (urlhash, url, type, status, started, ''))
conn.commit()
def get_url(self, client, type):
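        # urls checked out more than an hour ago are considered stale and
        # put back into the pending pool before the next one is handed out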
conn, c = self._conn()
now = time.mktime(time.localtime())
timeout = 60*60 #one hour
c.execute('UPDATE url SET status = ? WHERE status = ? AND started < ?', (PENDING, ACTIVE, now-timeout))
conn.commit()
c.execute('SELECT urlhash, url FROM url WHERE type = ? AND status = ?', (type, PENDING))
for row in c:
print 'got url'
urlhash = row[0]
url = row[1]
c.execute(u'UPDATE url SET status = ?, started = ?, client = ? WHERE urlhash = ? AND type = ?', (ACTIVE, now, client, urlhash, type))
conn.commit()
return url
def done(self, url, type):
conn, c = self._conn()
urlhash = hashlib.sha1(url).hexdigest()
c.execute(u'UPDATE url SET status = ? WHERE urlhash = ? AND type = ?', (DONE, urlhash, type))
conn.commit()
def client_limit(self, client, type):
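        # count the urls this client completed within the last hour,
        # used to enforce the per-type LIMIT above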
conn, c = self._conn()
limit = time.mktime(time.localtime()) - 60*60
c.execute('SELECT COUNT(*) FROM url WHERE type = ? AND status = ? AND client = ? AND started > ?', (type, DONE, client, limit))
for row in c:
return int(row[0])
return 0
class Server(Resource):
urls = {}
clients = {}
def __init__(self, db):
self.db = DB(db)
self.queue = Queue()
Resource.__init__(self)
t = ProcessThread(self)
t.setDaemon(True)
t.start()
def render_json(self, request, response):
        request.setHeader('Content-Type', 'application/json')
return json.dumps(response, indent=2)
def getChild(self, name, request):
'''
#make source media available via oshash
if request.path.startswith('/get/'):
oshash = request.path.split('/')[-1]
for path in self.client.path(oshash):
if os.path.exists(path):
f = File(path, 'application/octet-stream')
f.isLeaf = True
return f
'''
return self
def render_PUT(self, request):
if request.path.startswith('/save'):
url = request.args['url'][0]
type = request.args['type'][0]
path = '/tmp/%s.data' % hashlib.sha1(url).hexdigest()
with open(path, 'wb') as f:
shutil.copyfileobj(request.content, f)
#put in queue so it can be picked up by process thread
self.queue.put((type, url, path))
return self.render_json(request, {
'status': 'ok'
})
request.setResponseCode(404)
        return '404 unknown location'
def render_POST(self, request):
if request.path.startswith('/add'):
'''
call with url, type, limit
'''
type = request.args['type'][0]
url = request.args['url'][0]
self.db.add_url(url, type)
return self.render_json(request, {})
elif request.path.startswith('/get'):
'''
call with type, client
'''
client = request.args['client'][0]
type = request.args['type'][0]
if type not in LIMIT or self.db.client_limit(client, type) < LIMIT[type]:
url = self.db.get_url(client, type)
if url:
return self.render_json(request, {
'url': url
})
return self.render_json(request, {})
request.setResponseCode(404)
        return '404 unknown location'
def render_GET(self, request):
if request.path.startswith('/status'):
#fixme list queued urls and stuff
return self.render_json(request, {})
        request.setHeader('Content-Type', 'text/html')
data = 'ox distributed read_url'
return data
def main(db):
root = Server(db)
site = Site(root)
port = 2623
interface = '0.0.0.0'
reactor.listenTCP(port, site, interface=interface)
print 'listening on http://%s:%s' % (interface, port)
reactor.run()
if __name__ == '__main__':
import sys
db = os.path.abspath(sys.argv[1])
main(db)
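
the process() hook in ProcessThread above is left as a FIXME stub; here is
a minimal sketch of one possible body, assuming you just want to archive
the downloaded data per job type (the out/ directory layout is made up for
illustration; os, shutil and hashlib are already imported in server.py):

    def process(self, type, url, path):
        # copy the downloaded data to out/<type>/<sha1 of url>.data;
        # returning True lets run() unlink the tmp file and mark the url done
        target_dir = os.path.join('out', type)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        target = os.path.join(target_dir, '%s.data' % hashlib.sha1(url).hexdigest())
        shutil.copyfile(path, target)
        return True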