我在MongoDB中收集了大量资料。想要通过在数据上运行一些业务逻辑nodejs脚本将所有数据迁移到cassandra。做这个的最好方式是什么 ?
我制作了一个脚本,在这个脚本中,我从mongo的单个请求中获得了5000个文档,并处理数据并将文档插入cassandra。经过40-50次迭代后,这需要花费大量时间。 CPU使用率显示为100%。这是因为发生了很多回调吗?我是Node js的新手,所以无法得出任何结论。`var cassandra = require('../ models / tracking_cassandra');
var TrackingEvents = require('../ models / tracking_mongo_events');
var counter = 0;
var incr = 5000;
var final_counter = 0;
var start_point = function (callback){
TrackingEvents.count(function(err, data){
final_counter = data;
TrackingEvents.getEventsByCounter(counter, function(counter, obj) {
var prevId = obj[0].toObject()._id;
getMessagesFromMongo(prevId, callback);
});
});
};
function getMessagesFromMongo(prevId, callback){
counter = counter + incr;
TrackingEvents.getEventsByCounter(counter, function(counter, obj) {
var nextId = obj[0].toObject()._id;
var start_time = new Date();
TrackingEvents.getEventsBtwIds(prevId, nextId, function ( err, userEvents ) {
if(userEvents.length !== 0){
insert_into_cassandra( userEvents, callback );
}else{
console.log('empty data set');
}
});
if(counter >= final_counter){
callback();
}else{
getMessagesFromMongo(nextId, callback);
}
});
};
var insert_into_cassandra = function( events, callback ){
var inserts = 0;
total_documents = total_documents + events.length;
for(var i = 0 ; i< events.length ; i++){
var userEventData = events[i].toObject();
if(typeof userEventData.uid == 'undefined'){
total_nuid ++;
}else{
create_cassandra_query( userEventData );
}
}
};
var create_cassandra_query = function ( eventData ) {
delete eventData._id;
delete eventData[0];
delete eventData.appid;
delete eventData.appversion;
var query = "INSERT INTO userwise_events ";
var keys = "(";
var values = "(";
for(var key in eventData){
if(eventData[key] == null || typeof eventData[key] == 'undefined'){
delete eventData[key];
}
if (eventData.hasOwnProperty(key)) {
keys = keys + key + ', ';
values = values + ':' + key + ', ';
}
if(key != 'uid' && key!= 'date_time' && key != 'etypeId'){
eventData[key] = String(eventData[key]);
}
}
keys = keys.slice(0, -2);
values = values.slice(0, -2);
keys = keys + ")";
values = values + ")";
query = query + keys + " VALUES " + values;
cassandra.trackingCassandraClient.execute(query, eventData, { prepare: true }, function (err, data) {
if(err){
console.log(err);
}
});
};
var start_time = new Date();
start_point(function(res, err){
var end_time = new Date();
var diff = end_time.getTime() - start_time.getTime();
var seconds_diff = diff / 1000;
var totalSec = Math.abs(seconds_diff);
console.log('Total Execution Time : ' + totalSec);
});
process.on('uncaughtException', function (err) {
console.log('Caught exception: ' + err);
});`
最佳答案
这是因为发生了很多回调吗?
据我所知,可能根本没有回调-无法告诉您代码的问题是什么,甚至没有一行代码也没有。
对于这样一个模糊的问题,我只能给您一个一般性的建议:确保您没有长时间运行for
或while
循环。而且,除了在事件循环的第一个滴答声之外,请勿在其他任何地方使用阻塞系统调用。如果您不知道事件循环的第一个滴答声是什么,那么根本不要使用阻塞调用。只要有可能,就使用流存储数据-尤其是在有大量数据的情况下。
100%的CPU利用率是一个不好的信号,对于I / O繁重的操作(如您要尝试执行的操作),绝不应该发生。您应该能够轻松处理大量数据,尤其是在使用流时。确保您的进程最大程度地利用CPU进行固有的I / O绑定操作,例如通过网络移动大量数据,这肯定表明您在代码中做错了什么。那到底是什么这将是一个谜,因为您甚至没有向我们展示代码的哪一行。
关于node.js - 使用Node.js脚本将GB的数据从MongoDB迁移到Cassandra的最佳方法,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/42790995/