我目前有一个python数据流作业,其最终接收器是对BigQuery的PCollection写入。失败并显示以下错误:

Workflow failed. Causes: S01:XXXX+XXX+Write/WriteToBigQuery/NativeWrite failed., BigQuery import job "dataflow_job_XXXXXX" failed., BigQuery job "dataflow_job_XXXXXX" in project "XXXXXX" finished with error(s): errorResult: Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 19; errors: 1


为了获得更详细的错误报告,我正在运行:

bq --format=prettyjson show -j dataflow_job_XXXXXX


显示类似的东西(有很多错误,这只是其中之一):

{

    "location": "gs://XXXXX/XXXXXX/tmp/XXXXX/10002237702794672370/dax-tmp-2019-02-05_20_14_50-18341731408970037725-S01-0-5144bb700f6a9f0b/-shard--try-00d3c2c24d5b0371-endshard.json",

    "message": "Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 11; errors: 1. Please look into the errors[] collection for more details.",

    "reason": "invalid"

  },


然后,我去寻找特定的碎片,以查看哪些PCollection行有错误以及我需要做什么以过滤这些行或修复我的错误:

gsutil ls gs://XXXXX/XXXXXX/tmp/XXXXX/10002237702794672370/dax-tmp-2019-02-05_20_14_50-18341731408970037725-S01-0-5144bb700f6a9f0b/-shard--try-00d3c2c24d5b0371-endshard.json


但是该命令返回:

CommandException: One or more URLs matched no objects.


调试作业的最佳做法是什么(大约需要几个小时才能完成)?我现在的想法是将PCollection以JSON格式写入到非临时位置的GCS中,然后尝试自己吸收它。

最佳答案

对于您的错误类型,我执行以下操作:


使用Json检查工具列出有错误的记录。
在本地运行Cloud Dataflow。
添加管道步骤以验证每个Json记录,并从管道中删除不良条目。使用附带输出的死信文件或记录不良记录以进行调试。


本文可能为您提供一些处理无效输入的想法。

Handling Invalid Inputs in Dataflow

10-05 21:09