我有一个用 Nodejs 编写的系统,它首先必须将非常大的 csv 文件中的记录导入到数据库中。使用 Sequelize 作为我的 ORM,我创建了一个简单的模型,如下所示:

"use strict";
const Sequelize = require('sequelize');
const sequelize = new Sequelize('mm', 'root', 'password', {
    host: 'localhost',
    dialect: 'mysql',
    logging: true,
    pool: {max: 5, min: 0, idle: 100000},
});
const Index = sequelize.define('index', {
    value: {type: Sequelize.FLOAT}
});

然后我编写了以下代码来循环文件中的行,解释这些行,并将它们写入数据库:
let readline = require('readline');
let moment = require('moment');

let lineReader = readline.createInterface({
    input: require('fs').createReadStream('files/price_index.csv')
});

lineReader.on('line', function (line) {
    let splitted = line.split(',');
    let dt = moment(parseInt(splitted[0]));
    let value = parseFloat(splitted[1]);
    console.log(dt.format(), value);
    Index.create({value: value, createdAt: dt});
});

这工作正常,但它在每 3120 条记录后暂停约 3 秒。我尝试了 sqlite 和 mysql,但它总是在恰好 3120 条记录后暂停。

看到 Sequelize 也在那些 3120 条记录之后开始记录插入查询,我认为这种行为的原因是某种缓存机制,它将所有查询放在一个队列中,直到它无事可做,或者它击中了那个魔法查询缓存限制正好是 3120 条记录。

我尝试在 Sequelize 的初始化中增加 pool.max 编号,但这似乎没有任何区别。

任何人都可以确认我的缓存想法,或者向我解释这种行为的真正原因是什么?我可以以某种方式改变这种行为以使其具有一致的吞吐量吗?欢迎所有提示!

最佳答案

我认为 3120 行将是 the high water mark for the createReadStream buffer which is 64KiB 。当缓冲区已满时, Node 将停止读取。

看起来 3120 个 line 事件都在同一个 Node 事件滴答上运行,因此您可以处理 3120 行,并为下一个滴答安排了 3120 个异步 Index.create 调用。所以你最终要在每一边做大量的处理。要么读取和调度查询,要么处理大量调度查询。

当 3120 line 事件函数完成时,一些垃圾收集发生,并且 3120 sequelize create 被安排的调用有机会做他们的事情。这是数据中的“暂停”,但 Node 仍在处理。所有 create 调用都需要几秒钟才能完成,然后再进行一些垃圾收集并返回到下一个 csv 数据块以及所有 line 事件。这个过程就这样来回循环。

在一个有 10000 行的 csv 文件中,我看到在所有 10000 行 csv 数据被读取并安排插入之前能够运行大约 3 个查询。

一致的吞吐量

您可能想要使用具有较小块的 Readable Stream。然后基于 sequelize 插入完成块读取。您可能需要对自己进行处理,而不是使用 readline 。如果 csv 文件适合内存,只需读取整个内容,因为调度会更容易。

也许使用类似 queue 的东西来管理插入,允许你的 Sequelize 池 max 作为 concurrency 。然后一旦队列的 length 足够低,允许再次读取。

我不知道最终结果是否会更快,但最终可能会非常相似。

关于mysql - 为什么 Sequelize 在 3120 条记录后暂停?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47052410/

10-09 20:27