This article discusses how to apply a side input to the BigQueryIO.read operation in Apache Beam, which may be a useful reference for anyone facing the same problem.

Problem Description

Is there a way to apply a side input to a BigQueryIO.read() operation in Apache Beam?

Say, for example, I have a value in a PCollection that I want to use in a query to fetch data from a BigQuery table. Is this possible using a side input? Or should something else be used in such a case?

I used NestedValueProvider in a similar case, but I guess we can use that only when a certain value depends on my runtime value. Or can I use the same thing here? Please correct me if I'm wrong.

The code I tried:

Bigquery bigQueryClient = start_pipeline.newBigQueryClient(options.as(BigQueryOptions.class)).build();
Tabledata tableRequest = bigQueryClient.tabledata();

PCollection<TableRow> existingData = readData.apply("Read existing data",ParDo.of(new DoFn<String,TableRow>(){
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException
    {
        List<TableRow> list = c.sideInput(bqDataView);
        String tableName = list.get(0).get("table").toString();
        TableDataList table = tableRequest.list("projectID","DatasetID",tableName).execute();

        for(TableRow row:table.getRows())
        {
            c.output(row);
        }
    }
    }).withSideInputs(bqDataView));

The error I get is:

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize BeamTest.StarterPipeline$1@86b455
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
    at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
    at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
    at BeamTest.StarterPipeline.main(StarterPipeline.java:158)
Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.Bigquery$Tabledata
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
    ... 4 more

Recommended Answer

The Beam model does not currently support this kind of data-dependent operation very well.

A way of doing it is to code your own DoFn that receives the side input and connects directly to BQ. Unfortunately, this would not give you any parallelism, as the DoFn would run entirely on the same thread.

Once Splittable DoFns are supported in Beam, this will be a different story.

In the current state of the world, you would need to use the BQ client library to add code that queries BQ as if you were not in a Beam pipeline.

Given the code in your question, a rough idea of how to implement this is the following:

class ReadDataDoFn extends DoFn<String, TableRow> {
    private Tabledata tableRequest;

    private Bigquery bigQueryClient;

    private Bigquery createBigQueryClientWithinDoFn() {
        // I'm not sure how you'd implement this, but you had the right idea
    }

    @Setup
    public void setup() {
        bigQueryClient = createBigQueryClientWithinDoFn(); 
        tableRequest = bigQueryClient.tabledata();
    }
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException
    {
        List<TableRow> list = c.sideInput(bqDataView);
        String tableName = list.get(0).get("table").toString();
        TableDataList table = tableRequest.list("projectID","DatasetID",tableName).execute();

        for(TableRow row:table.getRows())
        {
            c.output(row);
        }
    }
}

PCollection<TableRow> existingData = readData.apply("Read existing data", ParDo.of(new ReadDataDoFn()).withSideInputs(bqDataView));
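The createBigQueryClientWithinDoFn() method above is deliberately left unimplemented. One possible sketch, assuming the google-api-java-client and BigQuery API client libraries are on the classpath, is to build the client in @Setup from Application Default Credentials. The application name and the use of GoogleCredential here are illustrative assumptions, not part of the original answer:

```java
import java.io.IOException;
import java.security.GeneralSecurityException;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;

// Sketch only: one way to construct a BigQuery client inside the DoFn.
// Building it in @Setup (rather than capturing it from the enclosing scope)
// avoids the NotSerializableException seen in the question, because the
// client is created on the worker instead of being serialized with the DoFn.
private Bigquery createBigQueryClientWithinDoFn() {
    try {
        return new Bigquery.Builder(
                GoogleNetHttpTransport.newTrustedTransport(),
                JacksonFactory.getDefaultInstance(),
                // Uses Application Default Credentials on the worker.
                GoogleCredential.getApplicationDefault())
            .setApplicationName("beam-side-input-example") // assumed name
            .build();
    } catch (IOException | GeneralSecurityException e) {
        throw new RuntimeException("Failed to create BigQuery client", e);
    }
}
```

The key design point is the same one the stack trace hints at: com.google.api.services.bigquery.Bigquery and its Tabledata are not Serializable, so they must be created per-worker in @Setup rather than captured by the anonymous DoFn.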

That concludes this article on applying a side input to the BigQueryIO.read operation in Apache Beam; hopefully the recommended answer above is helpful.
