I am trying to optimize my Glue / PySpark job by using push-down predicates.

from datetime import date
from pyspark.sql import Row

start = date(2019, 2, 13)
end = date(2019, 2, 27)
print(">>> Generate data frame for ", start, " to ", end, "... ")
relaventDatesDf = spark.createDataFrame([
    Row(start=start, stop=end)
])
relaventDatesDf.createOrReplaceTempView("relaventDates")

relaventDatesDf = spark.sql("SELECT explode(generate_date_series(start, stop)) AS querydatetime FROM relaventDates")
relaventDatesDf.createOrReplaceTempView("relaventDates")
print("===LOG:Dates===")
relaventDatesDf.show()

flightsGDF = glueContext.create_dynamic_frame.from_catalog(
    database="xxx",
    table_name="flights",
    transformation_ctx="flights",
    push_down_predicate="""
        querydatetime BETWEEN '%s' AND '%s'
        AND querydestinationplace IN (%s)
    """ % (start.strftime("%Y-%m-%d"), today.strftime("%Y-%m-%d"), ",".join(map(lambda s: str(s), arr))))


However, it looks like Glue is still trying to read data outside the specified date range?

INFO S3NativeFileSystem: Opening 's3://.../flights/querydestinationplace=12191/querydatetime=2019-03-01/part-00045-6cdebbb1-562c-43fa-915d-93b125aeee61.c000.snappy.parquet' for reading
INFO FileScanRDD: Reading File path: s3://.../flights/querydestinationplace=12191/querydatetime=2019-03-10/part-00021-34a13146-8fb2-43de-9df2-d8925cbe472d.c000.snappy.parquet, range: 0-11797922, partition values: [12191,17965]
WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
INFO S3NativeFileSystem: Opening 's3://.../flights/querydestinationplace=12191/querydatetime=2019-03-10/part-00021-34a13146-8fb2-43de-9df2-d8925cbe472d.c000.snappy.parquet' for reading
WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.


Notice that querydatetime=2019-03-01 and querydatetime=2019-03-10 are outside the specified range of 2019-02-13 to 2019-02-27. Is that why the next line says "aborting HTTP connection"? It goes on to say "This is likely an error and may result in sub-optimal behavior". Is something wrong here?

I wonder whether the problem is that BETWEEN or IN is not supported inside the push-down predicate?



Table creation DDL

CREATE EXTERNAL TABLE `flights`(
  `id` string,
  `querytaskid` string,
  `queryoriginplace` string,
  `queryoutbounddate` string,
  `queryinbounddate` string,
  `querycabinclass` string,
  `querycurrency` string,
  `agent` string,
  `quoteageinminutes` string,
  `price` string,
  `outboundlegid` string,
  `inboundlegid` string,
  `outdeparture` string,
  `outarrival` string,
  `outduration` string,
  `outjourneymode` string,
  `outstops` string,
  `outcarriers` string,
  `outoperatingcarriers` string,
  `numberoutstops` string,
  `numberoutcarriers` string,
  `numberoutoperatingcarriers` string,
  `indeparture` string,
  `inarrival` string,
  `induration` string,
  `injourneymode` string,
  `instops` string,
  `incarriers` string,
  `inoperatingcarriers` string,
  `numberinstops` string,
  `numberincarriers` string,
  `numberinoperatingcarriers` string)
PARTITIONED BY (
  `querydestinationplace` string,
  `querydatetime` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://pinfare-glue/flights/'
TBLPROPERTIES (
  'CrawlerSchemaDeserializerVersion'='1.0',
  'CrawlerSchemaSerializerVersion'='1.0',
  'UPDATED_BY_CRAWLER'='pinfare-parquet',
  'averageRecordSize'='19',
  'classification'='parquet',
  'compressionType'='none',
  'objectCount'='623609',
  'recordCount'='4368434222',
  'sizeKey'='86509997099',
  'typeOfData'='file')

Best Answer

One of the problems I can see in the code is that you are using "today" instead of "end" in the BETWEEN clause. Although I don't see the variable today declared anywhere in the code, I assume it has been initialized with today's date.

In that case, the range is wider than you intended, and the partitions read by Glue Spark are actually correct for the predicate that was sent. In other words, BETWEEN and IN themselves work in push-down predicates; the unexpected reads come from the wrong upper-bound variable.
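For example, a minimal sketch of the corrected call, assuming the same start, end, and arr variables as in the question:

flightsGDF = glueContext.create_dynamic_frame.from_catalog(
    database="xxx",
    table_name="flights",
    transformation_ctx="flights",
    push_down_predicate="""
        querydatetime BETWEEN '%s' AND '%s'
        AND querydestinationplace IN (%s)
    """ % (
        start.strftime("%Y-%m-%d"),
        end.strftime("%Y-%m-%d"),  # use 'end', not 'today', so the upper bound matches the intended range
        ",".join(map(str, arr))))

With end as the upper bound, only partitions with querydatetime between 2019-02-13 and 2019-02-27 should be opened.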

Regarding amazon-web-services - AWS Glue push-down predicate not working correctly, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56079448/
