这就是初始化我的消费者的方式:
const client = new kafka.Client(config.ZK_HOST)
const consumer = new kafka.Consumer(client, [{ topic: config.KAFKA_TOPIC, offset: 0}],
{
autoCommit: false
})
现在,消费者
consumer.on('message', message => applyMessage(message))
问题是
applyMessage
使用knex
与数据库对话,代码看起来像这样:async function applyMessage(message: kafka.Message) {
const usersCount = await db('users').count()
// just assume we ABSOLUTELY need to calculate a number of users,
// so we need previous state
await db('users').insert(inferUserFromMessage(message))
}
上面的代码使
applyMessage
对于kafka中的所有消息并行执行,因此在上面的代码中,假设数据库中还没有用户,即使来自kafka的第二条消息,usersCount
始终为0自第一次调用applyMessage
会插入一个用户以来,该值应该已经为1。如何以所有
applyMessage
函数顺序运行的方式“同步”代码? 最佳答案
您需要实现某种互斥体。基本上,是一个将事物排队等待同步执行的类。例
var Mutex = function() {
this.queue = [];
this.locked = false;
};
Mutex.prototype.enqueue = function(task) {
this.queue.push(task);
if (!this.locked) {
this.dequeue();
}
};
Mutex.prototype.dequeue = function() {
this.locked = true;
const task = this.queue.shift();
if (task) {
this.execute(task);
} else {
this.locked = false;
}
};
Mutex.prototype.execute = async function(task) {
try { await task(); } catch (err) { }
this.dequeue();
}
为了使其正常工作,您的
applyMessage
函数(无论是处理Kafka消息的函数)都需要返回Promise
-请注意,异步也已从父函数移至返回的Promise函数:function applyMessage(message: kafka.Message) {
return new Promise(async function(resolve,reject) {
try {
const usersCount = await db('users').count()
// just assume we ABSOLUTELY need to calculate a number of users,
// so we need previous state
await db('users').insert(inferUserFromMessage(message))
resolve();
} catch (err) {
reject(err);
}
});
}
最后,每次
applyMessage
的调用都需要添加到Mutex队列中,而不是直接调用:var mutex = new Mutex();
consumer.on('message', message => mutex.enqueue(function() { return applyMessage(message); }))