better online/offline status handling

j 2015-12-02 22:05:23 +01:00
parent 3b166eccbb
commit 8a26285c88
11 changed files with 92 additions and 40 deletions

View file

@@ -74,12 +74,12 @@ class ScrapeThread(Thread):
                 return True
             logger.debug('scrape %s', s.item)
             try:
-                s.item.scrape()
-                for f in s.item.files:
-                    f.move()
-                s.item.update_icons()
-                s.remove()
-                trigger_event('change', {})
-                scraped = True
+                if s.item.scrape():
+                    for f in s.item.files:
+                        f.move()
+                    s.item.update_icons()
+                    s.remove()
+                    trigger_event('change', {})
+                    scraped = True
             except:
                 logger.debug('scrape failed %s', s.item, exc_info=1)

View file

@@ -353,7 +353,10 @@ class Item(db.Model):
                 m['primaryid'] = primaryid
                 self.meta = m
                 self.modified = datetime.utcnow()
                 self.update()
+                return True
+            return False
+        return True
 
     def queue_download(self):
         u = state.user()

View file

@@ -148,6 +148,7 @@ class LocalNodesBase(Thread):
                     state.nodes.queue('add', u.id)
             self.send()
 
+
     def get_ip(self):
         pass

View file

@@ -38,7 +38,7 @@ class Node(Thread):
     _pulling = False
     host = None
     local = None
-    online = False
+    _online = None
     download_speed = 0
 
     TIMEOUT = 5
@@ -78,11 +78,12 @@ class Node(Thread):
         #return Thread.join(self)
 
     def pull(self):
-        if not self._pulling:
+        if state.online and not self._pulling:
             self._q.put('pull')
 
     def ping(self):
-        self._q.put('ping')
+        if state.online:
+            self._q.put('ping')
 
     def go_online(self):
         self._q.put('go_online')
@@ -99,6 +100,18 @@ class Node(Thread):
             url = 'https://%s.onion:9851' % self.user_id
         return url
 
+    @property
+    def online(self):
+        return self._online
+
+    @online.setter
+    def online(self, online):
+        if self._online != online:
+            self._online = online
+            self.trigger_status()
+        else:
+            self._online = online
+
     def resolve(self):
         #logger.debug('resolve node %s', self.user_id)
         r = self.get_local()
@@ -263,13 +276,13 @@ class Node(Thread):
                 self.online = False
         else:
             self.online = False
-        self.trigger_status()
 
     def trigger_status(self):
-        trigger_event('status', {
-            'id': self.user_id,
-            'online': self.online
-        })
+        if self.online is not None:
+            trigger_event('status', {
+                'id': self.user_id,
+                'online': self.online
+            })
 
     def pullChanges(self):
         if not self.online or not self.user.peered:
@@ -280,7 +293,6 @@
             changes = self.request('pullChanges', from_revision)
         except:
             self.online = False
-            self.trigger_status()
             logger.debug('%s went offline', self.user.name)
             return False
         if not changes:
@@ -296,7 +308,6 @@
             r = self.request('pushChanges', changes)
         except:
             self.online = False
-            self.trigger_status()
             r = False
         logger.debug('pushedChanges %s %s', r, self.user_id)
 
@@ -407,6 +418,15 @@ class Nodes(Thread):
     def __init__(self):
         self._q = Queue()
         self._running = True
+        with db.session():
+            for u in user.models.User.query.filter_by(peered=True):
+                if 'local' in u.info:
+                    del u.info['local']
+                    u.save()
+                self.queue('add', u.id)
+            for u in user.models.User.query.filter_by(queued=True):
+                logger.debug('adding queued node... %s', u.id)
+                self.queue('add', u.id)
         self._local = LocalNodes()
         self._cleanup = PeriodicCallback(lambda: self.queue('cleanup'), 120000)
         self._cleanup.start()
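
For reference, a minimal standalone sketch of the online-property pattern introduced in Node above: a status event fires only when the value actually changes, and the initial None ("unknown") state is never broadcast. Class, callback and event names below are simplified stand-ins, not the project's API.

    class Peer:
        _online = None  # None means "not yet determined"

        def __init__(self, peer_id, emit):
            self.peer_id = peer_id
            self.emit = emit  # stand-in for trigger_event()

        @property
        def online(self):
            return self._online

        @online.setter
        def online(self, value):
            changed = self._online != value
            self._online = value
            if changed:
                self.trigger_status()

        def trigger_status(self):
            # the initial "unknown" state is never announced
            if self._online is not None:
                self.emit('status', {'id': self.peer_id, 'online': self._online})

    events = []
    p = Peer('a', lambda name, data: events.append(data))
    p.online = False   # first known state -> one event
    p.online = False   # unchanged -> no event
    p.online = True    # changed -> one event
    assert [e['online'] for e in events] == [False, True]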

View file

@@ -53,6 +53,7 @@ def log_request(handler):
     request_time = 1000.0 * handler.request.request_time()
     log_method("%d %s %.2fms", handler.get_status(),
                handler._request_summary(), request_time)
+
 def run():
     setup.create_db()
     PID = sys.argv[2] if len(sys.argv) > 2 else None
@@ -106,7 +107,6 @@ def run():
     state.tasks = tasks.Tasks()
 
     def start_node():
-        import user
         import downloads
         import nodes
         import tor
@@ -117,21 +117,12 @@ def run():
         state.downloads = downloads.Downloads()
         state.scraping = downloads.ScrapeThread()
         state.nodes = nodes.Nodes()
-        def add_users():
+        def publish():
             if not state.tor.is_online():
-                state.main.add_callback(add_users)
+                state.main.call_later(1, publish)
             else:
-                with db.session():
-                    for u in user.models.User.query.filter_by(peered=True):
-                        if 'local' in u.info:
-                            del u.info['local']
-                            u.save()
-                        state.nodes.queue('add', u.id)
-                    for u in user.models.User.query.filter_by(queued=True):
-                        logger.debug('adding queued node... %s', u.id)
-                        state.nodes.queue('add', u.id)
                 nodes.publish_node()
-        state.main.add_callback(add_users)
+        state.main.add_callback(publish)
         state.main.add_callback(start_node)
     if ':' in settings.server['address']:
         host = '[%s]' % settings.server['address']
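
The publish() helper above re-schedules itself on the main IOLoop until Tor reports being online, then publishes the node once. A generic sketch of that retry-until-ready idiom with Tornado, using names of my own choosing (call_when_ready, interval) rather than anything from the project:

    from tornado.ioloop import IOLoop

    def call_when_ready(is_ready, action, interval=1, loop=None):
        # Poll is_ready() on the IOLoop and run action() once it returns True;
        # otherwise check again after `interval` seconds.
        loop = loop or IOLoop.current()
        def check():
            if is_ready():
                action()
            else:
                loop.call_later(interval, check)
        loop.add_callback(check)

Something like call_when_ready(state.tor.is_online, nodes.publish_node) would roughly correspond to what publish() does inline.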

View file

@@ -8,6 +8,7 @@ from stem.control import Controller
 import settings
 import state
+import utils
 
 import logging
 logger = logging.getLogger(__name__)
 
@@ -127,7 +128,7 @@ class Tor(object):
         self.connected = True
         self.socks_port = int(self.controller.get_conf('SocksPort').split(' ')[0])
         self.publish()
-        state.online = True
+        state.online = self.is_online()
         return True
 
     def reconnect(self):
@@ -193,4 +194,4 @@ class Tor(object):
         state.online = False
 
     def is_online(self):
-        return self.connected and self.controller.is_alive()
+        return self.connected and self.controller.is_alive() and utils.can_connect_dns()

View file

@@ -346,3 +346,16 @@ def open_folder(folder=None, path=None):
     else:
         logger.debug('unsupported platform %s', sys.platform)
     subprocess.Popen(cmd, close_fds=True)
+
+def can_connect_dns(host="8.8.8.8", port=53):
+    """
+    host: 8.8.8.8 (google-public-dns-a.google.com)
+    port: 53/tcp
+    """
+    try:
+        socket.setdefaulttimeout(1)
+        socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect((host, port))
+        return True
+    except:
+        pass
+    return False
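
One caveat about the probe above: socket.setdefaulttimeout(1) changes the default timeout for every socket created afterwards in the process, not just this one. A variant with a per-connection timeout (same 8.8.8.8:53 probe, illustrative only, not the code in this commit):

    import socket

    def can_connect_dns(host="8.8.8.8", port=53, timeout=1):
        # TCP reachability check against a public DNS server, using an
        # explicit per-connection timeout instead of the process-wide default.
        try:
            with socket.create_connection((host, port), timeout=timeout):
                return True
        except OSError:
            return False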

View file

@@ -9,6 +9,7 @@ import json
 from oxtornado import json_dumps
 
 import state
+import settings
 
 import logging
 logger = logging.getLogger(__name__)
@@ -33,6 +34,13 @@ class Handler(WebSocketHandler):
             self.close()
         if self not in state.websockets:
             state.websockets.append(self)
+        trigger_event('status', {
+            'id': settings.USER_ID,
+            'online': state.online
+        })
+        if state.nodes:
+            for node in state.nodes._nodes.values():
+                node.trigger_status()
 
 
 #websocket calls

View file

@@ -344,7 +344,12 @@ oml.ui.folders = function() {
             Ox.print('peering.remove reload list')
             Ox.Request.clearCache('getUsers');
             that.updateElement();
-        }
+        },
+        status: function(data) {
+            if (data.id == oml.user.id) {
+                oml.user.online = data.online;
+            }
+        },
     });
 
     return that.updateElement();

View file

@@ -42,9 +42,13 @@ oml.ui.statusIcon = function(user, index) {
     }
 
     function getStatus(data) {
-        return !oml.user.online ? 'unknown'
-            : data.online ? 'connected'
-            : 'disconnected';
+        if (!oml.user.online) {
+            return 'unknown';
+        }
+        if (user.id == data.id) {
+            return data.online ? 'connected' : 'disconnected';
+        }
+        return status || 'unknown';
     }
 
     function render() {
@@ -75,7 +79,7 @@
     }
 
     function update(data) {
-        if (data.id == user.id) {
+        if (data.id == user.id || data.id == oml.user.id) {
             var newStatus = getStatus(data);
             if (status != newStatus) {
                 status = newStatus;

View file

@@ -890,11 +890,17 @@ oml.renameUser = function(data) {
     var ui = oml.user.ui,
         index = Ox.getIndexById(ui._users, data.id),
-        name = ui._users[index].name,
+        name,
         set = {},
-        oldFind = Ox.clone(ui.find, true),
-        newFind = Ox.clone(ui.find, true);
+        oldFind,
+        newFind;
+    if (index == -1) {
+        return;
+    }
+    name = ui._users[index].name;
+    oldFind = Ox.clone(ui.find, true);
+    newFind = Ox.clone(ui.find, true);
 
     ui._users[index].name = data.name;
     ui._users[index].nickname = data.nickname;
 
     set['showFolder.' + name] = null;