Problem description
I am trying to submit a Spark application to the local Kubernetes cluster on my machine (created via Docker Dashboard). The application depends on a Python package, let's call it X.
Here is the application code:
import sys
from pyspark import SparkContext
from pyspark.sql import SparkSession

datafolder = "/opt/spark/data"  # folder created in the container by Spark's Dockerfile
sys.path.append(datafolder)     # X is contained inside of datafolder

from X.predictor import *       # import functionality from X

def apply_x_functionality_on(item):
    predictor = Predictor()     # class from X.predictor
    predictor.predict(item)

def main():
    spark = SparkSession\
        .builder\
        .appName("AppX")\
        .getOrCreate()
    sc = spark.sparkContext

    data = []
    # Read data: [no problems there]
    ...
    data_rdd = sc.parallelize(data)             # create RDD
    data_rdd.foreach(apply_x_functionality_on)  # call function on each item

if __name__ == "__main__":
    main()
Initially I hoped to avoid such problems by putting the X folder into Spark's data folder. When the container is built, the entire content of the data folder is copied to /opt/spark/data. My Spark application appends the data folder to the system path and imports the package X from there. Well, I thought it did.
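That assumption is easy to check for the driver side. Below is a minimal sketch (the path and the package name X are the ones from this question; the rest is purely illustrative) that verifies whether appending the data folder really makes X importable in the driver process:

import sys
import importlib.util

datafolder = "/opt/spark/data"   # same folder the application appends
sys.path.append(datafolder)      # X is expected to live directly under it

# find_spec returns None when the package cannot be located
print("X importable on the driver:", importlib.util.find_spec("X") is not None)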
Everything works fine until the .foreach function is called. Here is a snippet from the logs with the error description:
20/11/25 16:13:54 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.1.0.60, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 587, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 74, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'X'
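What the traceback shows is that pickle.loads fails inside PySpark's worker.py, i.e. the foreach closure is unpickled in the executor's own Python worker process, where the sys.path change made on the driver does not exist. A small probe like the one below (purely illustrative; it assumes the SparkContext sc from the application above) reports whether the executors themselves can import X:

def probe_import(_):
    try:
        import X                     # the package from this question
        return "import ok"
    except ModuleNotFoundError as exc:
        return str(exc)

# one distinct message is enough to see what the executors report
print(sc.parallelize(range(4), 4).map(probe_import).distinct().collect())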
There are a lot of similar questions here: one, two, three, but none of the answers to them have helped me so far.
Things I have tried:
- Submitting the application with a zipped X (the archive is created inside the container by applying zip to X):
$SPARK_HOME/bin/spark-submit \
--master k8s://https://kubernetes.docker.internal:6443 \
--deploy-mode cluster \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.container.image=kostjaigin/spark-py:v3.0.1-X_0.0.1 \
--py-files "local:///opt/spark/data/X.zip" \
local:///opt/spark/data/MyApp.py
- Adding the zipped X to the Spark context:
sc.addPyFile("opt/spark/data/X.zip")
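For completeness: one way to see what actually reaches the executors after --py-files or addPyFile is to list the directory Spark ships files into. This is only a diagnostic sketch (it assumes the sc from the application above, and what shows up there depends on the deploy mode):

from pyspark import SparkFiles

def list_shipped_files(_):
    import os
    # files distributed through addFile/addPyFile land in this per-executor directory
    return sorted(os.listdir(SparkFiles.getRootDirectory()))

print(sc.parallelize(range(2), 2).map(list_shipped_files).first())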
Recommended answer
I have solved the problem:
- Created a dependencies folder under /opt/spark/data
- Put X into dependencies
- In my Dockerfile I pack the dependencies folder into a zip archive so it can later be submitted via py-files:
cd /opt/spark/data/dependencies && zip -r ../dependencies.zip .
- In the application:
...
from X.predictor import * # import functionality from X
...
# zipped package
zipped_pkg = os.path.join(datafolder, "dependencies.zip")
assert os.path.exists(zipped_pkg)
sc.addPyFile(zipped_pkg)
...
- Add the --py-files flag to the submit command:
$SPARK_HOME/bin/spark-submit \
--master k8s://https://kubernetes.docker.internal:6443 \
--deploy-mode cluster \
--conf spark.executor.instances=5 \
--py-files "local:///opt/spark/data/dependencies.zip" \
local:///opt/spark/data/MyApp.py
- Run it
Basically, it is all about adding a dependencies.zip archive with all the required dependencies in it.
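Putting the pieces of the answer together, a consolidated sketch of the driver script could look as follows (the paths, the app name, and the package name X are taken from the question; the data loading stays elided, just as it was there):

import os
from pyspark.sql import SparkSession

datafolder = "/opt/spark/data"
zipped_pkg = os.path.join(datafolder, "dependencies.zip")   # built in the Dockerfile

spark = SparkSession.builder.appName("AppX").getOrCreate()
sc = spark.sparkContext

# ship the archive so that the packages inside it become importable on the executors
assert os.path.exists(zipped_pkg)
sc.addPyFile(zipped_pkg)

from X.predictor import *           # resolvable now, on the driver as well as on the executors

def apply_x_functionality_on(item):
    predictor = Predictor()          # class from X.predictor
    predictor.predict(item)

data = []                            # data loading elided, as in the question
data_rdd = sc.parallelize(data)
data_rdd.foreach(apply_x_functionality_on)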