本文介绍了如何使用NodeJS消费来自Kafka-Consumer的最新消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我创建了一个NodeJS应用程序来将数据插入到MongoDB集合中。此数据库插入是通过使用Kafka完成的。Kafka-node是我用来调用Kafka的插件。
我可以创建主题并在生产者级别向消费者发送消息。消息和主题取自POST请求。
This is how I call the Kafka. Parameters are topic and message.
每次我调用此接口时,生产者都会创建一条新消息并将其发送给消费者。在每次调用中,以前的所有消息都将返回给消费者。
我已使用fromOffset: 'earliest'
和fromOffset: 'latest'
选项来限制以前的消息,但不起作用。
有人能给我提个建议吗?Kafka-node版本
"kafka-node": "^5.0.0",
我使用的代码
var kafka = require('kafka-node');
const {MongoClient} = require('mongodb');
var url = 'mongodb://127.0.0.1:27017/';
const mongoClient = new MongoClient(url);
var Producer = kafka.Producer,
client = new kafka.KafkaClient(),
offset = new kafka.Offset(client),
Consumer = kafka.Consumer,
producer = new Producer(client);
producer.on('ready', function () {
console.log('Producer is ready');
});
producer.on('error', function (err) {
console.log('Producer is in error state');
console.log(err);
})
const createProducer = async(req,res,next) => {
var topic = req.body.topic;
var sentMessage = JSON.stringify(req.body.messages);
producer.send(payloads, async function( err, data) {
})
client = new kafka.KafkaClient(),
consumer = new Consumer(client,
[
{ topic: topic, partition: 0 }
],
{
autoCommit: false,
fromOffset: 'earliest'
}
);
consumer.on('message', async function (message) {
console.log("Message : "+JSON.stringify(message))
try {
var currentdate = new Date();
var datetime = "Last Sync: " + currentdate.getDate() + "/"
+ (currentdate.getMonth()+1) + "/"
+ currentdate.getFullYear() + " @ "
+ currentdate.getHours() + ":"
+ currentdate.getMinutes() + ":"
+ currentdate.getSeconds();
var abb = await createListing(mongoClient,
{
topic: topic,
message: sentMessage,
time: datetime
}
);
} catch (e) {
console.error(":"+e);
}
finally {
}
});
await mongoClient.close();
res.send({
message: 'Successfully send data from producer',
payloads: payloads
})
async function createListing(client, newListing){
await mongoClient.connect();
const result = await
client.db("sample_airbnb").collection("listingsAndReviews").insertOne(newListing);
console.log(`New listing created with the following id: ${result.insertedId}`);
return result.insertedId;
}
}
The Messages consumed at the consumer are
谢谢,
推荐答案
我对代码进行了一些更改,现在可以从我的主题检索最新消息。
我已在offset.fetchLatestOffsets([topics],cb)
中创建了消费者,并对消费者选项进行了一些更改。
var payloads = [
{ topic: topicName, messages: messageTotopic, partition: 0}
];
producer.send(payloads, async function(err, data) {
});
var client = new kafka.KafkaClient();
offset.fetchLatestOffsets([topic], async function (error, offsets) {
if (error)
console.log(error);
offsetA = JSON.stringify(offsets[topic][0])
console.log('offset Value:: '+offsetA);
var consumer = new Consumer(
client,
[
{
topic: topic,
partition: 0,
offset: offsetA-1, // Offset value starts from 0
}
], {
autoCommit: false,
fromOffset: true,
}
);
consumer.on('message', async function (message) {
console.log("Message from last offset:: " + JSON.stringify(message)); // will return the latest message.
consumer.close();
});
});
通过这种方式,我能够解决KafkaClient
中与事件发射器相关的内存泄漏问题。
这篇关于如何使用NodeJS消费来自Kafka-Consumer的最新消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!