PySpark中的ModuleNotFoundError由ser

PySpark中的ModuleNotFoundError由ser

本文介绍了PySpark中的ModuleNotFoundError由serializers.py引起的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试将Spark应用程序提交到我的机器(通过Docker Dashboard创建)上的本地Kubernetes集群.该应用程序依赖于 python软件包,我们将其称为 X .

I am trying to submit a Spark Application to the local Kubernetes cluster on my machine (created via Docker Dashboard). The application depends on a python package, let's call it X.

以下是应用程序代码:

import sys
from pyspark import SparkContext
from pyspark.sql import SparkSession
datafolder = "/opt/spark/data" # Folder created in container by spark's docker file
sys.path.append(datafolder) # X is contained inside of datafolder
from X.predictor import * # import functionality from X

def apply_x_functionality_on(item):
    predictor = Predictor() # class from X.predictor
    predictor.predict(item)

def main():
    spark = SparkSession\
            .builder\
            .appName("AppX")\
            .getOrCreate()
    sc = spark.sparkContext
    data = []
    # Read data: [no problems there]
    ...
    data_rdd = sc.parallelize(data) # create RDD
    data_rdd.foreach(lambda item: apply_network(item)) # call function

if __name__ == "__main__":
    main()

最初,我希望通过将 X 文件夹放置到Spark的data文件夹中来避免此类问题.构建容器时,数据文件夹的所有内容都将复制到/opt/spark/data 中.我的Spark应用程序将data文件夹的内容附加到系统路径,这样就消耗了包X.嗯,我认为确实如此.

Initially I hoped to avoid such problems by putting the X folder to the data folder of Spark. When container is built, all the content of data folder is being copied to the /opt/spark/data. My Spark application appends contents of data folder to the system path, as such consuming the package X. Well, I thought it does.

一切正常,直到调用 .foreach 函数.这是来自loggs的代码段,其中包含错误描述:

Everything works fine until the .foreach function is called. Here is a snippet from loggs with error description:

20/11/25 16:13:54 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.1.0.60, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 587, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 74, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'X'

这里有很多类似的问题:,两个,,但到目前为止,对它们的回答都没有帮助我.

There are a lot of similar questions here: one, two, three, but none of the answers to them have helped me so far.

我尝试过的事情:

  1. 我使用.zip(ed)X提交了应用程序(通过将zip应用于X,将其压缩在容器中)
$SPARK_HOME/bin/spark-submit \
  --master k8s://https://kubernetes.docker.internal:6443 \
  --deploy-mode cluster \
  --conf spark.executor.instances=5 \
  --conf spark.kubernetes.container.image=kostjaigin/spark-py:v3.0.1-X_0.0.1 \
  --py-files "local:///opt/spark/data/X.zip" \
  local:///opt/spark/data/MyApp.py
  1. 我在Spark上下文中添加了.zip(ed)X:
sc.addPyFile("opt/spark/data/X.zip")

推荐答案

我已经解决了该问题:

    在/opt/spark/data下创建
  1. 依赖项文件夹
  2. 将X放入依赖项
  3. 在我的docker文件中,我将Dependencies文件夹打包到一个zip存档中,以便以后以py文件的形式提交: cd/opt/spark/data/** dependencies **&&zip -r ../dependencies.zip.
  4. 正在申请中:
  1. Created dependencies folder under /opt/spark/data
  2. Put X to dependencies
  3. Inside of my docker file I pack dependencies folder in a zip archive to submit it later as py-files: cd /opt/spark/data/**dependencies** && zip -r ../dependencies.zip .
  4. In Application:
...
from X.predictor import * # import functionality from X
...
# zipped package
zipped_pkg = os.path.join(datafolder, "dependencies.zip")
assert os.path.exists(zipped_pkg)
sc.addPyFile(zipped_pkg)
...
  1. 在提交命令中添加--py-files标志:
$SPARK_HOME/bin/spark-submit \
  --master k8s://https://kubernetes.docker.internal:6443 \
  --deploy-mode cluster \
  --conf spark.executor.instances=5 \
  --py-files "local:///opt/spark/data/dependencies.zip" \
  local:///opt/spark/data/MyApp.py
  1. 运行

基本上,所有操作都是关于添加一个dependencies.zip存档,其中包含所有必需的依赖项.

Basically it is all about adding a dependencies.zip Archive with all the required dependencies in it.

这篇关于PySpark中的ModuleNotFoundError由serializers.py引起的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-29 20:46