I'm using Protocol Buffers (proto3) in my current project, but I've run into the problem below. A message is encoded from Python, and I'm trying to decode it from Scala (I'm using Spark). You can see the actual message under "value", and its length of 227, in the console output below.

The field in question is just an ordinary string-typed field. I also tried decoding the string field as "UTF-8", but that didn't work either. It looks like an encoding/decoding problem. Has anyone else run into something similar?

Console output

...

value:
U
$a5a9c2bb-efd2-4ea3-ae0e-7479925a7807"kafka_producer.py*2016-12-18 10:37:25.075614*
data{"url": "some url 0", "title": "some title", "_id": "5815a37c43cfd44120e50538", "content": "some content", "tags": ["football"]}
value length: 227
Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: com.google.protobuf.CodedInputStream.readStringRequireUtf8()Ljava/lang/String;
at org.libero.messages.Messages$Event.<init>(Messages.java:160)
at org.libero.messages.Messages$Event.<init>(Messages.java:117)
at org.libero.messages.Messages$Event$1.parsePartialFrom(Messages.java:1564)
at org.libero.messages.Messages$Event$1.parsePartialFrom(Messages.java:1559)
at com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309)
at org.libero.messages.Messages$EventDataProduced.<init>(Messages.java:1742)
at org.libero.messages.Messages$EventDataProduced.<init>(Messages.java:1697)
at org.libero.messages.Messages$EventDataProduced$1.parsePartialFrom(Me

...


Code snippet

 ....

 nonEmptyMsgs.foreachRDD { msgsRDD =>
      println("Trying to print each RDD: " + msgsRDD)
      println("Count: " + msgsRDD.count())
      val elems = msgsRDD.collect()
      println("Elems: " + elems)
      for (v <- elems) {
        println("key: " + v._1)
        println("value: " + v._2)
        println("value length: " + v._2.length())
        // Note: v._2 is a String here, so the binary payload has already been
        // through a charset; getBytes() then re-encodes it with the platform
        // default charset before parsing.
        val bytes = new String(v._2).getBytes()
        val event = EventDataProduced.parseFrom(ByteString.copyFrom(bytes))
        println("event: " + event)
      }
    }
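
As an aside, independent of the NoSuchMethodError above: round-tripping the binary payload through a String corrupts protobuf messages whenever the platform charset can't represent every byte. Here is a minimal sketch that keeps the Kafka value as raw bytes end to end, assuming the stream is created so that nonEmptyMsgs yields (String, Array[Byte]) pairs (e.g. a ByteArrayDeserializer for values):

    // Sketch under the assumption that values arrive as Array[Byte].
    nonEmptyMsgs.foreachRDD { msgsRDD =>
      msgsRDD.collect().foreach { case (key, bytes) =>
        // Parse straight from the byte array; the binary wire format is
        // never pushed through a charset conversion.
        val event = EventDataProduced.parseFrom(bytes)
        println("key: " + key)
        println("event: " + event)
      }
    }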

...

Best answer

How can I use proto3 with Hadoop/Spark?

I solved it by following the linked question above. The root cause is that Hadoop/Spark ship the older protobuf-java 2.5.0, which does not have CodedInputStream.readStringRequireUtf8(); code generated by the proto3 compiler calls that method, hence the NoSuchMethodError.
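
To confirm which protobuf runtime is actually being loaded, a quick diagnostic sketch (not from the original post) that prints the jar the class comes from:

    // Prints where com.google.protobuf.CodedInputStream is loaded from; if it
    // points at a Spark/Hadoop jar bundling protobuf-java 2.5.0, the
    // proto3-generated code will fail as above.
    val src = classOf[com.google.protobuf.CodedInputStream]
      .getProtectionDomain.getCodeSource
    println(if (src == null) "bootstrap classloader" else src.getLocation)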

All it took was relocating the protobufs, and the error (a VerifyError in the linked question, the NoSuchMethodError here) was gone:

      <relocations>
        <relocation>
          <pattern>com.google.protobuf</pattern>
          <shadedPattern>shaded.com.google.protobuf</shadedPattern>
        </relocation>
      </relocations>
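
For context, here is a minimal sketch of where that relocations element sits inside the maven-shade-plugin block of the pom; the plugin version is an assumption:

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <!-- version is an assumption; pick whatever your build standardizes on -->
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <relocations>
                <relocation>
                  <pattern>com.google.protobuf</pattern>
                  <shadedPattern>shaded.com.google.protobuf</shadedPattern>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

With this in place, the shaded jar bundles its own protobuf-java 3.x classes under shaded.com.google.protobuf, so the older copy on Spark's classpath no longer shadows them.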
