I am using Protocol Buffers (proto3) in my latest project. Messages are encoded from Python, and I am trying to decode them from Scala (running on Spark). You can see the actual message under "value" in the console output below, along with its length of 227.
The failing field is an ordinary string field. I also tried decoding the string field as "UTF-8", but that did not work either. It looks like an encoding/decoding problem. Has anyone else run into something similar?
Console output
...
value:
U
$a5a9c2bb-efd2-4ea3-ae0e-7479925a7807"kafka_producer.py*2016-12-18 10:37:25.075614*
data{"url": "some url 0", "title": "some title", "_id": "5815a37c43cfd44120e50538", "content": "some content", "tags": ["football"]}
value length: 227
Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: com.google.protobuf.CodedInputStream.readStringRequireUtf8()Ljava/lang/String;
at org.libero.messages.Messages$Event.<init>(Messages.java:160)
at org.libero.messages.Messages$Event.<init>(Messages.java:117)
at org.libero.messages.Messages$Event$1.parsePartialFrom(Messages.java:1564)
at org.libero.messages.Messages$Event$1.parsePartialFrom(Messages.java:1559)
at com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309)
at org.libero.messages.Messages$EventDataProduced.<init>(Messages.java:1742)
at org.libero.messages.Messages$EventDataProduced.<init>(Messages.java:1697)
at org.libero.messages.Messages$EventDataProduced$1.parsePartialFrom(Me
...
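The `NoSuchMethodError` above typically means a protobuf version clash rather than a data problem: the generated `Messages.java` was compiled against protobuf-java 3 (whose generated code calls `readStringRequireUtf8()`), while Spark's runtime classpath supplies an older protobuf-java (Hadoop/Spark have long bundled 2.5.0, which predates that method). A quick reflection probe can confirm whether the method is visible at runtime; this is a minimal sketch, demonstrated on a JDK class, and on a real Spark classpath you would pass `com.google.protobuf.CodedInputStream` and `"readStringRequireUtf8"` instead:

```java
import java.util.Arrays;

public class MethodCheck {
    // Returns true if a public method with the given name exists on the class.
    // Inside a Spark job, call this with Class.forName("com.google.protobuf.CodedInputStream")
    // and "readStringRequireUtf8" to see which protobuf version actually loaded.
    static boolean hasMethod(Class<?> cls, String name) {
        return Arrays.stream(cls.getMethods())
                     .anyMatch(m -> m.getName().equals(name));
    }

    public static void main(String[] args) {
        // Demonstrate the mechanism with a JDK class:
        System.out.println(hasMethod(String.class, "isEmpty"));   // true
        System.out.println(hasMethod(String.class, "noSuchOne")); // false
    }
}
```

If the probe reports the method is missing inside the job but present in your own protobuf jar, the older bundled copy is winning on the classpath, which is exactly what the shading fix in the answer below addresses.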
Code snippet
....
nonEmptyMsgs.foreachRDD { msgsRDD =>
  println("Trying to print each RDD: " + msgsRDD)
  println("Count: " + msgsRDD.count())
  val elems = msgsRDD.collect()
  println("Elems: " + elems)
  for (v <- elems) {
    println("key: " + v._1)
    println("value: " + v._2)
    println("value length: " + v._2.length())
    // Note: round-tripping the payload through a String uses the platform
    // charset and can corrupt binary protobuf data; parseFrom also accepts
    // the raw bytes directly, without the String detour.
    val bytes = new String(v._2).getBytes()
    val event = EventDataProduced.parseFrom(ByteString.copyFrom(bytes))
    println("event: " + event)
  }
}
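Independent of the classpath problem, the `new String(v._2).getBytes()` round-trip in the snippet is lossy for binary data: decoding arbitrary bytes as text replaces malformed sequences with the replacement character, so the re-encoded bytes no longer match the original wire payload. A small self-contained demonstration (the byte values are illustrative, standing in for a protobuf payload):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class CharsetRoundTrip {
    public static void main(String[] args) {
        // Arbitrary binary data, e.g. part of a protobuf wire payload;
        // 0x80 and 0xFF are not valid standalone UTF-8 bytes.
        byte[] original = new byte[] { 0x0A, (byte) 0x80, (byte) 0xFF, 0x12 };

        // Decoding to a String and re-encoding replaces each malformed
        // sequence with U+FFFD, silently corrupting the payload.
        byte[] roundTripped = new String(original, StandardCharsets.UTF_8)
                .getBytes(StandardCharsets.UTF_8);

        System.out.println(Arrays.equals(original, roundTripped)); // false
    }
}
```

For this reason the raw `Array[Byte]` from Kafka should be handed to `parseFrom` directly, never through a `String`.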
...
Best answer
How can I use proto3 with Hadoop/Spark?
Following the link above, I solved it.
I only needed to relocate the protobuf classes, and the VerifyError (here, the NoSuchMethodError) went away:
<relocations>
  <relocation>
    <pattern>com.google.protobuf</pattern>
    <shadedPattern>shaded.com.google.protobuf</shadedPattern>
  </relocation>
</relocations>
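For context, this `<relocations>` element sits inside the Maven Shade plugin configuration in `pom.xml`; a minimal sketch, assuming a standard Maven build (the plugin version is illustrative):

```xml
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.2.4</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <relocations>
              <relocation>
                <pattern>com.google.protobuf</pattern>
                <shadedPattern>shaded.com.google.protobuf</shadedPattern>
              </relocation>
            </relocations>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```

Relocation rewrites both the bundled protobuf 3 classes and the generated code's references to them into the `shaded.` package, so at runtime the job no longer resolves `CodedInputStream` against Spark's older bundled protobuf.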