我想创建一个非常简单的 Node.js 实用程序,逐行单独处理文本文件中的每条记录。但由于 Node 固有的异步特性,很难让下面的步骤按顺序执行:
打开与数据库的连接
读取文本文件的每一行
根据行中已处理文本的条件,在数据库中查找记录
读取完文本文件后,关闭数据库连接
我面临的挑战是:使用 readline 模块逐行读取文本文件时,需要为该模块发出的 'line' 事件附加监听器。结果文件的所有行都被迅速处理完,而对数据库的查询全部堆积在队列中。我尝试了许多方法让整个流程按顺序(同步)执行,但都没有成功。下面是我最近的一次尝试,里面用了大量 async/await。我虽然是一名资深开发者,但对 Node.js 还很陌生,我知道自己一定漏掉了某个简单的东西。任何指导都将不胜感激。
const { Pool, Client } = require('pg')
const client = new Client({
  user: '*****',
  host: '****',
  database: '*****',
  password: '******#',
  port: 5432,
});

// Connect, then process input.txt strictly one line at a time, and close the
// connection only after every lookup has finished.
client.connect()
  .then(
    async () => {
      console.log("Connected");
      console.log("Processing file");
      const fs = require('fs');
      const readline = require('readline');

      // Strip everything except ASCII letters, digits and spaces, lowercase the
      // result and split it into non-empty words (runs of spaces yield no words).
      const toWords = (line) => line
        .replace(/[^0-9a-z ]/gi, '')
        .toLowerCase()
        .split(' ')
        .filter((word) => word.length > 0);

      try {
        const rl = readline.createInterface({
          input: fs.createReadStream("input.txt"),
          // Treat every \r\n as a single line break regardless of timing.
          crlfDelay: Infinity,
        });
        // A plain rl.on('line', async ...) listener is never awaited, which is why
        // all lines were dispatched at once. `for await` hands lines to this loop
        // one at a time, so a line's queries finish before the next line starts.
        for await (const line of rl) {
          const words = toWords(line);
          for (let i = 0; i < words.length; i++) {
            // Prefer the longest phrase starting at word i: 3 words, then 2, then 1.
            for (let size = Math.min(3, words.length - i); size > 0; size--) {
              if (await isKeyPhrase(words.slice(i, i + size).join(' '))) {
                break;
              }
            }
          }
        }
        console.log('done reading file.');
      } finally {
        // Every query above has been awaited, so closing here cannot cut one off.
        await client.end();
      }
    },
    (err) => {
      console.error('connection error', err.stack);
    },
  )
  .catch((err) => {
    console.error('processing error', err.stack);
  });
/**
 * Check whether a phrase exists as an active entry in the KeyPhrase table.
 *
 * Uses the module-level `client` (a connected pg Client).
 *
 * @param {string} keyPhraseText - Phrase to look up; matched with `=` against KP.KeyPhraseText.
 * @returns {Promise<boolean>} Resolves true when at least one active row matches, false otherwise.
 * @throws Rejects with the query error (after logging its stack) if the query fails.
 */
async function isKeyPhrase(keyPhraseText) {
  const query = {
    name: 'get-name',
    text: 'select KP.EntryID from KeyPhrase KP where (KP.KeyPhraseText = $1) and (Active = true)',
    values: [keyPhraseText],
    rowMode: 'array',
  };
  let result;
  try {
    result = await client.query(query);
  } catch (e) {
    console.error(e.stack);
    // Propagate so the caller can decide how to handle a failed lookup.
    throw e;
  }
  // `> 0` rather than `== 1`: a phrase that matches several rows still counts as found.
  if (result.rowCount > 0) {
    console.log(`Key phrase '${keyPhraseText}' found in table with Phase ID = ${result.rows}`);
    return true;
  }
  return false;
}
最佳答案
欢迎使用StackOverflow。 :)
确实,在需要为每一行与数据库交互的情况下,没有(合理的)办法同步地读取文件;如果文件大于内存的 1/8,就根本不可行。
但这并不意味着无法为此编写合理的代码。唯一的问题在于,标准 Node 流(包括通过 'line' 事件使用的 readline)不会等待监听器中的异步代码执行完毕。
我建议使用 scramjet,这是一个函数式流编程框架,其设计非常适合您的用例(免责声明:我是它的作者)。代码如下所示:
const { Pool, Client } = require('pg')
const { StringStream } = require("scramjet");
const client = new Client({
  user: '*****',
  host: '****',
  database: '*****',
  password: '******#',
  port: 5432,
});

client.connect()
  .then(() => {
    // `fs` is needed for the input stream (it was used without being required).
    const fs = require('fs');
    console.log("Connected, processing file");

    const input = fs.createReadStream("input.txt");
    // The source stream emits 'end' once the whole file has been read; lookups
    // for the last lines may still be running at that point.
    input.on('end', () => console.log("done reading file."));

    return StringStream
      // this creates a "scramjet" stream from input.
      .from(input)
      // this splits the text line by line
      .lines()
      // normalise each line into non-empty lowercase words (runs of spaces yield no words)
      .map(line => line
        .replace(/[^0-9a-z ]/gi, '')
        .toLowerCase()
        .split(" ")
        .filter(word => word.length > 0))
      // drop lines that contain no words at all
      .filter(words => words.length > 0)
      // at every word position, look up the longest key phrase: 3 words, then 2, then 1
      .map(async words => {
        let matches = 0;
        for (let i = 0; i < words.length; i++) {
          for (let size = Math.min(3, words.length - i); size > 0; size--) {
            if (await isKeyPhrase(words.slice(i, i + size).join(" "))) {
              matches++;
              break;
            }
          }
        }
        // return a number (not undefined) so every chunk is a plain value
        return matches;
      })
      // this runs the above list of operations to the end and returns a promise.
      .run()
      .then(() => console.log("done processing file."))
      // close the connection even if a lookup or the file read failed
      .finally(() => client.end());
  })
  .catch((e) => {
    console.error(e.stack);
  });
/**
 * Look up a phrase among the active rows of the KeyPhrase table.
 *
 * Relies on the module-level `client` (a connected pg Client).
 *
 * @param {string} keyPhraseText - Phrase compared with `=` against KP.KeyPhraseText.
 * @returns {Promise<boolean>} true if the query returned at least one row, else false.
 *   Query errors are not caught and reject the returned promise.
 */
async function isKeyPhrase(keyPhraseText) {
  const result = await client.query({
    name: 'get-name',
    text: 'select KP.EntryID from KeyPhrase KP where (KP.KeyPhraseText = $1) and (Active = true)',
    values: [keyPhraseText],
    rowMode: 'array'
  });
  const found = result.rowCount > 0;
  if (found) {
    console.log(`Key phrase '${keyPhraseText}' found in table with Phase ID = ${result.rows}`);
  }
  return found;
}
我在某些地方压缩和优化了您的代码,但总体而言,这应该能满足您的需求——scramjet 为每个操作都提供了异步模式,并且会等待所有操作结束。