How to specify/define a coder for a class that wraps TableRow

Problem description

I have defined a class that wraps the com.google.api.services.bigquery.model.TableRow class, holding it as an internal member:

public class TableRowWrapper implements Serializable {

    private TableRow tableRow;

    public TableRowWrapper() {
    }
...
}

I also have some DoFns that take and emit instances of TableRowWrapper, resulting in a PCollection<TableRowWrapper>. I've tried annotating the class with @DefaultCoder(SerializableCoder.class) and with @DefaultCoder(AvroCoder.class), but encoding always fails because no coder can be found for the TableRow member. Here is an example when using AvroCoder:

java.lang.IllegalArgumentException: Unable to encode element 'com.test.bigquery.api.TableRowWrapper@5129e8a6' with coder 'AvroCoder'.
    at com.google.cloud.dataflow.sdk.coders.StandardCoder.getEncodedElementByteSize(StandardCoder.java:177)
    at com.google.cloud.dataflow.sdk.coders.StandardCoder.registerByteSizeObserver(StandardCoder.java:191)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:633)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:542)
    at com.google.cloud.dataflow.sdk.runners.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:429)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:115)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:61)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:157)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:329)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:483)
    at com.test.cdf.wrapper.pipeline.DataflowPipeline$TableRowToWrapperDoFn.processElement(DataflowPipeline.java:203)
Caused by: java.lang.NullPointerException: in com.test.bigquery.api.TableRowWrapper in com.google.api.services.bigquery.model.TableRow in array null of array in field f of com.google.api.services.bigquery.model.TableRow in field tableRow of com.test.bigquery.api.TableRowWrapper
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
    at com.google.cloud.dataflow.sdk.coders.AvroCoder.encode(AvroCoder.java:227)
    at com.google.cloud.dataflow.sdk.coders.StandardCoder.getEncodedElementByteSize(StandardCoder.java:174)
    at com.google.cloud.dataflow.sdk.coders.StandardCoder.registerByteSizeObserver(StandardCoder.java:191)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:633)
    at com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:542)
    at com.google.cloud.dataflow.sdk.runners.worker.MapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(MapTaskExecutorFactory.java:429)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:115)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:61)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:46)
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:157)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:329)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:483)
    at com.test.cdf.wrapper.pipeline.DataflowPipeline$TableRowToWrapperDoFn.processElement(DataflowPipeline.java:203)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189)
    at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171)
    at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:193)
    at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:52)
    at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:171)
    at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:117)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:234)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:171)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:137)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:147)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:132)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.avro.reflect.ReflectDatumWriter.writeArray(ReflectDatumWriter.java:67)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
    at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
    at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    ... 31 more

How can I define a coder for this class?

Recommended answer

As you've noticed, since TableRow is not Serializable, you won't be able to use SerializableCoder.
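
To illustrate why this rules out SerializableCoder, which relies on standard Java serialization, here is a minimal sketch using only the JDK. Row and RowWrapper are hypothetical stand-ins for TableRow and TableRowWrapper and are not part of any SDK; the point is only that a Serializable class holding a non-Serializable field is rejected at write time.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableFieldDemo {

    // Hypothetical stand-in for TableRow: a plain class that is not Serializable.
    static class Row {
        String name = "example";
    }

    // Mirrors TableRowWrapper: Serializable itself, but holding a non-Serializable field.
    static class RowWrapper implements Serializable {
        private static final long serialVersionUID = 1L;
        Row row = new Row();
    }

    // Returns true if Java serialization rejects the object with NotSerializableException.
    static boolean failsToSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return false;
        } catch (NotSerializableException e) {
            return true;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(failsToSerialize(new RowWrapper())); // prints "true"
        System.out.println(failsToSerialize("plain string"));   // prints "false"
    }
}
```

Marking the wrapper Serializable is therefore not enough on its own; every non-transient field it holds must be serializable as well.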

In order to encode nullable values, Avro’s automatic schema generation requires either an explicit union schema including null via @AvroSchema annotation or a @Nullable annotation -- specifically org.apache.avro.reflect.Nullable not javax.annotation.Nullable. These are not present in TableRow, so AvroCoder is also inapplicable.

Perhaps the easiest way to provide a coder for your TableRowWrapper is to do so directly, via a thin wrapper on TableRowJsonCoder:

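import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CustomCoder;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Delegates all encoding and decoding to TableRowJsonCoder.
// Assumes TableRowWrapper exposes getRow() and a TableRowWrapper(TableRow) constructor.
class TableRowWrapperCoder extends CustomCoder<TableRowWrapper> {

  private static final Coder<TableRow> tableRowCoder = TableRowJsonCoder.of();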

  @Override
  public void encode(TableRowWrapper value, OutputStream outStream, Context context)
      throws IOException {
    tableRowCoder.encode(value.getRow(), outStream, context);
  }

  @Override
  public TableRowWrapper decode(InputStream inStream, Context context)
      throws IOException {
    return new TableRowWrapper(tableRowCoder.decode(inStream, context));
  }

  ...
}
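
The delegation idea behind this coder can be tried out without the Dataflow SDK. The following is only a sketch under stated assumptions: SimpleCoder is a hypothetical minimal interface (not the SDK's Coder), Row stands in for TableRow, and RowCoder stands in for TableRowJsonCoder by writing the row as a length-prefixed string. The wrapper coder does no encoding work of its own; it unwraps on encode and re-wraps on decode.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class DelegatingCoderDemo {

    // Hypothetical minimal coder interface; it is NOT the Dataflow SDK's Coder.
    interface SimpleCoder<T> {
        void encode(T value, OutputStream out) throws IOException;
        T decode(InputStream in) throws IOException;
    }

    // Hypothetical stand-in for TableRow, holding its content as a JSON string.
    static class Row {
        final String json;
        Row(String json) { this.json = json; }
    }

    // Stand-in for TableRowJsonCoder: writes the row as a length-prefixed UTF string.
    static class RowCoder implements SimpleCoder<Row> {
        public void encode(Row value, OutputStream out) throws IOException {
            new DataOutputStream(out).writeUTF(value.json);
        }
        public Row decode(InputStream in) throws IOException {
            return new Row(new DataInputStream(in).readUTF());
        }
    }

    // Stand-in for TableRowWrapper, with the accessor and constructor the coder needs.
    static class RowWrapper {
        private final Row row;
        RowWrapper(Row row) { this.row = row; }
        Row getRow() { return row; }
    }

    // Thin wrapper coder: unwrap on encode, re-wrap on decode, delegate the rest.
    static class RowWrapperCoder implements SimpleCoder<RowWrapper> {
        private final SimpleCoder<Row> rowCoder = new RowCoder();
        public void encode(RowWrapper value, OutputStream out) throws IOException {
            rowCoder.encode(value.getRow(), out);
        }
        public RowWrapper decode(InputStream in) throws IOException {
            return new RowWrapper(rowCoder.decode(in));
        }
    }

    // Encodes a wrapper to bytes, decodes it again, and returns the recovered content.
    static String roundTrip(String json) {
        try {
            SimpleCoder<RowWrapper> coder = new RowWrapperCoder();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            coder.encode(new RowWrapper(new Row(json)), out);
            RowWrapper decoded = coder.decode(new ByteArrayInputStream(out.toByteArray()));
            return decoded.getRow().json;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("{\"f\":\"v\"}"));
    }
}
```

Because the wrapper adds no fields of its own here, its encoded form is exactly that of the inner row. If the real wrapper carried extra state, that state would need to be written and read alongside the delegated part.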

You can register this coder via:

pipeline.getCoderRegistry()
    .registerCoder(TableRowWrapper.class, new TableRowWrapperCoder());

08-06 20:24