一:流程控制
为了适应异步编程,减少回调的嵌套,我尝试了很多库。最终觉得还是async最靠谱。
地址:https://github.com/caolan/async
Async的内容分为三部分:
- 流程控制:简化十种常见流程的处理
- 集合处理:如何使用异步操作处理集合中的数据
- 工具类:几个常用的工具类
本文介绍其中最简单最常用的流程控制部分。
由于nodejs是异步编程模型,有一些在同步编程中很容易做到的事情,现在却变得很麻烦。Async的流程控制就是为了简化这些操作。
1. series(tasks, [callback]) (多个函数依次执行,之间没有数据交换)
有多个异步函数需要依次调用,一个完成之后才能执行下一个。各函数之间没有数据的交换,仅仅需要保证其执行顺序。这时可使用series。
纯js代码:
step1(function(err, v1) {
step2(function(err, v2) {
step3(function(err, v3) {
// do somethig with the err or values v1/v2/v3
}
}
});
从中可以看到这嵌套还是比较多深的,如果再多几步,会更深。在代码中忽略对了每一层err的处理,否则还都等加上 if(err) return callback(err),那就更麻烦了。
对于这种情况,使用async来处理,就是这样的:
var async = require('async')
async.series([
step1, step2, step3
], function(err, values) {
// do somethig with the err or values v1/v2/v3
});
可以看到代码简洁了很多,而且自动处理每个回调中的错误。当然,这里只给出来最最简单的例子,在实际中,我们常会在每个step中执行一些操作,这时可写成:
var async = require('async')
async.series([
function(cb) { step1(function(err,v1) {
// do something with v1
cb(err, v1);
}),
function(cb) { step2(...) },
function(cb) { step3(...) }
], function(err, values) {
// do somethig with the err or values v1/v2/v3
});
该函数的详细解释为:
- 依次执行一个函数数组中的每个函数,每一个函数执行完成之后才能执行下一个函数。
- 如果任何一个函数向它的回调函数中传了一个error,则后面的函数都不会被执行,并且将会立刻会将该error以及已经执行了的函数的结果,传给series中最后那个callback。
- 当所有的函数执行完后(没有出错),则会把每个函数传给其回调函数的结果合并为一个数组,传给series最后的那个callback。
- 还可以json的形式来提供tasks。每一个属性都会被当作函数来执行,并且结果也会以json形式传给series最后的那个callback。这种方式可读性更高一些。
具体例子可参考:https://github.com/freewind/async_demo/blob/master/series.js
其代码中还包含了:
- 如果中间某个函数出错,series函数如何处理
- 如果某个函数传给回调的值为undefined, null, {}, []等,series如何处理
另外还需要注意的是:多个series调用之间是不分先后的,因为series本身也是异步调用。
2. parallel(tasks, [callback]) (多个函数并行执行)
并行执行多个函数,每个函数都是立即执行,不需要等待其它函数先执行。传给最终callback的数组中的数据按照tasks中声明的顺序,而不是执行完成的顺序。
如果某个函数出错,则立刻将err和已经执行完的函数的结果值传给parallel最终的callback。其它未执行完的函数的值不会传到最终数据,但要占个位置。
同时支持json形式的tasks,其最终callback的结果也为json形式。
示例代码:
async.parallel([
function(cb) { t.fire('a400', cb, 400) },
function(cb) { t.fire('a200', cb, 200) },
function(cb) { t.fire('a300', cb, 300) }
], function (err, results) {
log('1.1 err: ', err); // -> undefined
log('1.1 results: ', results); // ->[ 'a400', 'a200', 'a300' ]
});
中途出错的示例:
async.parallel([
function(cb) { log('1.2.1: ', 'start'); t.fire('a400', cb, 400) }, // 该函数的值不会传给最终callback,但要占个位置
function(cb) { log('1.2.2: ', 'start'); t.err('e200', cb, 200) },
function(cb) { log('1.2.3: ', 'start'); t.fire('a100', cb, 100) }
], function(err, results) {
log('1.2 err: ', err); // -> e200
log('1.2 results: ', results); // -> [ , undefined, 'a100' ]
});
以json形式传入tasks
async.parallel({
a: function(cb) { t.fire('a400', cb, 400) },
b: function(cb) { t.fire('c300', cb, 300) }
}, function(err, results) {
log('1.3 err: ', err); // -> undefined
log('1.3 results: ', results); // -> { b: 'c300', a: 'a400' }
});
更详细示例参见:https://github.com/freewind/async_demo/blob/master/parallel.js
3. waterfall(tasks, [callback]) (多个函数依次执行,且前一个的输出为后一个的输入)
与seires相似,按顺序依次执行多个函数。不同之处,每一个函数产生的值,都将传给下一个函数。如果中途出错,后面的函数将不会被执行。错误信息以及之前产生的结果,将传给waterfall最终的callback。
这个函数名为waterfall(瀑布),可以想像瀑布从上到下,中途冲过一层层突起的石头。注意,该函数不支持json格式的tasks。
async.waterfall([
function(cb) { log('1.1.1: ', 'start'); cb(null, 3); },
function(n, cb) { log('1.1.2: ',n); t.inc(n, cb); },
function(n, cb) { log('1.1.3: ',n); t.fire(n*n, cb); }
], function (err, result) {
log('1.1 err: ', err); // -> null
log('1.1 result: ', result); // -> 16
});
更详细示例参见:https://github.com/freewind/async_demo/blob/master/waterfall.js
4. auto(tasks, [callback]) (多个函数有依赖关系,有的并行执行,有的依次执行)
用来处理有依赖关系的多个任务的执行。比如某些任务之间彼此独立,可以并行执行;但某些任务依赖于其它某些任务,只能等那些任务完成后才能执行。
虽然我们可以使用async.parallel和async.series结合起来实现该功能,但如果任务之间关系复杂,则代码会相当复杂,以后如果想添加一个新任务,也会很麻烦。这时使用async.auto,则会事半功倍。
如果有任务中途出错,则会把该错误传给最终callback,所有任务(包括已经执行完的)产生的数据将被忽略。
这里假设我要写一个程序,它要完成以下几件事:
- 从某处取得数据
- 在硬盘上建立一个新的目录
- 将数据写入到目录下某文件
- 发送邮件,将文件以附件形式发送给其它人。
分析该任务,可以知道1与2可以并行执行,3需要等1和2完成,4要等3完成。
async.auto({
getData: function (callback) {
setTimeout(function(){
console.log('1.1: got data');
callback();
}, 300);
},
makeFolder: function (callback) {
setTimeout(function(){
console.log('1.1: made folder');
callback();
}, 200);
},
writeFile: ['getData', 'makeFolder', function(callback) {
setTimeout(function(){
console.log('1.1: wrote file');
callback(null, 'myfile');
}, 300);
}],
emailFiles: ['writeFile', function(callback, results) {
log('1.1: emailed file: ', results.writeFile); // -> myfile
callback(null, results.writeFile);
}]
}, function(err, results) {
log('1.1: err: ', err); // -> null
log('1.1: results: ', results); // -> { makeFolder: undefined,
// getData: undefined,
// writeFile: 'myfile',
// emailFiles: 'myfile' }
});
更多详细示例参见:https://github.com/freewind/async_demo/blob/master/auto.js
5. whilst(test, fn, callback)(用可于异步调用的while)
相当于while,但其中的异步调用将在完成后才会进行下一次循环。举例如下:
var count1 = 0;
async.whilst(
function() { return count1 < 3 },
function(cb) {
log('1.1 count: ', count1);
count1++;
setTimeout(cb, 1000);
},
function(err) {
// 3s have passed
log('1.1 err: ', err); // -> undefined
}
);
它相当于:
try {
whilst(test) {
fn();
}
callback();
} catch (err) {
callback(err);
}
该函数的功能比较简单,条件变量通常定义在外面,可供每个函数访问。在循环中,异步调用时产生的值实际上被丢弃了,因为最后那个callback只能传入错误信息。
另外,第二个函数fn需要能接受一个函数cb,这个cb最终必须被执行,用于表示出错或正常结束。
更详细示例参见:https://github.com/freewind/async_demo/blob/master/whilst_until.js
6. until(test, fn, callback) (与while相似,但判断条件相反)
var count4 = 0;
async.until(
function() { return count4>3 },
function(cb) {
log('1.4 count: ', count4);
count4++;
setTimeout(cb, 200);
},
function(err) {
// 4s have passed
log('1.4 err: ',err); // -> undefined
}
);
当第一个函数条件为false时,继续执行第二个函数,否则跳出。
7. queue (可设定worker数量的队列)
queue相当于一个加强版的parallel,主要是限制了worker数量,不再一次性全部执行。当worker数量不够用时,新加入的任务将会排队等候,直到有新的worker可用。
该函数有多个点可供回调,如worker用完时、无等候任务时、全部执行完时等。
定义一个queue,其worker数量为2,并在任务执行时,记录一下日志:
var q = async.queue(function(task, callback) {
log('worker is processing task: ', task.name);
task.run(callback);
}, 2);
worker数量将用完时,会调用saturated函数:
q.saturated = function() {
log('all workers to be used');
}
当最后一个任务交给worker执行时,会调用empty函数
q.empty = function() {
log('no more tasks wating');
}
当所有任务都执行完时,会调用drain函数
q.drain = function() {
console.log('all tasks have been processed');
}
放入多个任务,可一次放一个,或一次放多个
q.push({name:'t1', run: function(cb){
log('t1 is running, waiting tasks: ', q.length());
t.fire('t1', cb, 400); // 400ms后执行
}}, function(err) {
log('t1 executed');
});
q.push([{name:'t3', run: function(cb){
log('t3 is running, waiting tasks: ', q.length());
t.fire('t3', cb, 300); // 300ms后执行
}},{name:'t4', run: function(cb){
log('t4 is running, waiting tasks: ', q.length());
t.fire('t4', cb, 500); // 500ms后执行
}}], function(err) {
log('t3/4 executed');
});
更多详细示例参见:https://github.com/freewind/async_demo/blob/master/queue.js
8. iterator(tasks) (将几个函数包装为iterator)
将一组函数包装成为一个iterator,可通过next()得到以下一个函数为起点的新的iterator。该函数通常由async在内部使用,但如果需要时,也可在我们的代码中使用它。
var iter = async.iterator([
function() { console.log('111') },
function() { console.log('222') },
function() { console.log('333') }
]);
console.log(iter());
console.log(iter.next());
直接调用(),会执行当前函数,并返回一个由下个函数为起点的新的iterator。调用next(),不会执行当前函数,直接返回由下个函数为起点的新iterator。
对于同一个iterator,多次调用next(),不会影响自己。如果只剩下一个元素,调用next()会返回null。
更详细示例参见:https://github.com/freewind/async_demo/blob/master/iterator.js
9. apply(function, arguments..) (给函数预绑定参数)
apply是一个非常好用的函数,可以让我们给一个函数预绑定多个参数并生成一个可直接调用的新函数,简化代码。
对于函数:
function(callback) { t.inc(3, callback); }
可以用apply改写为:
async.apply(t.inc, 3);
还可以给某些函数预设值,得到一个新函数:
var log = async.apply(console.log, ">");
log('hello');
// > hello
更详细代码参见:https://github.com/freewind/async_demo/blob/master/apply.js
10. nextTick(callback) (在nodejs与浏览器两边行为一致)
nextTick的作用与nodejs的nextTick一样,都是把某个函数调用放在队列的尾部。但在浏览器端,只能使用setTimeout(callback,0),但这个方法有时候会让其它高优先级的任务插到前面去。
所以提供了这个nextTick,让同样的代码在服务器端和浏览器端表现一致。
var calls = [];
async.nextTick(function() {
calls.push('two');
});
calls.push('one');
async.nextTick(function() {
console.log(calls); // -> [ 'one', 'two' ]
});
更详细代码参见:https://github.com/freewind/async_demo/blob/master/nextTick.js
二:工具类
Async中提供了几个工具类,给我们提供一些小便利:
- memoize
- unmemoize
- log
- dir
- noConflict
1. memoize(fn, [hasher])
有一些方法比较耗时,且对于相同的输入总是有相同的输出。这时可以使用memoize给它加个缓存,对于相同的参数只计算一次,以后就直接从缓存中取结果用了。
比如这里有一个很慢的函数:
可以用memoize生成一个新的带缓存的函数:
试试同样参数调用两次:
注意memoize的参数中还有一个hasher,它是做什么用的呢?它可以让我们自定义如果根据参数来判断是否从缓存中取。默认情况下,两次调用,只有参数完全一样的时候才会从缓存中取。这里我们使用hasher来改变规则。
新定义的这个,将根据两个参数的和来判断。
第二次的调用,虽然参数跟第一次不一样,但是其和却一样,所以直接从缓存中拿到前次运行结果。
2. unmemoize(fn)
unmemoize的作用正好跟memoize相反,它可以把一个带缓存的函数再变回原样:
经过unmemoize后,再运行该函数就得重新运算了。
3. log(function, arguments)
log用于快速执行某异步函数,并记录它的返回值。试验函数时很方便,不用写那些固定模式的代码。
打印结果如下:
可以看到,它直接运行了该函数,并以每行一个参数的形式打印出了结果。
4. dir(function, arguments)
该函数与log非常像,不同之处在于,它最终调用了console.dir,而log最终调用了console.log。
看看使用dir打印的效果如何:
结果:
仅仅是多了几个单引号。为了弄清楚dir存在的意义(什么情况下应该使用dir而不是log),我提了一个问题,参看:http://stackoverflow.com/questions/10636866/whats-the-difference-between-async-log-and-async-dir
5. noConflict
最后是这个noConflict,它仅仅用于浏览器端,在nodejs中没用,这里无法演示。
它的作用是:如果之前已经在全局域中定义了async变量,当导入本async.js时,会先把之前的async变量保存起来,然后覆盖它。用完之后,调用noConflict()方法,就会归还该值。同时返回async本身供换名使用。
这里可以看一下它的实现代码:
可以看到,当处于nodejs或者commonjs环境中,它会执行module.exports=async,在其它情况下(通常为浏览器端)才会root.async=async,将async赋值给root。
在浏览器中的用法如下:
三:集合操作
Async提供了很多针对集合的函数,可以简化我们对集合进行异步操作时的步骤。如下:
- forEach:对集合中每个元素进行异步操作
- map:对集合中的每个元素通过异步操作得到另一个值,得到新的集合
- filter:对集合中元素使用异步操作进行筛选,得到符合条件的集合
- reject:与filter相似,只是判断条件时正好相反,得到剩下的元素的集合
- reduce:使用一个初始值同集合中每一个元素进行异步操作,最后得到一个唯一的结果
- detect:得到集合中满足条件的第一个数据
- sortBy:对集合中的数据进行异步操作,再根据值从小到大排序
- some/any:集合中是否有至少一个元素满足条件
- every/all:集合中是否每个元素都满足条件
- concat:对集合中的元素进行异步操作,将结果集合并成一个数组
下面一一解释:
1. forEach(arr, iterator(item, callback), callback(err))
如果想对同一个集合中的所有元素都执行同一个异步操作,可以利用forEach函数。注意该函数将重点放在“执行过程”上,忽略运行后产生的数据。如果需要结果,可使用map函数。
根据执行的方式不同,forEach提供了三个版本:
- 集合中所有元素并行执行
- 一个一个顺序执行
- 分批执行,同一批内并行,批与批之间按顺序
首先看并行执行的例子,它比较简单,只是打印出传入的元素内容:
它将打出如下结果:
最前面的数据是当前的时间值(秒.毫秒),从中可以看到各异步操作是并行执行的。
如果想同步执行,需要使用forEachSeries函数,它与forEach的用法一模一样,只是执行时是一个一个来的。这里就不给例子了。
当集合中元素很多,既不想一次全部并行操作,又不想一个一个按顺序来,可以使用forEachLimit函数。它可以设定一批处理几个,每一批内并行执行,批与批之间顺序执行。
打印结果如下:
可以看到前两个是同时开始的,而第三个是等前两个都完成以后才开始的。
更多详细示例:https://github.com/freewind/async_demo/blob/master/forEach.js
2. map(arr, iterator(item, callback), callback(err, results))
map的重点是转换,即把集合中的元素通过异步操作转为另一个对象,最后可以得到转换后的对象数组。它也提供了并行与顺序执行两种方式。
这里给一个示例,给集合中的每个元素以异步方式增加!!!:
打印结果如下:
可以看到,对各元素的操作是并行的,结果会汇总在一起交给最后的回调。
如果想顺序执行,可使用mapSeries,它与map的用法一模一样。
更多详细示例:https://github.com/freewind/async_demo/blob/master/map.js
3. filter(arr, iterator(item, callback(test)), callback(results))
使用异步操作对集合中的元素进行筛选。需要注意的是,iterator的callback只有一个参数,只能接收true或false。
对于出错,该函数没有做出任何处理,直接由nodejs抛出。所以需要注意对Error的处理。
提供了并行与顺序执行两种方式。
并行示例,找到所有>=3的元素:
打印结果如下:
可见找到了满足条件的所有元素。
如果需要顺序执行,可以使用filterSeries函数,它的用法与filter一样。
更多详细示例:https://github.com/freewind/async_demo/blob/master/filter_reject.js
4. reject(arr, iterator(item, callback(test)), callback(results))
reject与filter相似,只是行为正好相反。当条件为true时,它将丢弃相应的元素。它也提供了并行与顺序执行两种方式。
并行示例,去掉所有>=3的元素:
打印结果如下:
如果想顺序执行,可使用rejectSeries,它与reject用法一样。
更多详细示例:https://github.com/freewind/async_demo/blob/master/filter_reject.js
5. reduce(arr, memo, iterator(memo,item,callback), callback(err,result))
Reduce可以让我们给定一个初始值,用它与集合中的每一个元素做运算,最后得到一个值。reduce从左向右来遍历元素,如果想从右向左,可使用reduceRight。
这里给个例子,计算出100与某个集合中所有数之和:
将打印出结果:
需要注意的
是,async中的reduce,不是并行操作,而是对元素一个个顺序操作,所以当元素比较多时,性能会比较弱。如果想提高性能,可使用
async.map函数,先并行得到集合中每个元素被处理之后的值,然后再使用Array.prototype.reduce函数处理,性能会快很多。
对于这个例子:
它总耗时为0.62秒。如果换成map+array.reduce:
耗时为0.21秒。
更多详细示例:https://github.com/freewind/async_demo/blob/master/reduce.js
6. detect(array, iterator(item,callback(test)), callback(result)
用于取得集合中满足条件的第一个元素。它分为并行与顺序执行两种方式,分别对应函数detect和detectSeries。
并行示例,找到一个奇数:
结果如下:
可见得到了最先执行完的那个奇数3.
更多详细示例:https://github.com/freewind/async_demo/blob/master/detect.js
7. sortBy(array, iterator(item,callback(err,result)), callback(err,results))
对集合内的元素进行排序,依据每个元素进行某异步操作后产生的值,从小到大排序。
示例:
打印结果如下:
可以看到集合中的数据从小到大排好了序。
更多详细示例:https://github.com/freewind/async_demo/blob/master/sortBy.js
8. some/any(arr, iterator(item,callback(test)), callback(result))
当集合中是否有至少一个元素满足条件时,最终callback得到的值为true,否则为false。它有一个别名叫any。
判断集合中是否有元素小于等于3:
打印结果如下:
可见的确得到了结果true。
更多详细示例:https://github.com/freewind/async_demo/blob/master/some.js
9. every/all(arr, iterator(item,callback), callback(result))
如果集合里每一个元素都满足条件,则传给最终回调的result为true,否则为false
在下面的示例中,因为集合中每个元素都<=10,所以最终结果为true
打印如下:
可见最终结果为true
更多详细示例:https://github.com/freewind/async_demo/blob/master/every.js
10. concat(arr, iterator(item,callback(err,result)), callback(err,result))
将合并多个异步操作的结果合并为一个数组。
在下面的示例中,将集合中的每一个元素都加倍:
打印如下:
打印出来的是经过合并后的数组。
更多详细示例:https://github.com/freewind/async_demo/blob/master/concat.js