I am trying to write a Beam streaming pipeline that reads from a Pub/Sub queue, parses the data, and writes to one of two BigQuery tables. The code therefore uses side outputs so that a single DoFn can emit to either table. I am getting the following error: java.lang.IllegalArgumentException: unable to serialize DoFnAndMainOutput{doFn=com.pipeline.PubSubToBigQuery$ParsePubSubMessage@50eca7c6, mainOutputTag=Tag}. The full error message, the DoFn, and the test class are attached below:
DoFn:
public static class PubsubMessageToTableRow
        extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {

    private TupleTag<TableRow> soundEventReadings;
    private TupleTag<TableRow> doorEventReadings;

    public PubsubMessageToTableRow(TupleTag<TableRow> soundEventReadings,
                                   TupleTag<TableRow> doorEventReadings) {
        this.soundEventReadings = soundEventReadings;
        this.doorEventReadings = doorEventReadings;
    }

    @Override
    public PCollectionTuple expand(PCollection<PubsubMessage> input) {
        return input.apply("Parse PubSub Message",
                ParDo.of(new ParsePubSubMessage(soundEventReadings, doorEventReadings))
                        .withOutputTags(soundEventReadings, TupleTagList.of(doorEventReadings)));
    }
}
/**
 * Parse the PubSub Json message and create either a Door or Sound Event. Then create a TableRow object from the
 * event objects.
 */
public static class ParsePubSubMessage extends DoFn<PubsubMessage, TableRow> {

    private final TupleTag<TableRow> soundEventReadings;
    private final TupleTag<TableRow> doorEventReadings;

    public ParsePubSubMessage(TupleTag<TableRow> soundEventReadings,
                              TupleTag<TableRow> doorEventReadings) {
        this.soundEventReadings = soundEventReadings;
        this.doorEventReadings = doorEventReadings;
    }

    @ProcessElement
    public void processElement(ProcessContext c, MultiOutputReceiver out) throws IOException {
        PubsubMessage message = c.element();
        String jsonString = new String(message.getPayload(), StandardCharsets.UTF_8);
        ObjectMapper mapper = new ObjectMapper();

        // A DoorEvent can either be a door open/close event signified with #door# in the SensorName or can
        // be a motion event from the motion sensor next to the door signified with #motion# in the SensorName
        if (jsonString.contains("#door#") || jsonString.contains("#motion#")) {
            DoorEvent doorEvent = mapper.readValue(jsonString, DoorEvent.class);
            out.get(doorEventReadings).output(doorEvent.toTableRow());
        } else if (jsonString.contains("noiseFloor")) {
            SoundEvent soundEvent = mapper.readValue(jsonString, SoundEvent.class);
            out.get(soundEventReadings).output(soundEvent.toTableRow());
        }
    }
}
Test:
@Rule
public final transient TestPipeline testPipeline = TestPipeline.create();

@Test
public void testPubsubMessageToTableRow() throws IOException {
    String jsonDoorEvent = "{\"EventID\":\"12\",\"HomeID\":" +
            "\"22222\",\"SmartThingsSensorName\":" +
            "\"sa#door#1#front_door\",\"State\":\"closed\",\"Label\":false," +
            "\"HasBeenLabelled\":false,\"EventTime\":\"2019-01-09T12:22:22Z\",\"CreateDate\":" +
            "\"2019-01-09T15:17:00Z\",\"ModifyDate\":\"2019-01-09T15:17:00Z\"}";

    TableRow outputDoorRow = new TableRow().set("EventID", "12")
            .set("HomeID", "22222")
            .set("SmartThingsSensorName", "sa#door#1#front_door")
            .set("State", "closed")
            .set("Label", false)
            .set("HasBeenLabelled", false)
            .set("EventTime", "2019-01-09T12:22:22Z")
            .set("CreateDate", "2019-01-09T15:17:00Z")
            .set("ModifyDate", "2019-01-09T15:17:00Z");

    Map<String, String> attributes = new HashMap<>();
    attributes.put("eventTime", "2019-01-09T12:22:22Z");
    PubsubMessage messageDoor = new PubsubMessage(jsonDoorEvent.getBytes(), attributes);

    final TupleTag<TableRow> soundEventReadings = new TupleTag<TableRow>(){};
    final TupleTag<TableRow> doorEventReadings = new TupleTag<TableRow>(){};

    TestStream<PubsubMessage> createEvent =
            TestStream.create(PubsubMessageWithAttributesCoder.of())
                    .addElements(messageDoor).advanceWatermarkToInfinity();

    PCollectionTuple tuple = testPipeline
            .apply("Create Stream", createEvent)
            .apply("Parse pipeline",
                    new PubSubToBigQuery.PubsubMessageToTableRow(soundEventReadings, doorEventReadings));

    PCollection<TableRow> doorEventOutput = tuple.get(doorEventReadings);
    PAssert.that(doorEventOutput).containsInAnyOrder(outputDoorRow);
    testPipeline.run().waitUntilFinish();
}
Stack trace:
java.lang.IllegalArgumentException: unable to serialize DoFnAndMainOutput{doFn=com.pipeline.PubSubToBigQuery$ParsePubSubMessage@50eca7c6, mainOutputTag=Tag<output>}
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:57)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:462)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation$1.translateDoFn(ParDoTranslation.java:160)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:695)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:156)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation$ParDoPayloadTranslator.translate(ParDoTranslation.java:111)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:206)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:547)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:557)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformMatchers$4.matches(PTransformMatchers.java:194)
at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:278)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:256)
at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:209)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:170)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:348)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:329)
at com.ecobee.hm_occupancy_data_pipeline.PubSubToBigQueryTest.testPubsubMessageToTableRow(PubSubToBigQueryTest.java:118)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:317)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.io.NotSerializableException: com.pipeline.PubSubToBigQueryTest
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
... 48 more
Best answer
The problem is right there in the log: java.io.NotSerializableException: com.pipeline.PubSubToBigQueryTest. Make your test harness Serializable and that should fix it. Alternatively, try moving all the DoFns and other embedded functions into separate serializable classes.
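A minimal sketch of both suggestions, using the class names from the stack trace. The anonymous TupleTag subclasses created inside the test method capture a reference to the enclosing test instance, and because the DoFn stores those tags as fields, serializing the DoFn pulls the non-serializable test class into the graph. Either declaring the test class Serializable or creating the tags in a static context avoids that; the constant names SOUND_EVENT_READINGS / DOOR_EVENT_READINGS are illustrative, not from the original code.

import java.io.Serializable;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.TupleTag;
import org.junit.Rule;

// Option 1: make the test class itself Serializable, so the reference that the
// anonymous TupleTag subclasses hold to the test instance can be serialized.
public class PubSubToBigQueryTest implements Serializable {

    @Rule
    public final transient TestPipeline testPipeline = TestPipeline.create();

    // Option 2 (alternative): create the tags in a static context instead of inside
    // the test method. An anonymous subclass declared here has no enclosing instance,
    // so the test class never enters the DoFn's serialization graph at all.
    private static final TupleTag<TableRow> SOUND_EVENT_READINGS = new TupleTag<TableRow>() {};
    private static final TupleTag<TableRow> DOOR_EVENT_READINGS = new TupleTag<TableRow>() {};

    // ... test methods unchanged, passing the static tags to PubsubMessageToTableRow ...
}

With option 2 the tags could equally live on PubSubToBigQuery itself, which is in the spirit of the answer's second suggestion: keep the pieces that end up in the serialization graph out of the test class.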