cleanup Ox.queue

This commit is contained in:
rolux 2013-07-10 01:27:25 +02:00
parent f938f281f1
commit b0f9558845
2 changed files with 37 additions and 40 deletions

View file

@ -284,44 +284,4 @@
}; };
// FIXME: The above test with 10000 iterations blows the stack // FIXME: The above test with 10000 iterations blows the stack
/*@
Ox.queue <f>
cached process queue, passed function is exectued in parallel and
results are cached based on all but last argument,
last argument is a callback that gets called with results
fn <f> function
options <o>
maxThreads <n> number of concurrent threads
callback <f> gets called with object containing duration, width, height
@*/
Ox.queue = function(fn, options) {
var queue = [],
threads = 0;
function process() {
var next = Math.min(queue.length, options.maxThreads-threads);
if (next) {
threads += next;
Ox.parallelForEach(queue.splice(0, next), function(args, index, array, done) {
fn.apply(this, args.slice(0, -1).concat(function(result) {
threads--;
args.slice(-1)[0](result);
done();
}));
}, process);
}
}
return Ox.cache(function() {
queue.push(Ox.toArray(arguments));
process();
}, {
async: true,
key: function(args) {
return JSON.stringify(args.slice(0, -1));
}
});
};
}()); }());

View file

@ -82,3 +82,40 @@ Ox.noop = function() {
var callback = Ox.last(arguments); var callback = Ox.last(arguments);
Ox.isFunction(callback) && callback(); Ox.isFunction(callback) && callback();
}; };
/*@
Ox.queue <f> Queue of asynchronous function calls with cached results
The results are cached based on all but the last argument, which is the
callback.
fn <f> function
maxThreads <n> Number of parallel function calls
callback <f> Callback function
result <*> Result
@*/
Ox.queue = function(fn, maxThreads) {
var maxThreads = maxThreads || 10,
queue = [],
threads = 0;
function process() {
var n = Math.min(queue.length, maxThreads - threads);
if (n) {
threads += n;
Ox.parallelForEach(queue.splice(0, n), function(args, index, array, callback) {
fn.apply(this, args.slice(0, -1).concat(function(result) {
threads--;
args.slice(-1)[0](result);
callback();
}));
}, process);
}
}
return Ox.cache(function() {
queue.push(Ox.toArray(arguments));
process();
}, {
async: true,
key: function(args) {
return JSON.stringify(args.slice(0, -1));
}
});
};