我有一个使用confluent.io kafka软件包的带有avro消息的kafka流。对于Java应用程序,这工作正常。但是我现在正尝试用javascript阅读这些消息。

我一直在尝试使用kafka-node + avsc包使用该模式将消息从缓冲区数组解码为字符串。我知道合流将模式ID的前5个字节作为魔术字节(0)+ 4个字节。

因此,我对Buffer进行切片以删除这些字节,然后尝试将其发送到avsc进行解码。但我得到一个错误


  返回this.buf.utf8Slice(pos,pos + len);
  
  RangeError:超出范围索引
      在RangeError(本机)
      在Tap.readString(C:\ git \ workflowapps \ workItemsApp \ node_modules \ avsc \ lib \ utils.js:452:19)
      在StringType._read(C:\ git \ workflowapps \ workItemsApp \ node_modules \ avsc \ lib \ types.js:612:58)


同时尝试手动解码会留下很多非utf8字符,这样我就丢失了数据。

样例代码:

  consumer.on('message', function(message) {
      var val = message.value.slice(4);
      sails.log.info('val buffer', val, val.length);
      sails.log.info('hex',val.toString('hex'));
      var type = avro.parse({"type":"record",
       "name":"StatusEvent",
        "fields":[{"name":"ApplicationUUID","type":"string"},
        {"name":"StatusUUID","type":"string"},
        {"name":"Name","type":"string"},
        {"name":"ChangedBy","type":"string"},
        {"name":"ChangedByUUID","type":"string"},
        {"name":"ChangedAt","type":"long"}]
      });

      var decodedValue = type.fromBuffer(val);
      sails.log.info('Decoded', decodedValue);
    });

最佳答案

您的slice(4)应该是slice(5)(否则,您仅会跳过5个标头字节中的4个)。您也许还可以找到有用的信息here

07-24 17:57