我试图确保一个mysql查询导致另一个查询,并且直到它的所有子查询都完成才完成。因此,例如,我从一个选择开始并流式处理行,然后从该行结果中执行后续查询。这对于回调是可行的,但是我最终会用完内存,因此我想减慢进程并运行批处理,但是由于调度的异步特性,我无法保持同步并结束处理完所有行后的连接。
这是一个例子:
var query = conn.query('select id from table1 limit 10');
query.on('result', function(row){
console.log('query1', row);
var query2 = conn.query('select id from books where id = ? ', [row.id]);
query2.on('result', function(row2){
console.log('query2', row2);
var query3 = conn.query('insert into test (id) values (?)', [row2.id]);
query3.on('result', function(row3){
console.log(row3);
});
});
});
query.on('end', function(){
conn.end();
});
上面的操作失败,因为在初始查询结束后,query3中仍有行要处理。
有什么想法吗?实际的代码更加复杂,因为我必须处理后续查询中的xml,并在我遍历批处理时触发更多的插入。
谢谢!
最佳答案
我建议使用async模块此解决方案:
var async = require("async");
// connection instance
var conn;
// here goes task serving logic
// if any async function should be finished before drain callback, push them into q
var solvers = {
query: function(q, task, row){
console.log('query1', row);
q.push({
solver: "query2",
req: "select id from books where id = ?",
reqArgs: [row.id]
});
},
query2: function(q, task, row){
console.log('query2', row);
q.push({
solver: "query3",
req: "insert into test (id) values (?)",
reqArgs: [row.id]
});
},
query3: function(q, task, row){
console.log(row);
}
}
// here is a queue of tasks
var q = async.queue(function(task, cb){
var query = conn.query(task.req, task.reqArgs);
query.on("end", cb);
query.on("result",function(row){
solvers[task.solver](q, task, row);
});
}, 2); // limit of parallel queries
// when every request has reached "end"
q.drain = function(){
conn.end();
// continue from here
};
// initial task
q.push({
solver: "query",
req: "select id from table1 limit 10",
reqArgs: []
});
但是,我不确定按ID发出请求ID是否是一个好的解决方案。
也许,我只是不知道一个完整的问题。