Spark SQL queries on partitioned data using date ranges

Problem Description

My dataset is partitioned in this way:

Year=yyyy
 |---Month=mm
 |   |---Day=dd
 |   |   |---<parquet-files>

What is the easiest and most efficient way to create a DataFrame in Spark loaded with the data between two dates?

Recommended Answer

If you absolutely have to stick to this partitioning strategy, the answer depends on whether you are willing to bear the cost of partition discovery or not.

If you are willing to have Spark discover all partitions, which only needs to happen once (until you add new files), you can load the base path and then filter using the partition columns.

If you do not want Spark to discover all the partitions, e.g., because you have millions of files, the only efficient general solution is to break the interval you want to query into several sub-intervals you can easily query using @r0bb23's approach, and then union them together.

If you want the best of both cases above and you have a stable schema, you can register the partitions in the metastore by defining an external partitioned table. Don't do this if you expect your schema to evolve, as metastore-managed tables handle schema evolution quite poorly at this time.
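
As a rough, hedged sketch of that approach (the table name events and its data columns id and payload are hypothetical, and a SparkSession with Hive metastore support enabled is assumed), you could register the layout above as an external partitioned table and let the metastore track the partitions:

// Sketch only: register the existing Year=/Month=/Day= layout as an external table.
spark.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS events (
    id BIGINT,        -- hypothetical data columns of the parquet files
    payload STRING
  )
  PARTITIONED BY (Year INT, Month INT, Day INT)
  STORED AS PARQUET
  LOCATION 'hdfs:///basepath'
""")

// Register each existing directory as a partition with an explicit location
// (or use MSCK REPAIR TABLE events to pick them up in bulk, if the directory
// names match the metastore's casing conventions).
spark.sql("""
  ALTER TABLE events ADD IF NOT EXISTS
  PARTITION (Year=2017, Month=10, Day=6)
  LOCATION 'hdfs:///basepath/Year=2017/Month=10/Day=06'
""")

// Queries can now prune partitions without a full file-system discovery.
spark.sql("""
  SELECT * FROM events
  WHERE Year = 2017 AND ((Month = 10 AND Day >= 6) OR (Month = 11 AND Day <= 3))
""")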

For example, to query between 2017-10-06 and 2017-11-03 you'd do:

// With full discovery
spark.read.parquet("hdfs:///basepath")
  .where('Year === 2017 && (
    ('Month === 10 && 'Day >= 6) || ('Month === 11 && 'Day <= 3)
  ))

// With partial discovery
val df1 = spark.read.option("basePath", "hdfs:///basepath/")
  .parquet("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9], [1-3][0-9]}/*/")
val df2 = spark.read.option("basePath", "hdfs:///basepath/")
  .parquet("hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")
val df = df1.union(df2)
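
As a small side note not in the original answer: DataFrameReader.parquet accepts multiple paths, so the same partial discovery can likely be expressed as a single read instead of an explicit union, for example:

// Equivalent sketch: pass both path globs to one reader call rather than unioning two DataFrames.
val dfSinglePass = spark.read.option("basePath", "hdfs:///basepath/")
  .parquet(
    "hdfs:///basepath/Year=2017/Month=10/Day={0[6-9],[1-3][0-9]}/*/",
    "hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")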

Writing generic code for this is certainly possible, but I haven't encountered it. The better approach is to partition in the manner outlined in my comment on the question. If your table was partitioned using something like /basepath/ts=yyyymmddhhmm/*.parquet, then the answer is simply:

spark.read.parquet("hdfs:///basepath")
  .where('ts >= 201710060000L && 'ts <= 201711030000L)

The reason it's worth adding hours and minutes is that you can then write generic code that handles intervals regardless of whether the data is partitioned by week, day, hour, or every 15 minutes. In fact, you can even manage data at different granularities in the same table, e.g., older data can be aggregated at higher levels to reduce the total number of partitions that need to be discovered.
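
For illustration only (this helper is not from the original answer; spark is the ambient SparkSession and the yyyyMMddHHmm Long encoding of ts described above is assumed), such generic interval handling might look like:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import spark.implicits._  // enables the 'ts symbol-to-Column syntax used below

// Hypothetical helper: convert interval boundaries to the Long yyyyMMddHHmm encoding
// of the ts partition column and filter on it, whatever the partition granularity is.
val tsFormat = DateTimeFormatter.ofPattern("yyyyMMddHHmm")

def loadInterval(basePath: String, from: LocalDateTime, until: LocalDateTime) = {
  val lo = from.format(tsFormat).toLong
  val hi = until.format(tsFormat).toLong
  spark.read.parquet(basePath).where('ts >= lo && 'ts <= hi)
}

// Same call regardless of whether the data is partitioned by week, day, hour or 15 minutes.
val octToNov = loadInterval("hdfs:///basepath",
  LocalDateTime.of(2017, 10, 6, 0, 0),
  LocalDateTime.of(2017, 11, 3, 0, 0))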
