我有一个使用confluent.io kafka软件包的带有avro消息的kafka流。对于Java应用程序,这工作正常。但是我现在正尝试用javascript阅读这些消息。
我一直在尝试使用kafka-node + avsc包使用该模式将消息从缓冲区数组解码为字符串。我知道合流将模式ID的前5个字节作为魔术字节(0)+ 4个字节。
因此,我对Buffer进行切片以删除这些字节,然后尝试将其发送到avsc进行解码。但我得到一个错误
返回this.buf.utf8Slice(pos,pos + len);
RangeError:超出范围索引
在RangeError(本机)
在Tap.readString(C:\ git \ workflowapps \ workItemsApp \ node_modules \ avsc \ lib \ utils.js:452:19)
在StringType._read(C:\ git \ workflowapps \ workItemsApp \ node_modules \ avsc \ lib \ types.js:612:58)
同时尝试手动解码会留下很多非utf8字符,这样我就丢失了数据。
样例代码:
consumer.on('message', function(message) {
var val = message.value.slice(4);
sails.log.info('val buffer', val, val.length);
sails.log.info('hex',val.toString('hex'));
var type = avro.parse({"type":"record",
"name":"StatusEvent",
"fields":[{"name":"ApplicationUUID","type":"string"},
{"name":"StatusUUID","type":"string"},
{"name":"Name","type":"string"},
{"name":"ChangedBy","type":"string"},
{"name":"ChangedByUUID","type":"string"},
{"name":"ChangedAt","type":"long"}]
});
var decodedValue = type.fromBuffer(val);
sails.log.info('Decoded', decodedValue);
});
最佳答案
您的slice(4)
应该是slice(5)
(否则,您仅会跳过5个标头字节中的4个)。您也许还可以找到有用的信息here。