并行创建多个数据帧

并行创建多个数据帧

本文介绍了Spark:并行创建多个数据帧的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我目前正在根据 ID 列表生成数据帧 - 基于一个 ID 的每个查询都会返回一个非常大型 PostgreSQL 表的可管理子集.然后我根据需要写出的文件结构对该输出进行分区.问题是我达到了速度限制,而且我的执行者资源利用率严重不足.

I'm currently generating DataFrames based on a list of IDs - each query based on one ID gives back a manageable subset of a very large PostgreSQL table. I then partition that output based on the file structure I need to write out. The problem is that I'm hitting a speed limit and majorly under-utilizing my executor resources.

我不确定这是否是重新思考我的架构的问题,或者是否有一些简单的方法可以解决这个问题,但基本上我想获得更多的任务并行化,但未能让我所有的 16 个执行者都忙同时尝试尽快完成这项 ETL 工作.

I’m not sure if this is a matter of rethinking my architecture or if there is some simple way to get around this, but basically I want to get more parallelization of tasks but am failing to keep all of my 16 executors busy while trying to do this ETL job as quickly as possible.

所以...这是我认为我可以做的来加快速度:

So...here’s what I thought I could do to speed this up:

  1. 并行化一个列表.
  2. 然后列表中的每个元素,在一个执行器上,通过 jdbc 选择一个(相对较小的)DataFrame.
  3. 然后是 foreachPartition(其中必然有几个),我需要执行一些操作(包括从每个分区原子写入数据),并且这些分区操作也可以分支到工作节点/执行程序.

当前代码看起来像这样,但当然会抛出py4j.Py4JException: Method getnewargs([]) does not exist",因为无法将 spark 会话上下文传递到 foreach 闭包中这将允许它留在执行者身上:

Current code looks something like this, but of course throws "py4j.Py4JException: Method getnewargs([]) does not exist" because the spark session context can’t be passed into the foreach closure that would allow this to stay on the executors:

spark = SparkSession \
    .builder \
    .appName
    ... etc

# the list, distributed to workers
idsAndRegionsToProcess = sc.parallelize(idList)

# the final thing that needs to be done with the data
# (each partition written to a file and sent somewhere)
def transformAndLoad(iterator, someField, someOtherField):
    for row in iterator:
        ...do stuff
    ...write a file to S3

# !! The issue is here (well, at least with my current approach)!!
# In theory these are the first operations that really need to be
# running on various nodes.
def dataMove(idAndRegion, spark):
        # now pull dataFrames from Postgres foreach id
        postgresDF = spark.read \
            .format("jdbc") \
            .option("url" …
        .option("dbtable", "(select id, someField, someOtherField from table_region_" + idAndRegion[1] + " where id = ‘" + idAndRegion[0] + "') as \history") \
        … more setup

    postgresDF.repartition('someOtherField')
    postgresDF.persist(StorageLevel.MEMORY_AND_DISK)
    postgresDF.foreachPartition(lambda iterator: transformAndLoad(iterator, someField, someOtherField))

# invoking the problematic code on the parallelized list
idsAndRegionsToProcess.foreach(lambda idAndRegion: dataMove(idAndRegion, spark))

我知道这不是完全可能的,但也许我错过了使这成为可能的微妙之处?这似乎比选择 1TB 的数据然后处理它更有效,但也许有一些我不知道的底层分页.

I get that this isn’t quite possible this way, but maybe I’m missing a subtlety that would make this possible? This seems a lot more efficient than selecting 1TB of data and then processing that, but maybe there is some underlying pagination that I don't know about.

我有非常相似的工作代码,在收集的列表上运行一个常规循环,否则使用几乎完全相同的代码,但是这非常缓慢,并且没有接近使用执行器.

I have very similar working code with a regular loop operating on a collected list using almost this exact code otherwise, but this was painfully slow and isn’t coming close to utilizing the executors.

对于一些额外的上下文,我在 EMR 和 YARN 上,我的 spark-submit(来自主节点)如下所示:spark-submit --packages org.postgresql:postgresql:9.4.1207.jre7 --deploy-mode cluster --num-executors 16 --executor-memory 3g --master yarn DataMove.py

For some extra context I’m on EMR and YARN and my spark-submit (from the master node) looks like this:spark-submit --packages org.postgresql:postgresql:9.4.1207.jre7 --deploy-mode cluster --num-executors 16 --executor-memory 3g --master yarn DataMove.py

此外,选择这些 DataFrame 没有问题,因为结果是数据的一个小子集并且数据库被正确索引,但是选择每个整个表似乎是绝对不可能的,因为可能有多达 TB 的数据在其中一些.此外,重新分区根据需要写入到 s3 的每个(单独的和特定命名的)文件中的内容来划分.

Also, selecting these DataFrames is not problematic as the result is a small subset of the data and the database is indexed correctly, but selecting each entire table seems like it would be absolutely impossible as there could be up to a TB of data in some of them. Also, the repartition divides it out by what needs to be written into each (individual and specifically-named) file going to s3.

我愿意接受任何建议,即使这只是意味着使用我的工作代码并以某种方式让它开始尽可能多的工作,而其他事情仍然从最后开始运行.但首先,我的方法可以奏效吗?

I would be open to any suggestions, even if it just means using my working code and somehow getting it to kick off as many jobs as it can while other things are still running from the last. But first and foremost, can my approach here work?

推荐答案

您可以将数据工作负载作为单独的作业/应用程序运行在 Spark 集群上,如下所述:

You could look into running your data workload as separate jobs / applications on your Spark cluster as described here:

https://spark.apache.org/docs/latest/submitting-applications.html

但是您关于将数据存储在多个分区中的评论也应该极大地有助于减少处理它所需的内存.您或许可以避免通过这种方式将其拆分为单独的作业.

But your comment about storing the data in multiple partitions should also greatly help to reduce the memory needed to process it. You may be able to avoid splitting it up into separate jobs that way.

Spark UI 位于:

The Spark UI at:

http://localhost:4040

是您的朋友,可帮助您了解您的工作在 Spark 内部创建了哪些步骤以及它消耗了哪些资源.根据这些见解,您可以对其进行优化并减少所需的内存量或提高处理速度.

is your friend in figuring out what steps your job is creating in Spark internally and what resources it consumes. Based on those insights you can optimize it and reduce the amount of memory needed or improve the processing speed.

这篇关于Spark:并行创建多个数据帧的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-01 04:48