我正在使用Node.js-异步和请求模块来抓取100多个百万个网站,几分钟后,我不断碰到ESOCKETTIMEDOUTETIMEDOUT错误。

重新启动脚本后,它可以再次工作。这似乎不是连接限制问题,因为我仍然可以毫不延迟地执行resolve4,resolveNs,resolveMx以及curl

您看到代码有什么问题吗?或任何建议?我想将async.queue()并发性提高到至少1000。谢谢。

var request = require('request'),
    async = require('async'),
    mysql = require('mysql'),
    dns = require('dns'),
    url = require('url'),
    cheerio = require('cheerio'),
    iconv = require('iconv-lite'),
    charset = require('charset'),
    config = require('./spy.config'),
    pool = mysql.createPool(config.db);

iconv.skipDecodeWarning = true;

var queue = async.queue(function (task, cb) {
    dns.resolve4('www.' + task.domain, function (err, addresses) {
        if (err) {
            //
            // Do something
            //
            setImmediate(function () {
                cb()
            });
        } else {
            request({
                url: 'http://www.' + task.domain,
                method: 'GET',
                encoding:       'binary',
                followRedirect: true,
                pool:           false,
                pool:           { maxSockets: 1000 },
                timeout:        15000 // 15 sec
            }, function (error, response, body) {

                //console.info(task);

                if (!error) {
                  // If ok, do something

                } else {
                    // If not ok, do these

                    console.log(error);

                    // It keeps erroring here after few minutes, resolve4, resolveNs, resolveMx still work here.

                    // { [Error: ETIMEDOUT] code: 'ETIMEDOUT' }
                    // { [Error: ESOCKETTIMEDOUT] code: 'ESOCKETTIMEDOUT' }

                    var ns = [],
                        ip = [],
                        mx = [];
                    async.parallel([
                        function (callback) {
                            // Resolves the domain's name server records
                            dns.resolveNs(task.domain, function (err, addresses) {
                                if (!err) {
                                    ns = addresses;
                                }
                                callback();
                            });
                        }, function (callback) {
                            // Resolves the domain's IPV4 addresses
                            dns.resolve4(task.domain, function (err, addresses) {
                                if (!err) {
                                    ip = addresses;
                                }
                                callback();
                            });
                        }, function (callback) {
                            // Resolves the domain's MX records
                            dns.resolveMx(task.domain, function (err, addresses) {
                                if (!err) {
                                    addresses.forEach(function (a) {
                                        mx.push(a.exchange);
                                    });
                                }
                                callback();
                            });
                        }
                    ], function (err) {
                        if (err) return next(err);

                        // do something
                    });

                }
                setImmediate(function () {
                    cb()
                });
            });
        }
    });
}, 200);

// When the queue is emptied we want to check if we're done
queue.drain = function () {
    setImmediate(function () {
        checkDone()
    });
};
function consoleLog(msg) {
    //console.info(msg);
}
function checkDone() {
    if (queue.length() == 0) {
        setImmediate(function () {
            crawlQueue()
        });
    } else {
        console.log("checkDone() not zero");
    }
}

function query(sql) {
    pool.getConnection(function (err, connection) {
        if (!err) {
            //console.log(sql);
            connection.query(sql, function (err, results) {
                connection.release();
            });
        }
    });
}

function crawlQueue() {
    pool.getConnection(function (err, connection) {
        if (!err) {
            var sql = "SELECT * FROM domain last_update < (UNIX_TIMESTAMP() - 2592000) LIMIT 500";
            connection.query(sql, function (err, results) {
                if (!err) {
                    if (results.length) {
                        for (var i = 0, len = results.length; i < len; ++i) {
                            queue.push({"id": results[i]['id'], "domain": results[i]['domain'] });
                        }
                    } else {
                        process.exit();
                    }
                    connection.release();
                } else {
                    connection.release();
                    setImmediate(function () {
                        crawlQueue()
                    });
                }
            });
        } else {
            setImmediate(function () {
                crawlQueue()
            });
        }
    });
}
setImmediate(function () {
    crawlQueue()
});

而且系统限制很高。
    Limit                     Soft Limit           Hard Limit           Units
    Max cpu time              unlimited            unlimited            seconds
    Max file size             unlimited            unlimited            bytes
    Max data size             unlimited            unlimited            bytes
    Max stack size            8388608              unlimited            bytes
    Max core file size        0                    unlimited            bytes
    Max resident set          unlimited            unlimited            bytes
    Max processes             257645               257645               processes
    Max open files            500000               500000               files
    Max locked memory         65536                65536                bytes
    Max address space         unlimited            unlimited            bytes
    Max file locks            unlimited            unlimited            locks
    Max pending signals       257645               257645               signals
    Max msgqueue size         819200               819200               bytes
    Max nice priority         0                    0
    Max realtime priority     0                    0
    Max realtime timeout      unlimited            unlimited            us

系统控制
net.ipv4.ip_local_port_range = 10000    61000

最佳答案

默认情况下,Node具有4 workers to resolve DNS queries。如果您的DNS查询花费很长时间,则请求将在DNS阶段被阻止,并且症状恰好是ESOCKETTIMEDOUTETIMEDOUT

尝试增加您的uv线程池大小:

export UV_THREADPOOL_SIZE=128
node ...

index.js(或您的入口点所在的位置)中:
#!/usr/bin/env node
process.env.UV_THREADPOOL_SIZE = 128;

function main() {
   ...
}

编辑:I also wrote blog post关于它。

09-04 00:29