From 4b790722ae9661ccc1a4712920e20bdc4dee70d6 Mon Sep 17 00:00:00 2001 From: j Date: Sat, 9 Aug 2014 20:32:41 +0200 Subject: [PATCH] cleanly shut down --- oml/db.py | 12 ++++++++---- oml/downloads.py | 2 -- oml/localnodes.py | 20 +++++++++++++++----- oml/nodes.py | 3 ++- oml/server.py | 12 +++++++++++- oml/tasks.py | 2 +- 6 files changed, 37 insertions(+), 14 deletions(-) diff --git a/oml/db.py b/oml/db.py index f461409..43c671d 100644 --- a/oml/db.py +++ b/oml/db.py @@ -9,6 +9,7 @@ from sqlalchemy.ext.declarative import declarative_base import settings import state + engine = create_engine('sqlite:////%s' % settings.db_path) Session = scoped_session(sessionmaker(bind=engine)) @@ -43,10 +44,13 @@ def session(): else: state.db.session = Session() state.db.count = 1 - yield - state.db.count -= 1 - if not state.db.count: - Session.remove() + try: + yield state.db.session + finally: + state.db.count -= 1 + if not state.db.count: + state.db.session.close() + Session.remove() class MutableDict(Mutable, dict): @classmethod diff --git a/oml/downloads.py b/oml/downloads.py index ed22dbd..e7457ee 100644 --- a/oml/downloads.py +++ b/oml/downloads.py @@ -50,6 +50,4 @@ class Downloads(Thread): def join(self): self._running = False - self._q.put(None) return Thread.join(self) - diff --git a/oml/localnodes.py b/oml/localnodes.py index 9e3be08..c601a23 100644 --- a/oml/localnodes.py +++ b/oml/localnodes.py @@ -39,6 +39,7 @@ class LocalNodesBase(Thread): _TTL = 1 def __init__(self, nodes): + self._socket = None self._active = True self._nodes = nodes Thread.__init__(self) @@ -71,11 +72,12 @@ class LocalNodesBase(Thread): s.bind(('', self._PORT)) while self._active: data, addr = s.recvfrom(1024) - while data[-1] == '\0': - data = data[:-1] # Strip trailing \0's - data = self.verify(data) - if data: - self.update_node(data) + if self._active: + while data[-1] == '\0': + data = data[:-1] # Strip trailing \0's + data = self.verify(data) + if data: + self.update_node(data) except: logger.debug('receive failed. restart later', exc_info=1) time.sleep(10) @@ -133,6 +135,12 @@ class LocalNodesBase(Thread): def join(self): self._active = False + if self._socket: + try: + self._socket.shutdown(socket.SHUT_RDWR) + except: + pass + self._socket.close() return Thread.join(self) class LocalNodes4(LocalNodesBase): @@ -156,6 +164,7 @@ class LocalNodes4(LocalNodesBase): s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) mreq = struct.pack("=4sl", socket.inet_aton(self._BROADCAST), socket.INADDR_ANY) s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + self._socket = s return s def get_ip(self): @@ -186,6 +195,7 @@ class LocalNodes6(LocalNodesBase): s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) group_bin = socket.inet_pton(socket.AF_INET6, self._BROADCAST) + '\0'*4 s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, group_bin) + self._socket = s return s def get_ip(self): diff --git a/oml/nodes.py b/oml/nodes.py index 1260929..44c56e0 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -67,7 +67,7 @@ class Node(Thread): def join(self): self._running = False self.ping() - return Thread.join(self) + #return Thread.join(self) def ping(self): self._q.put('') @@ -409,4 +409,5 @@ class Nodes(Thread): self._q.put(None) for node in self._nodes.values(): node.join() + self._local.join() return Thread.join(self) diff --git a/oml/server.py b/oml/server.py index 1b94879..09bd968 100644 --- a/oml/server.py +++ b/oml/server.py @@ -92,4 +92,14 @@ def run(): host = settings.server['address'] url = 'http://%s:%s/' % (host, settings.server['port']) print 'open browser at %s' % url - state.main.start() + try: + state.main.start() + except: + print 'shutting down...' + + if state.downloads: + state.downloads.join() + if state.tasks: + state.tasks.join() + if state.nodes: + state.nodes.join() diff --git a/oml/tasks.py b/oml/tasks.py index 1aca1f2..3e9867b 100644 --- a/oml/tasks.py +++ b/oml/tasks.py @@ -41,7 +41,7 @@ class Tasks(Thread): def join(self): self.connected = False - self.put(None) + self.q.put(None) self.q.join() return Thread.join(self)