本文介绍了错误:字段 Message.Field .protobuf.MessageTypeAck.sourceModuleID 的线路类型非法:1(预期为 0)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个使用 kafka 和协议缓冲区生成和消费消息的应用程序,一切都很好.我正在使用 SerializeAsString() 序列化协议缓冲区(这个应用程序是用 C++ 编写的).

I've app that procude&consume message with kafka and protocol buffer and everything works great. I'm serialize the protocol buffer with SerializeAsString() (this app was written in c++).

现在,我添加了新的 node.js 网站,该网站也使用消息并尝试解码它们.

Now, I've added new node.js website that also consume messages and try to decode them.

我的 js 代码(使用很棒的 ProtoBuf.js 模块):强>

My js code (using the great ProtoBuf.js module):

var builder = ProtoBuf.loadProtoFile("/home/aii/general/proto/All.proto"),
    protobuf = builder.build("protobuf"),
    Trace = protobuf.Trace,
    MessageType = protobuf.MessageType,
    MessageTypeAck = protobuf.MessageTypeAck,
    MessageTypeKeepAlive = protobuf.MessageTypeKeepAlive;

function getMessageType(val) {
  return Object.keys(MessageType).filter(function(key) {return MessageType[key] === val})[0]
}

consumer.on('message', function (message) {
    try{
      switch(getMessageType(message.key[0])) {
        case 'MESSAGE_TYPE_ACK':
          console.log(MessageTypeAck.decode(message.value));
          break;
        case 'MESSAGE_TYPE_KEEP_ALIVE':
          console.log(MessageTypeKeepAlive.decode(message.value));
          break;
        default:
          console.log("Unknown message type");
      }
    } catch (e){
      if (e.decoded) {
        var err = e.decoded;
        console.log(err);
      }
      else {
        console.log(e);
      }
    }
});

结果:

[Error: Illegal wire type for field Message.Field .protobuf.MessageTypeAck.sourceModuleID: 1 (0 expected)]

我的原型文件:

Trace.proto:

package protobuf;

message Trace {
    optional string topic = 1;
    optional int32 partition = 2;
    optional int64 offset = 3;
}

MessageType.proto

package protobuf;

enum MessageType {
    MESSAGE_TYPE_ACK = 1;
    MESSAGE_TYPE_KEEP_ALIVE = 2;
}

Messages.proto:

import "Trace.proto";

package protobuf;

message MessageTypeAck {
    repeated Trace trace = 1;

    optional string sourceModuleName = 2;
    optional int32  sourceModuleID   = 3;
}

message MessageTypeKeepAlive {
    repeated Trace trace = 1;

    optional string sourceModuleName = 2;
    optional int32  sourceModuleID   = 3;
}

All.proto

import "Trace.proto"
import "MessageType.proto";
import "Messages.proto"

我做错了什么?(解码?)

What am I doing wrong? (decode?)

推荐答案

所以,感谢这个 SO 问答,我想通了!问题与我使用缓冲区的方式(通过 kafka)有关 - 作为 utf-8(默认).它实际上与我没有附加的代码有关:

so, Thanks to this SO question&answer, I figured it out!The problem is related to the way I've consume the buffer (by kafka) - as utf-8 (default). It actually related to code which I didn't attached:

var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.Client('localhost:2181'),
    consumer = new Consumer(
        client,
        [
            { topic: 'Genesis', partition: 0 }
        ],
        {
            autoCommit: false,
            encoding: 'buffer'
        }
    );

解决方案是添加 encoding: 'buffer' 行(默认为 'utf-8',如前所述 此处).

and the solution was to add the encoding: 'buffer' line (the default is 'utf-8' as mentioned here).

这篇关于错误:字段 Message.Field .protobuf.MessageTypeAck.sourceModuleID 的线路类型非法:1(预期为 0)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-06 04:35