我的pyspark应用程序在106,36 MB数据集(817.270条记录)上运行UDF,使用常规python lambda函数大约需要100个小时。我生成了一个Google Dataproc集群,其中包含20个工作节点,每个工作节点有8个vCPU。但是,执行时仅使用3个节点和3个vCPU。显然,我希望集群使用我提供的所有资源。
结果数据帧的默认分区数为8。我尝试将其重新分区为100,但是群集仅使用3个节点和3个vCPU。另外,当我运行命令以检查 Spark 看到的执行程序的数量时,它只有3个。
这是要执行的pyspark代码:
from pyspark.sql.types import StringType, MapType
from pyspark.sql.functions import udf
customer_names = spark.createDataFrame(customer_names)
embargo_match_udf = udf(lambda x,y: embargoMatch(x,y), MapType(StringType(), StringType()))
customer_names = customer_names.withColumn('JaroDistance', embargo_match_udf('name','customer_code'))
result = customer_names.withColumn('jaro_similarity', customer_names.JaroDistance['max_jaro'])
result.write.format("com.databricks.spark.csv").save('gs://charles-embargo-bucket/sparkytuesday')
这是从我的jupyter笔记本上看到的一些 Spark 输出
print(sc) -> <SparkContext master=yarn appName=PySparkShell>
print(result.rdd.getNumPartitions()) -> 8
result = result.repartition(100)
print(result.rdd.getNumPartitions()) -> 100
sc._jsc.sc().getExecutorMemoryStatus().size() -> 3
最佳答案
对于那些对我如何解决此问题感兴趣的人:
默认情况下,无论我在Google Cloud的Dataproc UI中生成了多少个额外的节点,我的Spark上下文都假定有两个工作节点。因此,我手动更改了Spark上下文,如下所示:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.conf import SparkConf
sc.stop()
SparkContext.setSystemProperty('spark.executor.cores', '4')
SparkContext.setSystemProperty('spark.executor.instances', '5')
sc = SparkContext("yarn", "embargotest")
spark = SparkSession.builder.appName('embargotest').getOrCreate()
此外,在将.withColumn函数应用于此数据框之前,我将customer_names数据集显式划分为20个(4核x 5个实例)。
customer_names = spark.createDataFrame(customer_names).repartition(20)
希望这可以帮助遇到类似问题的人!