Ox.queue: add 'cancel' and 'reset' (=cancel+clear) methods

This commit is contained in:
rolux 2013-11-16 16:13:41 +01:00
parent 7bd38b3bb9
commit eb4808d4c1

View file

@ -88,37 +88,65 @@ Ox.queue <f> Queue of asynchronous function calls with cached results
The results are cached based on all arguments to `fn`, except the last one, The results are cached based on all arguments to `fn`, except the last one,
which is the callback. which is the callback.
(fn, maxThreads) -> <f> Queue function (fn, maxThreads) -> <f> Queue function
.clear <f> Clear method .cancel <f> Cancels all running function calls
.clear <f> Clears the queue
.reset <f> Cancels all running function calls and clears the queue
fn <f> Queued function fn <f> Queued function
maxThreads <n|10> Number of parallel function calls maxThreads <n|10> Number of parallel function calls
@*/ @*/
Ox.queue = function(fn, maxThreads) { Ox.queue = function(fn, maxThreads) {
var maxThreads = maxThreads || 10, var maxThreads = maxThreads || 10,
queue = [], processing = [],
queued = [],
ret = Ox.cache(function() { ret = Ox.cache(function() {
queue.push(Ox.toArray(arguments)); var args = Ox.toArray(arguments);
queued.push({
args: args,
key: getKey(args)
});
process(); process();
}, { }, {
async: true, async: true,
key: function(args) { key: getKey
return JSON.stringify(args.slice(0, -1));
}
}), }),
threads = 0; threads = 0;
ret.clear = function() { ret.cancel = function() {
queue = []; processing = [];
return ret;
}; };
ret.clear = function() {
queued = [];
return ret;
};
ret.reset = function() {
return ret.cancel().clear();
};
function getKey(args) {
return JSON.stringify(args.slice(0, -1));
}
function process() { function process() {
var n = Math.min(queue.length, maxThreads - threads); var n = Math.min(queued.length, maxThreads - threads);
if (n) { if (n) {
threads += n; threads += n;
Ox.parallelForEach(queue.splice(0, n), function(args, index, array, callback) { processing = processing.concat(queued.splice(0, n));
Ox.parallelForEach(
processing,
function(value, index, array, callback) {
var args = value.args, key = value.key;
fn.apply(this, args.slice(0, -1).concat(function(result) { fn.apply(this, args.slice(0, -1).concat(function(result) {
threads--; var index = Ox.indexOf(processing, function(value) {
return value.key == key;
});
if (index > -1) {
processing.splice(index, 1);
args.slice(-1)[0](result); args.slice(-1)[0](result);
}
threads--;
callback(); callback();
})); }));
}, process); },
process
);
} }
} }
return ret; return ret;