创建一个非常简单的Node.js实用程序来单独处理文本文件中的每条记录(逐行),但是由于Node固有的异步世界,很难处理以下情况:


打开与数据库的连接
读取文本文件的每一行
根据行中已处理文本的条件,在数据库中查找记录
读完文本文件后,关闭数据库连接


我面临的挑战是:逐行读取文本文件(使用“readline”模块)时,需要给该模块发出的“line”事件附加侦听器。文件的所有行很快就被处理完,而对数据库的查询却还在排队等待执行。我尝试了许多方法想让整个过程同步执行,但都无济于事。下面是我的最新尝试,其中大量使用了 async/await。我虽然是一名资深开发人员,但对 Node.js 还很陌生,我知道自己肯定漏掉了某个简单的东西。任何指导都将不胜感激。

const { Pool, Client } = require('pg')

// Single pg Client instance used by the rest of this script (the connect()
// chain and isKeyPhrase() both reference it). The connection values are
// masked placeholders.
const client = new Client({
  user: '*****',
  host: '****',
  database: '*****',
  password: '******#',
  port: 5432,
})


// Connects to the database, then reads input.txt line by line with readline
// and looks up 3-, 2- and 1-word phrases from every line via isKeyPhrase().
client.connect()
  .then(() => {

    console.log("Connected");

    console.log("Processing file");

    const fs = require('fs');
    const readline = require('readline');
    const instream = fs.createReadStream("input.txt");
    const outstream = new (require('stream'))();
    const rl = readline.createInterface(instream, outstream);

    // The listener is an async function, but nothing here awaits the promise
    // it returns, so further 'line' events are delivered while earlier
    // lookups are still pending (this is the problem the question describes).
    rl.on('line', async function (line) {

        var callResult;

        if (line.length > 0) {

            // Drop every character that is not a digit, an ASCII letter or a
            // space, then split on single spaces.
            var words = line.replace(/[^0-9a-z ]/gi, '').split(" ");
            // NOTE(review): `len` is assigned but never read in this block.
            var len = words.length;

            // Remove empty entries (left by consecutive spaces) and lower-case
            // the remaining words; `i--` re-visits the index after a splice.
            for (var i = 0; i < words.length; i++) {
                if (words[i].length === 0) {
                  words.splice(i, 1);
                  i--;
                } else {
                    words[i] = words[i].toLowerCase();
                }
              }

            // For each word position try the longest phrase first and fall
            // back to a shorter one only when the longer lookup was falsy.
            for (var i = 0; i < words.length; i++) {

                // At least three words remain: try 3, then 2, then 1 word.
                if (i <= words.length - 3) {

                    callResult = await isKeyPhrase(words[i].trim() + " " + words[i + 1].trim() + " " + words[i + 2].trim());

                    if (!callResult) {

                        callResult = await isKeyPhrase(words[i].trim() + " " + words[i + 1].trim());

                        if (!callResult) {

                            callResult = await isKeyPhrase(words[i].trim());
                        }
                    };

                // Exactly two words remain: try 2, then 1 word.
                } else if (i <= words.length - 2) {

                    callResult = await isKeyPhrase(words[i].trim() + " " + words[i + 1].trim());

                    if (!callResult ) {

                        callResult = await isKeyPhrase(words[i].trim());

                    };

                // Last word of the line: single-word lookup only.
                } else if (i < words.length) {

                    callResult = await isKeyPhrase(words[i].trim());
                }
            }

        }       // (line.length > 0)

    });

    // Logs when readline emits 'close'; the database connection is
    // deliberately left open here (see the comment below).
    rl.on('close', function (line) {
        console.log('done reading file.');

        // stubbed out because queries are still running
        //client.end();

    });


  // Logs a rejection of connect() or an error thrown synchronously inside the
  // callback above.
  }).catch( (err) => {
    console.error('connection error', err.stack);
});

/**
 * Checks whether a phrase is stored as an active entry in the KeyPhrase table.
 *
 * Uses the module-level `client` to run a named, parameterized query.
 *
 * @param {string} keyPhraseText - Phrase to look up; bound to `$1`.
 * @returns {Promise<boolean>} `true` when exactly one active row matches,
 *   `false` otherwise.
 * @throws Rejects with the error raised by `client.query` (after logging its
 *   stack) so callers can react to a failed lookup.
 */
async function isKeyPhrase(keyPhraseText) {

    const query = {
      name: 'get-name',
      text: 'select KP.EntryID from KeyPhrase KP where (KP.KeyPhraseText = $1) and (Active = true)',
      values: [keyPhraseText],
      rowMode: 'array'
    };

    try {
        // Await the query directly: wrapping it in `new Promise` adds nothing
        // and previously hid a misspelled result variable.
        const result = await client.query(query);

        if (result.rowCount !== 1) {
            return false;
        }

        console.log(`Key phrase '${keyPhraseText}' found in table with Phase ID = ${result.rows}`);
        return true;

    } catch (e) {
        // Log once, then propagate so the caller's await rejects.
        console.error(e.stack);
        throw e;
    }

}

最佳答案

欢迎使用StackOverflow。 :)

确实,在尝试每行与数据库交互数据时,没有(明智的)同步读取文件的方法。如果文件大于内存的1/8,则没有可行的方法。

这并不意味着没有办法或为此编写合理的代码。唯一的问题是标准节点流(包括readline)不等待异步代码。

我建议使用 scramjet,这是一个函数式的流编程框架,它的设计非常适合您的用例(disclaimer:我是作者)。代码如下所示:

const { Pool, Client } = require('pg')
const { StringStream } = require("scramjet");
// fs is needed for createReadStream below.
const fs = require("fs");

// Single connection shared by the pipeline below and by isKeyPhrase().
const client = new Client({
    user: '*****',
    host: '****',
    database: '*****',
    password: '******#',
    port: 5432,
})

// Longest phrase (in words) that is looked up at each word position.
const MAX_PHRASE_WORDS = 3;

// Connects, streams input.txt line by line through scramjet, looks up key
// phrases for every line, and closes the connection once processing ends.
client.connect()
    .then(async () => {
        console.log("Connected, processing file");

        const input = fs.createReadStream("input.txt");
        // Report when the file itself has been fully read; lookups for the
        // last lines may still be in flight at that point.
        input.once("end", () => console.log("done reading file."));

        try {
            await StringStream
                // this creates a "scramjet" stream from the file stream
                .from(input)
                // this splits the file content line by line
                .lines()
                // normalize like the first "for" loop in your code: lower-case,
                // strip everything except digits, letters and spaces, split
                // into words and drop the empty words left by repeated spaces
                .map(line => line
                    .toLowerCase()
                    .replace(/[^0-9a-z ]+/g, '')
                    .split(" ")
                    .filter(word => word.length > 0))
                // this one gets rid of empty lines (i.e. no words)
                .filter(words => words.length > 0)
                // like the second "for" loop in your code: at every word
                // position try the 3-word phrase first, then 2 words, then 1,
                // stopping at the first hit for that position
                .map(async words => {
                    let found = false;
                    for (let i = 0; i < words.length; i++) {
                        const longest = Math.min(MAX_PHRASE_WORDS, words.length - i);
                        for (let size = longest; size > 0; size--) {
                            if (await isKeyPhrase(words.slice(i, i + size).join(" "))) {
                                found = true;
                                break;
                            }
                        }
                    }
                    // true when at least one key phrase was found on the line
                    return found;
                })
                // this runs the above list of operations to the end and returns a promise.
                .run();

            console.log("done processing file.");
        } finally {
            // Close the connection on failure too, otherwise the open socket
            // keeps the process alive.
            await client.end();
        }
    })
    .catch((e) => {
        console.error(e.stack);
    });


/**
 * Looks a phrase up among the active rows of the KeyPhrase table using the
 * module-level `client`.
 *
 * @param {string} keyPhraseText - Phrase bound to the `$1` placeholder.
 * @returns {Promise<boolean>} `true` when the query reports at least one row
 *   (the match is also logged), `false` otherwise.
 */
async function isKeyPhrase(keyPhraseText) {
    const lookup = await client.query({
        name: 'get-name',
        text: 'select KP.EntryID from KeyPhrase KP where (KP.KeyPhraseText = $1) and (Active = true)',
        values: [keyPhraseText],
        rowMode: 'array'
    });

    // Guard clause: nothing matched, nothing to report.
    if (!(lookup.rowCount > 0)) {
        return false;
    }

    console.log(`Key phrase '${keyPhraseText}' found in table with Phase ID = ${lookup.rows}`);
    return true;
}


我在某些地方压缩和优化了您的代码,但是总的来说,这应该可以为您提供所需的信息-scramjet为每个操作添加异步模式,并且将等待所有操作结束。

10-07 19:35
查看更多