From 567952d91d2bf8ce12ddc0246e64fed6f47647c6 Mon Sep 17 00:00:00 2001 From: j Date: Sun, 31 Jan 2016 22:15:14 +0530 Subject: [PATCH] use one variable to track app shutdown state --- oml/downloads.py | 10 +++++----- oml/item/scan.py | 12 ++++++------ oml/localnodes.py | 18 +++++++----------- oml/nodes.py | 22 ++++++++-------------- oml/server.py | 7 ++++--- oml/state.py | 1 + oml/tasks.py | 9 ++++----- 7 files changed, 35 insertions(+), 44 deletions(-) diff --git a/oml/downloads.py b/oml/downloads.py index ca70476..cb02811 100644 --- a/oml/downloads.py +++ b/oml/downloads.py @@ -18,7 +18,6 @@ logger = logging.getLogger(__name__) class Downloads(Thread): def __init__(self): - self._running = True Thread.__init__(self) self.daemon = True self.start() @@ -36,9 +35,11 @@ class Downloads(Thread): for t in item.models.Transfer.query.filter( item.models.Transfer.added!=None, item.models.Transfer.progress<1).order_by(item.models.Transfer.added): - if not self._running: + if state.shutdown: return False for u in t.item.users: + if state.shutdown: + return False if state.nodes.is_online(u.id): logger.debug('DOWNLOAD %s %s', t.item, u) r = state.nodes.download(u.id, t.item) @@ -47,14 +48,13 @@ class Downloads(Thread): def run(self): self.wait(10) - while self._running: + while not state.shutdown: self.wait_online() with db.session(): self.download_next() self.wait(10) def join(self): - self._running = False return Thread.join(self) def wait_online(self): @@ -63,7 +63,7 @@ class Downloads(Thread): def wait(self, timeout): step = min(timeout, 1) - while self._running and timeout > 0: + while not state.shutdown and timeout > 0: time.sleep(step) timeout -= step diff --git a/oml/item/scan.py b/oml/item/scan.py index a5b7881..4b2a41f 100644 --- a/oml/item/scan.py +++ b/oml/item/scan.py @@ -31,7 +31,7 @@ def remove_missing(): prefix = os.path.join(os.path.expanduser(prefs['libraryPath']), 'Books' + os.sep) if os.path.exists(prefix): for f in File.query: - if not state.tasks.connected: + if state.shutdown: return if f.item: path = f.item.get_path() @@ -45,7 +45,7 @@ def remove_missing(): state.db.session.commit() state.cache.clear('group:') for f in File.query: - if not state.tasks.connected: + if state.shutdown: return f.move() remove_empty_folders(prefix, True) @@ -82,7 +82,7 @@ def run_scan(): books = [] for root, folders, files in os.walk(prefix): for f in files: - if not state.tasks.connected: + if state.shutdown: return #if f.startswith('._') or f == '.DS_Store': if f.startswith('.'): @@ -97,7 +97,7 @@ def run_scan(): position = 0 added = 0 for f in ox.sorted_strings(books): - if not state.tasks.connected: + if state.shutdown: return position += 1 with db.session(): @@ -159,7 +159,7 @@ def run_import(options=None): count = 0 for root, folders, files in os.walk(prefix): for f in files: - if not state.tasks.connected: + if state.shutdown: return #if f.startswith('._') or f == '.DS_Store': if f.startswith('.'): @@ -217,7 +217,7 @@ def run_import(options=None): if state.activity.get('cancel'): state.activity = {} return - if not state.tasks.connected: + if state.shutdown: return if time.time() - last > 5: last = time.time() diff --git a/oml/localnodes.py b/oml/localnodes.py index bfc334e..ce8bbfd 100644 --- a/oml/localnodes.py +++ b/oml/localnodes.py @@ -55,7 +55,6 @@ class LocalNodesBase(Thread): def __init__(self, nodes): self._socket = None - self._active = True self._nodes = nodes Thread.__init__(self) if not server['localnode_discovery']: @@ -87,12 +86,12 @@ class LocalNodesBase(Thread): last = time.mktime(time.localtime()) s = self.get_socket() s.bind(('', self._PORT)) - while self._active: + while not state.shutdown: try: r, _, _ = select.select([s], [], [], 3) if r: data, addr = s.recvfrom(1024) - if self._active: + if not state.shutdown: while data[-1] == 0: data = data[:-1] # Strip trailing \0's data = self.verify(data) @@ -101,11 +100,11 @@ class LocalNodesBase(Thread): except OSError: # no local interface exists self.wait(60) except: - if self._active: + if not state.shutdown: logger.debug('receive failed. restart later', exc_info=True) self.wait(60) finally: - if self._active: + if not state.shutdown: now = time.mktime(time.localtime()) if now - last > 60: last = now @@ -159,7 +158,6 @@ class LocalNodesBase(Thread): self.receive() def join(self): - self._active = False if self._socket: try: self._socket.shutdown(socket.SHUT_RDWR) @@ -170,7 +168,7 @@ class LocalNodesBase(Thread): def wait(self, timeout): step = min(timeout, 1) - while self._active and timeout > 0: + while not state.shutdown and timeout > 0: time.sleep(step) timeout -= step @@ -241,7 +239,6 @@ class LocalNodes6(LocalNodesBase): class LocalNodes(object): - _active = True _nodes4 = None _nodes6 = None @@ -253,7 +250,7 @@ class LocalNodes(object): #self._nodes6 = LocalNodes6(self._nodes) def cleanup(self): - if self._active: + if not state.shutdown: for id in list(self._nodes.keys()): if not can_connect(self._nodes[id]): with db.session(): @@ -262,7 +259,7 @@ class LocalNodes(object): del u.info['local'] u.save() del self._nodes[id] - if not self._active: + if state.shutdown: break def get(self, user_id): @@ -271,7 +268,6 @@ class LocalNodes(object): return self._nodes[user_id] def join(self): - self._active = False if self._nodes4: self._nodes4.join() if self._nodes6: diff --git a/oml/nodes.py b/oml/nodes.py index 266855b..bebfa4c 100644 --- a/oml/nodes.py +++ b/oml/nodes.py @@ -31,7 +31,6 @@ logger = logging.getLogger(__name__) ENCODING='base64' class Node(Thread): - _running = True host = None local = None _online = None @@ -46,13 +45,13 @@ class Node(Thread): self._q = Queue() Thread.__init__(self) self.daemon = True - self.start() self.ping() + self.start() def run(self): - while self._running: + while not state.shutdown: action = self._q.get() - if not self._running: + if state.shutdown: break if action == 'send_response': self._send_response() @@ -62,11 +61,9 @@ class Node(Thread): logger.debug('unknown action %s', action) def join(self): - self._running = False self._q.put('') #return Thread.join(self) - def ping(self): if state.online: self._q.put('ping') @@ -155,7 +152,6 @@ class Node(Thread): except urllib.error.HTTPError as e: if e.code == 403: logger.debug('403: %s (%s)', url, self.user_id) - self._running = False if state.tasks: state.tasks.queue('peering', (self.user_id, False)) del self._nodes[self.user_id] @@ -335,7 +331,7 @@ class Node(Thread): 'id': item.id, 'progress': t.progress }) if state.bandwidth: - while not state.bandwidth.download(chunk_size) and self._running: + while not state.bandwidth.download(chunk_size) and not state.shutdown: time.sleep(0.1) t2 = datetime.utcnow() duration = (t2-t1).total_seconds() @@ -407,7 +403,6 @@ class Nodes(Thread): def __init__(self): self._q = Queue() - self._running = True with db.session(): for u in user.models.User.query.filter_by(peered=True): if 'local' in u.info: @@ -427,7 +422,7 @@ class Nodes(Thread): self.start() def cleanup(self): - if self._running and self._local: + if not state.shutdown and self._local: self._local.cleanup() def pull(self): @@ -479,14 +474,14 @@ class Nodes(Thread): return self._pulling = True for node in list(self._nodes.values()): - if self._running: + if not state.shutdown: node.online = node.can_connect() - if self._running and node.online: + if not state.shutdown and node.online: node.pullChanges() self._pulling = False def run(self): - while self._running: + while not state.shutdown: args = self._q.get() if args: if args[0] == 'cleanup': @@ -499,7 +494,6 @@ class Nodes(Thread): self._call(*args) def join(self): - self._running = False self._q.put(None) for node in list(self._nodes.values()): node.join() diff --git a/oml/server.py b/oml/server.py index 5c22f0a..0bb7421 100644 --- a/oml/server.py +++ b/oml/server.py @@ -60,8 +60,12 @@ def log_request(handler): handler._request_summary(), request_time) def shutdown(): + state.shutdown = True if state.tor: state.tor._shutdown = True + if state.nodes: + logger.debug('shutdown nodes') + state.nodes.join() if state.downloads: logger.debug('shutdown downloads') state.downloads.join() @@ -70,9 +74,6 @@ def shutdown(): if state.tasks: logger.debug('shutdown tasks') state.tasks.join() - if state.nodes: - logger.debug('shutdown nodes') - state.nodes.join() if state.node: state.node.stop() if state.tor: diff --git a/oml/state.py b/oml/state.py index 062c45c..900fc99 100644 --- a/oml/state.py +++ b/oml/state.py @@ -8,6 +8,7 @@ tasks = False downloads = False tor = False update = False +shutdown = False websockets = [] activity = {} diff --git a/oml/tasks.py b/oml/tasks.py index 08fed06..94df188 100644 --- a/oml/tasks.py +++ b/oml/tasks.py @@ -5,6 +5,7 @@ from queue import Queue from threading import Thread from websocket import trigger_event +import state import logging logger = logging.getLogger(__name__) @@ -13,7 +14,6 @@ class Tasks(Thread): def __init__(self): self.q = Queue() - self.connected = True Thread.__init__(self) self.daemon = True self.start() @@ -23,9 +23,9 @@ class Tasks(Thread): import item.scan from item.models import sync_metadata, get_preview from user.models import export_list, update_user_peering - while self.connected: + while not state.shutdown: m = self.q.get() - if m and self.connected: + if m and not state.shutdown: try: action, data = m logger.debug('%s start', action) @@ -55,12 +55,11 @@ class Tasks(Thread): self.q.task_done() def join(self): - self.connected = False self.q.put(None) return Thread.join(self) def queue(self, action, data=None): - if self.connected: + if not state.shutdown: logger.debug('%s queued', action) self.q.put((action, data))