我在MongoDB中收集了大量资料。想要通过在数据上运行一些业务逻辑nodejs脚本将所有数据迁移到cassandra。做这个的最好方式是什么 ?

我制作了一个脚本,在这个脚本中,我从mongo的单个请求中获得了5000个文档,并处理数据并将文档插入cassandra。经过40-50次迭代后,这需要花费大量时间。 CPU使用率显示为100%。这是因为发生了很多回调吗?我是Node js的新手,所以无法得出任何结论。`var cassandra = require('../ models / tracking_cassandra');
    var TrackingEvents = require('../ models / tracking_mongo_events');

var counter = 0;
var incr = 5000;
var final_counter = 0;

var start_point = function (callback){
    TrackingEvents.count(function(err, data){
        final_counter = data;
        TrackingEvents.getEventsByCounter(counter, function(counter, obj) {
            var prevId = obj[0].toObject()._id;
            getMessagesFromMongo(prevId, callback);
        });
    });
};

function getMessagesFromMongo(prevId, callback){
    counter = counter + incr;
    TrackingEvents.getEventsByCounter(counter, function(counter, obj) {
        var nextId = obj[0].toObject()._id;
        var start_time = new Date();
        TrackingEvents.getEventsBtwIds(prevId, nextId, function ( err, userEvents ) {
            if(userEvents.length !== 0){
                insert_into_cassandra( userEvents, callback );
            }else{
                console.log('empty data set');
            }
        });
        if(counter >= final_counter){
            callback();
        }else{
            getMessagesFromMongo(nextId, callback);
        }
    });
};

var insert_into_cassandra = function( events, callback ){
    var inserts = 0;
    total_documents = total_documents + events.length;
    for(var i = 0 ; i< events.length ; i++){
        var userEventData = events[i].toObject();
        if(typeof userEventData.uid == 'undefined'){
            total_nuid ++;
        }else{
            create_cassandra_query( userEventData );
        }
    }
};

var create_cassandra_query = function ( eventData ) {
    delete eventData._id;
    delete eventData[0];
    delete eventData.appid;
    delete eventData.appversion;
    var query = "INSERT INTO userwise_events ";
    var keys = "(";
    var values = "(";
    for(var key in eventData){
        if(eventData[key] == null || typeof eventData[key] == 'undefined'){
            delete eventData[key];
        }
        if (eventData.hasOwnProperty(key)) {
            keys = keys + key + ', ';
            values = values + ':' + key + ', ';
        }
        if(key != 'uid' && key!= 'date_time' && key != 'etypeId'){
            eventData[key] = String(eventData[key]);
        }
    }
    keys = keys.slice(0, -2);
    values = values.slice(0, -2);
    keys = keys + ")";
    values = values + ")";
    query = query + keys + " VALUES " + values;
    cassandra.trackingCassandraClient.execute(query, eventData, { prepare: true }, function (err, data) {
        if(err){
            console.log(err);
        }
    });
};

var start_time = new Date();
start_point(function(res, err){
    var end_time = new Date();
    var diff = end_time.getTime() - start_time.getTime();
    var seconds_diff = diff / 1000;
    var totalSec = Math.abs(seconds_diff);
    console.log('Total Execution Time : ' + totalSec);
});

process.on('uncaughtException', function (err) {
    console.log('Caught exception: ' + err);
});`

最佳答案

这是因为发生了很多回调吗?


据我所知,可能根本没有回调-无法告诉您代码的问题是什么,甚至没有一行代码也没有。

对于这样一个模糊的问题,我只能给您一个一般性的建议:确保您没有长时间运行forwhile循环。而且,除了在事件循环的第一个滴答声之外,请勿在其他任何地方使用阻塞系统调用。如果您不知道事件循环的第一个滴答声是什么,那么根本不要使用阻塞调用。只要有可能,就使用流存储数据-尤其是在有大量数据的情况下。

100%的CPU利用率是一个不好的信号,对于I / O繁重的操作(如您要尝试执行的操作),绝不应该发生。您应该能够轻松处理大量数据,尤其是在使用流时。确保您的进程最大程度地利用CPU进行固有的I / O绑定操作,例如通过网络移动大量数据,这肯定表明您在代码中做错了什么。那到底是什么这将是一个谜,因为您甚至没有向我们展示代码的哪一行。

关于node.js - 使用Node.js脚本将GB的数据从MongoDB迁移到Cassandra的最佳方法,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/42790995/

10-12 16:17
查看更多