我必须在NodeJS中解析一个非常大的CSV文件,并将其保存在一个数据库中(异步操作),一次最多可以输入500个条目。由于内存限制,我必须流式传输CSV文件,并想使用PapaParse解析CSV文件(在我的情况下效果最好)。

由于PapaParse使用回调样式的方法来解析Node.js流,所以我看不到将高地(用于批处理和数据转换)和PapaParse结合起来的简便方法。因此,我尝试使用ParseThrough流向其中写入数据并以高地读取该流以进行批处理:

const csv = require('papaparse');
const fs = require('fs');
const highland = require('highland');
const { PassThrough } = require('stream');

const passThroughStream = new PassThrough({ objectMode: true });

csv.parse(fileStream, {
  step: function(row) {
    // Write data to stream
    passThroughStream.write(row.data[0]);
  },
  complete: function() {
    // Somehow "end" the stream
    passThroughStream.write(null);
  },
});

highland(passThroughStream)
  .map((data) => {
    // data transform
  })
  .batch(500)
  .map((data) => {
    // Save up to 500 entries in database (async call)
  });


显然,这不能按原样工作,并且实际上什么也没做。有没有可能这样的事情,甚至是更好的方法来解析很大的CSV文件并将行保存到数据库中(最多500个批处理)?

编辑:使用csv包(https://www.npmjs.com/package/csv),可能像这样(与fast-csv相同):

highland(fileStream.pipe(csv.parse()))
  .map((data) => {
    // data transform
  })
  .batch(500)
  .map((data) => {
    // Save up to 500 entries in database (async call)
  });


但是不幸的是,两个NPM软件包在所有情况下都无法正确解析CSV文件。

最佳答案

快速浏览papaparse之后,我决定在scramjet中实现CSV解析器。

fileStream.pipe(new scramjet.StringStream('utf-8'))
    .csvParse(options)
    .batch(500)
    .map(items => db.insertArray('some_table', items))


希望对您有用。 :)

关于node.js - 爸爸和高地,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48632646/

10-11 22:36
查看更多