问题描述
我正在尝试做一些非常简单的事情,并且遇到了一些非常愚蠢的挣扎.我认为这一定与对火花在做什么产生了根本性的误解有关.我将不胜感激任何帮助或解释.
I am trying to do something very simple and I'm having some very stupid struggles. I think it must have to do with a fundamental misunderstanding of what spark is doing. I would greatly appreciate any help or explanation.
我有一个很大的表(〜3 TB,约300MM行,25k分区),另存为s3中的实木复合地板,我想将其的很小一部分作为一个实木复合地板文件提供给某人.不幸的是,这需要永远完成,我不明白为什么.我尝试了以下方法:
I have a very large (~3 TB, ~300MM rows, 25k partitions) table, saved as parquet in s3, and I would like to give someone a tiny sample of it as a single parquet file. Unfortunately, this is taking forever to finish and I don't understand why. I have tried the following:
tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.coalesce(1).write.saveAsTable("db.tiny_table")
然后当它不起作用时,我尝试了一下,我认为应该是一样的,但是我不确定. (我添加了print
以进行调试.)
and then when that didn't work I tried this, which I thought should be the same, but I wasn't sure. (I added the print
's in an effort to debug.)
tiny = spark.table("db.big_table").limit(500).coalesce(1)
print(tiny.count())
print(tiny.show(10))
tiny.write.saveAsTable("db.tiny_table")
当我观看Yarn UI时, 和write
的打印语句都使用25k映射器. count
花费了3分钟,show
花费了25分钟,而write
花费了约40分钟,尽管最终 did 写入了我要查找的单个文件表.
When I watch the Yarn UI, both print statements and the write
are using 25k mappers. The count
took 3 mins, the show
took 25 mins, and the write
took ~40 mins, although it finally did write the single file table I was looking for.
在我看来,第一行应该占据前500行并将它们合并到一个分区,然后其他行应该非常快地发生(在单个映射器/缩减器上).有人可以在这里看到我在做什么吗?有人告诉我也许我应该使用sample
而不是limit
,但是据我了解,limit
应该更快.是吗?
It seems to me like the first line should take the top 500 rows and coalesce them to a single partition, and then the other lines should happen extremely fast (on a single mapper/reducer). Can anyone see what I'm doing wrong here? I've been told maybe I should use sample
instead of limit
but as I understand it limit
should be much faster. Is that right?
提前感谢您的任何想法!
Thanks in advance for any thoughts!
推荐答案
我将首先处理print
函数问题,因为这是理解spark的基础.然后limit
与sample
.然后repartition
与coalesce
.
I’ll approach the print
functions issue first, as it’s something fundamental to understanding spark. Then limit
vs sample
. Then repartition
vs coalesce
.
print
函数以这种方式花费很长时间的原因是因为coalesce
是惰性转换. spark中的大多数转换都是惰性的,并且只有在调用 action 时才能进行评估.
The reasons the print
functions take so long in this manner is because coalesce
is a lazy transformation. Most transformations in spark are lazy and do not get evaluated until an action gets called.
动作是可以做的事情,并且(大多)不要返回结果.类似于count
,show
.它们返回一个数字和一些数据,而coalesce
返回一个具有1个分区的数据帧(有点,见下文).
Actions are things that do stuff and (mostly) dont return a new dataframe as a result. Like count
, show
. They return a number, and some data, whereas coalesce
returns a dataframe with 1 partition (sort of, see below).
发生的事情是,您每次在tiny
数据帧上调用操作时,都会重新运行sql查询和coalesce
调用.这就是为什么他们每次通话都使用25k映射器的原因.
What is happening is that you are rerunning the sql query and the coalesce
call each time you call an action on the tiny
dataframe. That’s why they are using the 25k mappers for each call.
为节省时间,请在第一行中添加.cache()
方法(无论如何对于您的print
代码).
To save time, add the .cache()
method to the first line (for your print
code anyway).
然后,数据帧转换实际上是在第一行上执行的,结果一直保存在spark节点的内存中.
Then the data frame transformations are actually executed on your first line and the result persisted in memory on your spark nodes.
这不会对第一行的初始查询时间产生任何影响,但至少您不会再运行该查询两次,因为结果已被缓存,然后操作便可以使用该缓存的结果.
This won’t have any impact on the initial query time for the first line, but at least you’re not running that query 2 more times because the result has been cached, and the actions can then use that cached result.
要从内存中删除它,请使用.unpersist()
方法.
To remove it from memory, use the .unpersist()
method.
现在为您要执行的实际查询...
这实际上取决于您的数据如何分区.就像是,它是否被划分在特定的字段上等等?
It really depends on how your data is partitioned. As in, is it partitioned on specific fields etc...
您在问题中提到了它,但是sample
可能是正确的方法.
You mentioned it in your question, but sample
might the right way to go.
这是为什么?
limit
必须在第一行中搜索500条.除非按行号(或某种递增ID)对数据进行分区,否则前500行可以存储在25k分区中的任何一个中.
limit
has to search for 500 of the first rows. Unless your data is partitioned by row number (or some sort of incrementing id) then the first 500 rows could be stored in any of the the 25k partitions.
因此spark必须在所有参数中进行搜索,直到找到所有正确的值为止.不仅如此,它还必须执行一个额外的步骤来对数据进行排序以使其具有正确的顺序.
So spark has to go search through all of them until it finds all the correct values. Not only that, it has to perform an additional step of sorting the data to have the correct order.
sample
只能获取500个随机值.由于涉及的数据没有顺序/排序,而且不必在特定的分区中搜索特定的行,因此操作起来容易得多.
sample
just grabs 500 random values. Much easier to do as there’s no order/sorting of the data involved and it doesn’t have to search through specific partitions for specific rows.
limit
可以更快,但也有其局限性.我通常只将其用于非常小的子集(如10/20行).
While limit
can be faster, it also has its, erm, limits. I usually only use it for very small subsets like 10/20 rows.
现在可以进行分区了....
Now for partitioning....
我认为coalesce
的问题是实际上会更改分区.现在我不确定,所以要放些盐.
The problem I think with coalesce
is it virtually changes the partitioning. Now I’m not sure about this, so pinch of salt.
根据pyspark
文档:
因此,您的500行实际上仍然位于您的25,000个物理分区上,这些分区被spark视为1个虚拟分区.
So your 500 rows will actually still sit across your 25k physical partitions that are considered by spark to be 1 virtual partition.
在这里使用.repartition(1).cache()
造成随机播放(通常很糟糕)并保留在火花记忆中可能是一个好主意.因为当您write
时,不是让25k映射器查看物理分区,而是应该让1个映射器查看火花存储器中的内容.然后write
变得容易.您还需要处理一小部分,因此(希望)任何改组都应该是可管理的.
Causing a shuffle (usually bad) and persisting in spark memory with .repartition(1).cache()
is possibly a good idea here. Because instead of having the 25k mappers looking at the physical partitions when you write
, it should only result in 1 mapper looking at what is in spark memory. Then write
becomes easy. You’re also dealing with a small subset, so any shuffling should (hopefully) be manageable.
显然,这通常是不好的做法,并且不会改变spark在执行原始sql查询时可能要运行25k映射器的事实.希望sample
能够解决这个问题.
Obviously this is usually bad practice, and doesn’t change the fact spark will probably want to run 25k mappers when it performs the original sql query. Hopefully sample
takes care of that.
修改以明确改组,repartition
和coalesce
edit to clarify shuffling, repartition
and coalesce
您在4节点群集上的16个分区中有2个数据集.您想加入它们并作为新数据集写入16个分区中.
You have 2 datasets in 16 partitions on a 4 node cluster. You want to join them and write as a new dataset in 16 partitions.
数据1的行1可能在节点1上,数据2的行1可能在节点4上.
Row 1 for data 1 might be on node 1, and row 1 for data 2 on node 4.
为了将这些行连接在一起,spark必须物理移动一个或两个,然后写入新分区.
In order to join these rows together, spark has to physically move one, or both of them, then write to a new partition.
这是一种随机操作,可以在群集中物理移动数据.
That’s a shuffle, physically moving data around a cluster.
所有数据都被16分区并不重要,重要的是数据在群集中的位置.
It doesn’t matter that everything is partitioned by 16, what matters is where the data is sitting on he cluster.
data.repartition(4)
将物理地将数据从每个节点的每4个分区集移动到每个节点1个分区.
data.repartition(4)
will physically move data from each 4 sets of partitions per node into 1 partition per node.
Spark可能会将所有4个分区从节点1移到其他3个节点,并移到那些节点上的新单个分区中,反之亦然.
Spark might move all 4 partitions from node 1 over to the 3 other nodes, in a new single partition on those nodes, and vice versa.
我不希望这样做,但这是一个极端的例子,可以证明这一点.
I wouldn’t think it’d do this, but it’s an extreme case that demonstrates the point.
尽管coalesce(4)
调用不会移动数据,但它更聪明.相反,它会识别每个节点已经有4个分区&总共4个节点...我将每个节点的所有4个分区称为一个分区,然后我将拥有4个分区!"
A coalesce(4)
call though, doesn’t move the data, it’s much more clever. Instead, it recognises "I already have 4 partitions per node & 4 nodes in total... I’m just going to call all 4 of those partitions per node a single partition and then I’ll have 4 total partitions!"
因此它不需要移动任何数据,因为它只是将现有分区合并到一个合并的分区中.
So it doesn’t need to move any data because it just combines existing partitions into a joined partition.
这篇关于将Spark数据帧写入单个实木复合地板文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!