我有一个用 Nodejs 编写的系统,它首先必须将非常大的 csv 文件中的记录导入到数据库中。使用 Sequelize 作为我的 ORM,我创建了一个简单的模型,如下所示:
"use strict";
const Sequelize = require('sequelize');
const sequelize = new Sequelize('mm', 'root', 'password', {
host: 'localhost',
dialect: 'mysql',
logging: true,
pool: {max: 5, min: 0, idle: 100000},
});
const Index = sequelize.define('index', {
value: {type: Sequelize.FLOAT}
});
然后我编写了以下代码来循环文件中的行,解释这些行,并将它们写入数据库:
let readline = require('readline');
let moment = require('moment');
let lineReader = readline.createInterface({
input: require('fs').createReadStream('files/price_index.csv')
});
lineReader.on('line', function (line) {
let splitted = line.split(',');
let dt = moment(parseInt(splitted[0]));
let value = parseFloat(splitted[1]);
console.log(dt.format(), value);
Index.create({value: value, createdAt: dt});
});
这工作正常,但它在每 3120 条记录后暂停约 3 秒。我尝试了 sqlite 和 mysql,但它总是在恰好 3120 条记录后暂停。
看到 Sequelize 也在那些 3120 条记录之后开始记录插入查询,我认为这种行为的原因是某种缓存机制,它将所有查询放在一个队列中,直到它无事可做,或者它击中了那个魔法查询缓存限制正好是 3120 条记录。
我尝试在 Sequelize 的初始化中增加
pool.max
编号,但这似乎没有任何区别。任何人都可以确认我的缓存想法,或者向我解释这种行为的真正原因是什么?我可以以某种方式改变这种行为以使其具有一致的吞吐量吗?欢迎所有提示!
最佳答案
我认为 3120 行将是 the high water mark for the createReadStream
buffer which is 64KiB 。当缓冲区已满时, Node 将停止读取。
看起来 3120 个 line
事件都在同一个 Node 事件滴答上运行,因此您可以处理 3120 行,并为下一个滴答安排了 3120 个异步 Index.create
调用。所以你最终要在每一边做大量的处理。要么读取和调度查询,要么处理大量调度查询。
当 3120 line
事件函数完成时,一些垃圾收集发生,并且 3120 sequelize create
被安排的调用有机会做他们的事情。这是数据中的“暂停”,但 Node 仍在处理。所有 create
调用都需要几秒钟才能完成,然后再进行一些垃圾收集并返回到下一个 csv 数据块以及所有 line
事件。这个过程就这样来回循环。
在一个有 10000 行的 csv 文件中,我看到在所有 10000 行 csv 数据被读取并安排插入之前能够运行大约 3 个查询。
一致的吞吐量
您可能想要使用具有较小块的 Readable Stream。然后基于 sequelize 插入完成块读取。您可能需要对自己进行处理,而不是使用 readline
。如果 csv 文件适合内存,只需读取整个内容,因为调度会更容易。
也许使用类似 queue
的东西来管理插入,允许你的 Sequelize 池 max
作为 concurrency
。然后一旦队列的 length
足够低,允许再次读取。
我不知道最终结果是否会更快,但最终可能会非常相似。
关于mysql - 为什么 Sequelize 在 3120 条记录后暂停?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47052410/