From eb4808d4c1d40cb4fc55b23d7f703178a33ac29f Mon Sep 17 00:00:00 2001 From: rolux Date: Sat, 16 Nov 2013 16:13:41 +0100 Subject: [PATCH] Ox.queue: add 'cancel' and 'reset' (=cancel+clear) methods --- source/Ox/js/Function.js | 60 +++++++++++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 16 deletions(-) diff --git a/source/Ox/js/Function.js b/source/Ox/js/Function.js index d69be7f9..ba9d3d04 100644 --- a/source/Ox/js/Function.js +++ b/source/Ox/js/Function.js @@ -88,37 +88,65 @@ Ox.queue Queue of asynchronous function calls with cached results The results are cached based on all arguments to `fn`, except the last one, which is the callback. (fn, maxThreads) -> Queue function - .clear Clear method + .cancel Cancels all running function calls + .clear Clears the queue + .reset Cancels all running function calls and clears the queue fn Queued function maxThreads Number of parallel function calls @*/ Ox.queue = function(fn, maxThreads) { var maxThreads = maxThreads || 10, - queue = [], + processing = [], + queued = [], ret = Ox.cache(function() { - queue.push(Ox.toArray(arguments)); + var args = Ox.toArray(arguments); + queued.push({ + args: args, + key: getKey(args) + }); process(); }, { async: true, - key: function(args) { - return JSON.stringify(args.slice(0, -1)); - } + key: getKey }), threads = 0; - ret.clear = function() { - queue = []; + ret.cancel = function() { + processing = []; + return ret; }; + ret.clear = function() { + queued = []; + return ret; + }; + ret.reset = function() { + return ret.cancel().clear(); + }; + function getKey(args) { + return JSON.stringify(args.slice(0, -1)); + } function process() { - var n = Math.min(queue.length, maxThreads - threads); + var n = Math.min(queued.length, maxThreads - threads); if (n) { threads += n; - Ox.parallelForEach(queue.splice(0, n), function(args, index, array, callback) { - fn.apply(this, args.slice(0, -1).concat(function(result) { - threads--; - args.slice(-1)[0](result); - callback(); - })); - }, process); + processing = processing.concat(queued.splice(0, n)); + Ox.parallelForEach( + processing, + function(value, index, array, callback) { + var args = value.args, key = value.key; + fn.apply(this, args.slice(0, -1).concat(function(result) { + var index = Ox.indexOf(processing, function(value) { + return value.key == key; + }); + if (index > -1) { + processing.splice(index, 1); + args.slice(-1)[0](result); + } + threads--; + callback(); + })); + }, + process + ); } } return ret;