My setup: Spark 2.1 on a 3-node YARN cluster with 160 GB of memory and 48 vcores. Dynamic allocation is turned on. spark.executor.memory=6G, spark.executor.cores=6.
First, I am reading two Hive tables, orders (329 MB) and lineitems (1.43 GB), and doing a left outer join. Next, I apply 7 different filter conditions to the joined dataset (something like var line1 = joinedDf.filter("linenumber=1"), var line2 = joinedDf.filter("l_linenumber=2"), etc.). Because I'm filtering the joined dataset multiple times, I thought a persist (MEMORY_ONLY) would help here, as the joined dataset fits fully in memory.
I noticed that with persist, the Spark application takes longer to run than without persist (3.5 mins vs 3.3 mins). With persist, the DAG shows that a single stage was created for the persist, and other downstream jobs wait for it to complete. Does that mean persist is a blocking call? Or do stages in other jobs start processing as soon as persisted blocks become available?
In the non-persist case, different jobs create different stages to read the same data. The data is read multiple times in different stages, but this still turns out to be faster than the persist case.
With larger data sets, persist actually causes executors to run out of memory (Java heap space). Without persist, the Spark jobs complete just fine. I looked at some other suggestions here: Spark java.lang.OutOfMemoryError: Java heap space. I tried increasing/decreasing executor cores, persisting with disk only, increasing partitions, and modifying the storage ratio, but nothing seems to help with the executor memory issues.
I would appreciate it if someone could explain how persist works, in what cases it is faster than not persisting and, more importantly, how to go about troubleshooting out-of-memory issues.
I'd recommend reading up on the difference between transformations and actions in Spark. I must admit that I've been bitten by this myself on multiple occasions.
Data in Spark is evaluated lazily, which essentially means nothing happens until an "action" is performed. The .filter() function is a transformation, so nothing actually happens when your code reaches that point, except that a step is added to the transformation pipeline. A call to .persist() behaves in the same way.
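To make the transformation/action distinction concrete, here is a minimal sketch. It assumes a local SparkSession and uses a small generated DataFrame as a hypothetical stand-in for the joined dataset from the question; the object name, method name, and data are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

object LazyEvaluationSketch {
  // Returns the number of rows with l_linenumber = 1 in a generated dataset.
  def countFirstLines(spark: SparkSession): Long = {
    // Hypothetical stand-in for the joined orders/lineitems dataset.
    val joinedDf = spark.range(0, 1000)
      .selectExpr("id", "id % 7 + 1 AS l_linenumber")

    // Lazy: only marks the dataset to be cached once something computes it.
    joinedDf.persist()

    // Lazy: a transformation only adds a step to the query plan.
    val line1 = joinedDf.filter("l_linenumber = 1")

    // Action: this is the first call that launches a Spark job.
    val n = line1.count()

    joinedDf.unpersist() // release the cached blocks again
    n
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("lazy-evaluation-sketch")
      .master("local[*]") // assumption: local mode, for illustration only
      .getOrCreate()
    println(s"rows with l_linenumber = 1: ${countFirstLines(spark)}")
    spark.stop()
  }
}
```

If you run something like this and watch the Spark UI, no job should show up until the count() line is reached.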
If the code downstream of your .persist() call has multiple actions that can be triggered simultaneously, then it's quite likely that you are actually "persisting" the data for each action separately and eating up memory. (The "Storage" tab in the Spark UI will tell you what percentage of the dataset is cached; if it's more than 100% cached, then you are seeing what I describe here.) Worse, you may never actually be using the cached data.
Generally, if you have a point in your code where the data set forks into two or more separate transformation pipelines (each of the separate .filter()s in your example), a .persist() is a good idea to prevent multiple reads of your data source, and/or to save the result of an expensive transformation pipeline before the fork.
Many times it's a good idea to trigger a single action right after the .persist() call (before the data forks) to ensure that later actions (which may run simultaneously) read from the persisted cache, rather than evaluate (and uselessly cache) the data independently.
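As a rough sketch of that pattern, assuming a DataFrame with an l_linenumber column like the one in the question: the helper below persists, runs one count() to fill the cache, and only then forks into the per-filter actions. The object and method names are hypothetical, and MEMORY_ONLY is used only because the question mentions it; another storage level may suit your data better.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

object PersistBeforeForkSketch {
  // Caches joinedDf, materializes the cache with a single action,
  // then runs one filtered count per requested line number.
  def countPerLineNumber(joinedDf: DataFrame, lineNumbers: Seq[Int]): Map[Int, Long] = {
    joinedDf.persist(StorageLevel.MEMORY_ONLY) // lazy: nothing is cached yet
    try {
      joinedDf.count() // single action before the fork: fills the cache
      lineNumbers.map { n =>
        // Each filtered count is its own action and can read the cached data.
        n -> joinedDf.filter(s"l_linenumber = $n").count()
      }.toMap
    } finally {
      joinedDf.unpersist() // free executor memory once the fork is done
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("persist-before-fork-sketch")
      .master("local[*]") // assumption: local mode, for illustration only
      .getOrCreate()
    // Hypothetical stand-in for the joined orders/lineitems dataset.
    val joinedDf = spark.range(0, 70)
      .selectExpr("id", "id % 7 + 1 AS l_linenumber")
    countPerLineNumber(joinedDf, 1 to 7).toSeq.sortBy(_._1).foreach {
      case (n, c) => println(s"l_linenumber = $n: $c rows")
    }
    spark.stop()
  }
}
```

The unpersist() in the finally block is a design choice for this sketch: it keeps cached blocks from lingering in executor memory after the filtered results have been computed.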
Do a joinedDF.count() after your .persist(), but before your .filter()