问题描述
我有一个使用 kafka 和协议缓冲区生成和消费消息的应用程序,一切都很好.我正在使用 SerializeAsString()
序列化协议缓冲区(这个应用程序是用 C++ 编写的).
I've app that procude&consume message with kafka and protocol buffer and everything works great. I'm serialize the protocol buffer with SerializeAsString()
(this app was written in c++).
现在,我添加了新的 node.js 网站,该网站也使用消息并尝试解码它们.
Now, I've added new node.js website that also consume messages and try to decode them.
我的 js 代码(使用很棒的 ProtoBuf.js 模块):强>
My js code (using the great ProtoBuf.js module):
var builder = ProtoBuf.loadProtoFile("/home/aii/general/proto/All.proto"),
protobuf = builder.build("protobuf"),
Trace = protobuf.Trace,
MessageType = protobuf.MessageType,
MessageTypeAck = protobuf.MessageTypeAck,
MessageTypeKeepAlive = protobuf.MessageTypeKeepAlive;
function getMessageType(val) {
return Object.keys(MessageType).filter(function(key) {return MessageType[key] === val})[0]
}
consumer.on('message', function (message) {
try{
switch(getMessageType(message.key[0])) {
case 'MESSAGE_TYPE_ACK':
console.log(MessageTypeAck.decode(message.value));
break;
case 'MESSAGE_TYPE_KEEP_ALIVE':
console.log(MessageTypeKeepAlive.decode(message.value));
break;
default:
console.log("Unknown message type");
}
} catch (e){
if (e.decoded) {
var err = e.decoded;
console.log(err);
}
else {
console.log(e);
}
}
});
结果:
[Error: Illegal wire type for field Message.Field .protobuf.MessageTypeAck.sourceModuleID: 1 (0 expected)]
我的原型文件:
Trace.proto:
package protobuf;
message Trace {
optional string topic = 1;
optional int32 partition = 2;
optional int64 offset = 3;
}
MessageType.proto
package protobuf;
enum MessageType {
MESSAGE_TYPE_ACK = 1;
MESSAGE_TYPE_KEEP_ALIVE = 2;
}
Messages.proto:
import "Trace.proto";
package protobuf;
message MessageTypeAck {
repeated Trace trace = 1;
optional string sourceModuleName = 2;
optional int32 sourceModuleID = 3;
}
message MessageTypeKeepAlive {
repeated Trace trace = 1;
optional string sourceModuleName = 2;
optional int32 sourceModuleID = 3;
}
All.proto
import "Trace.proto"
import "MessageType.proto";
import "Messages.proto"
我做错了什么?(解码?)
What am I doing wrong? (decode?)
推荐答案
所以,感谢这个 SO 问答,我想通了!问题与我使用缓冲区的方式(通过 kafka)有关 - 作为 utf-8(默认).它实际上与我没有附加的代码有关:
so, Thanks to this SO question&answer, I figured it out!The problem is related to the way I've consume the buffer (by kafka) - as utf-8 (default). It actually related to code which I didn't attached:
var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.Client('localhost:2181'),
consumer = new Consumer(
client,
[
{ topic: 'Genesis', partition: 0 }
],
{
autoCommit: false,
encoding: 'buffer'
}
);
解决方案是添加 encoding: 'buffer' 行(默认为 'utf-8',如前所述 此处).
and the solution was to add the encoding: 'buffer' line (the default is 'utf-8' as mentioned here).
这篇关于错误:字段 Message.Field .protobuf.MessageTypeAck.sourceModuleID 的线路类型非法:1(预期为 0)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!