DataflowPipelineRunner

Running a Google Dataflow pipeline from a Google App Engine app?

Problem description

I am creating a Dataflow job using DataflowPipelineRunner. I tried the following scenarios:

  1. Without specifying any machineType
  2. With the g1-small machine type
  3. With n1-highmem-2
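
For reference, here is a minimal sketch of how the setup described above is typically configured with the Dataflow SDK 1.x for Java; the project, bucket and machine type below are placeholders, not the asker's actual values:

import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;

public class JobOptionsSketch {
  public static DataflowPipelineOptions buildOptions() {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    options.setRunner(DataflowPipelineRunner.class);
    options.setProject("my-project-id");                  // placeholder
    options.setStagingLocation("gs://my-bucket/staging"); // placeholder
    // Scenarios 2 and 3 above set an explicit worker machine type;
    // leaving this unset corresponds to scenario 1.
    options.setWorkerMachineType("n1-highmem-2");
    return options;
  }
}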

In all the above scenarios, the input is a very small file (KB-sized) from GCS and the output is a BigQuery table.
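The word-count shape being described (GCS text in, BigQuery table out) would look roughly like the sketch below; paths, table name and schema are placeholders, and the options are assumed to be built as in the earlier sketch:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;
import java.util.Arrays;

public class WordCountSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(JobOptionsSketch.buildOptions());

    p.apply(TextIO.Read.from("gs://my-bucket/input.txt"))         // placeholder input
     .apply(ParDo.of(new DoFn<String, String>() {
        @Override
        public void processElement(ProcessContext c) {
          // Split each line into words and emit the non-empty ones.
          for (String word : c.element().split("\\W+")) {
            if (!word.isEmpty()) {
              c.output(word);
            }
          }
        }
      }))
     .apply(Count.<String>perElement())
     .apply(ParDo.of(new DoFn<KV<String, Long>, TableRow>() {
        @Override
        public void processElement(ProcessContext c) {
          // Convert each (word, count) pair to a BigQuery row.
          c.output(new TableRow()
              .set("word", c.element().getKey())
              .set("count", c.element().getValue()));
        }
      }))
     .apply(BigQueryIO.Write
         .to("my-project:my_dataset.word_counts")                 // placeholder table
         .withSchema(new TableSchema().setFields(Arrays.asList(
             new TableFieldSchema().setName("word").setType("STRING"),
             new TableFieldSchema().setName("count").setType("INTEGER")))));

    p.run();
  }
}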

I got an out-of-memory error in all the scenarios.

The size of my compiled code is 94 MB. I am only trying the word count example, and it did not read any input (it fails before the job starts). Please help me understand why I am getting this error.

Note: I am using App Engine to start the job.

Note: The same code works with beta version 0.4.150414.

Edit 1

As per the suggestions in the answer, I tried the following:

  1. Switched from automatic scaling to basic scaling.
  2. Used machine type B2, which provides 256 MB of memory.
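
For a Java App Engine app, that change is normally made in appengine-web.xml; a minimal sketch of the relevant elements (the max-instances value is illustrative, not taken from the question):

<!-- appengine-web.xml: basic scaling on a B2 instance class -->
<appengine-web-app xmlns="http://appengine.google.com/ns/1.0">
  <instance-class>B2</instance-class>
  <basic-scaling>
    <max-instances>1</max-instances>
  </basic-scaling>
</appengine-web-app>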

After these configuration changes, the Java heap memory problem is solved. But it then tries to upload a jar of more than 10 MB to the staging location, and hence it fails.

It logs the following exception:

com.google.api.client.http.HttpRequest execute: exception thrown while executing request
com.google.appengine.api.urlfetch.RequestPayloadTooLargeException: The request to https://www.googleapis.com/upload/storage/v1/b/pwccloudedw-stagging-bucket/o?name=appengine-api-L4wtoWwoElWmstI1Ia93cg.jar&uploadType=resumable&upload_id=AEnB2Uo6HCfw6Usa3aXlcOzg0g3RawrvuAxWuOUtQxwQdxoyA0cf22LKqno0Gu-hjKGLqXIo8MF2FHR63zTxrSmQ9Yk9HdCdZQ exceeded the 10 MiB limit.
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.convertApplicationException(URLFetchServiceImpl.java:157)
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.fetch(URLFetchServiceImpl.java:45)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.fetchResponse(URLFetchServiceStreamHandler.java:543)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getInputStream(URLFetchServiceStreamHandler.java:422)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getResponseCode(URLFetchServiceStreamHandler.java:275)
at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
at java.util.concurrent.FutureTask.run(FutureTask.java:260)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1$1.run(ApiProxyImpl.java:1152)
at java.security.AccessController.doPrivileged(Native Method)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1.run(ApiProxyImpl.java:1146)
at java.lang.Thread.run(Thread.java:745)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$2$1.run(ApiProxyImpl.java:1195)

I tried directly uploading the jar file appengine-api-1.0-sdk-1.9.20.jar, but it still tries to upload this jar, appengine-api-L4wtoWwoElWmstI1Ia93cg.jar, and I don't know what jar it is. Any idea on what this jar is would be appreciated.

Please help me resolve this issue.

Recommended answer

The short answer is that if you use AppEngine on a Managed VM you will not encounter the AppEngine sandbox limits (OOM when using an F1 or B1 instance class, execution time limit issues, whitelisted JRE classes). If you really want to run within the App Engine sandbox, then your use of the Dataflow SDK must conform to the limits of the AppEngine sandbox. Below I explain common issues and what people have done to conform to the AppEngine sandbox limits.
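At the time of this answer, a Java module was opted into Managed VMs through appengine-web.xml; a minimal sketch of the key element (everything else about the module's configuration stays as-is):

<!-- appengine-web.xml: run this module on a Managed VM instead of the sandbox -->
<appengine-web-app xmlns="http://appengine.google.com/ns/1.0">
  <vm>true</vm>
</appengine-web-app>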

The Dataflow SDK requires an AppEngine instance class that has enough memory to execute the user's application to construct the pipeline, stage any resources, and send the job description to the Dataflow service. Typically we have seen that users need an instance class with more than 128 MB of memory in order not to see OOM errors.

Constructing a pipeline and submitting it to the Dataflow service typically takes less than a couple of seconds if the resources required by your application are already staged. Uploading your JARs and any other resources to GCS can take longer than 60 seconds. This can be solved manually by pre-staging your JARs to GCS beforehand (the Dataflow SDK will skip staging them again if it detects they are already there) or by using a task queue to get a 10-minute limit (note that for large applications, 10 minutes may not be enough to stage all your resources).
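As an illustration of the task queue approach, the user-facing request can simply enqueue a task and let a handler with the longer deadline launch the pipeline; the URL below is a hypothetical servlet path:

import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;

public class LaunchViaTaskQueue {
  // Enqueue the launch instead of running it in the user-facing request, so the
  // staging and submission work runs under the push queue's 10-minute deadline.
  public static void enqueueLaunch() {
    Queue queue = QueueFactory.getDefaultQueue();
    queue.add(TaskOptions.Builder
        .withUrl("/tasks/launch-dataflow")   // hypothetical handler path
        .method(TaskOptions.Method.POST));
  }
}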

Finally, within the AppEngine sandbox environment, you and all your dependencies are limited to using only whitelisted classes within the JRE, or you'll get an exception like:

java.lang.SecurityException:
  java.lang.IllegalAccessException: YYY is not allowed on ZZZ
  ...

Edit 1

We compute a hash of the contents of the jars on the classpath and upload them to GCS with a modified filename. AppEngine runs a sandboxed environment with its own JARs; appengine-api-L4wtoWwoElWmstI1Ia93cg.jar refers to appengine-api.jar, which is a jar that the sandboxed environment adds. You can see from our PackageUtil#getUniqueContentName(...) that we just append -$HASH before .jar.
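A rough sketch of that naming scheme (not the SDK's actual code; the choice of an MD5 hash and URL-safe base64 here is an assumption) shows how a local appengine-api.jar can end up staged under a name like appengine-api-L4wtoWwoElWmstI1Ia93cg.jar:

import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;

public class StagedNameSketch {
  // Hash the jar's contents and insert "-<hash>" before the ".jar" suffix.
  public static String uniqueContentName(File jar) throws IOException {
    byte[] digest = Files.asByteSource(jar).hash(Hashing.md5()).asBytes();
    String hash = BaseEncoding.base64Url().omitPadding().encode(digest);
    String name = jar.getName();
    int dot = name.lastIndexOf('.');
    return name.substring(0, dot) + "-" + hash + name.substring(dot);
  }
}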

We are working to figure out why you are seeing the RequestPayloadTooLarge exception, and in the meantime it is recommended that you set the filesToStage option and filter out the jars not required to execute your Dataflow job to get around the issue you are facing. You can see how we build the files to stage with DataflowPipelineRunner#detectClassPathResourcesToStage(...).
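A sketch of that workaround, assuming detectClassPathResourcesToStage(ClassLoader) returns the detected classpath entries and that filtering on the jar name is an acceptable criterion:

import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import java.util.ArrayList;
import java.util.List;

public class FilesToStageSketch {
  // Start from what the runner would detect, then drop jars (such as the
  // App Engine API jar) that the Dataflow workers do not need.
  public static DataflowPipelineOptions optionsWithFilteredStaging() {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    options.setRunner(DataflowPipelineRunner.class);

    List<String> toStage = new ArrayList<>();
    for (String path : DataflowPipelineRunner.detectClassPathResourcesToStage(
        FilesToStageSketch.class.getClassLoader())) {
      if (!path.contains("appengine-api")) {   // filter criterion is an assumption
        toStage.add(path);
      }
    }
    options.setFilesToStage(toStage);
    return options;
  }
}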
