Spark AttributeError: Can't get attribute 'new_block' on <module 'pandas.core.internals.blocks'

Problem description

I was using pyspark on AWS EMR (4 r5.xlarge instances as 4 workers, each with one executor and 4 cores), and I got AttributeError: Can't get attribute 'new_block' on <module 'pandas.core.internals.blocks'. Below is a snippet of the code that threw this error:

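import sqlite3

import numpy as np
import pandas as pd
from pyspark.sql.functions import col, udf
from uszipcode import SearchEngine  # assumption: SearchEngine comes from the uszipcode package

# `spark` (the SparkSession) and `df` (with latitude/longitude columns) are created earlier in the script.

search = SearchEngine(db_file_dir="/tmp/db")

# Load the zipcode table into a pandas DataFrame on the driver and broadcast it.
conn = sqlite3.connect("/tmp/db/simple_db.sqlite")
pdf_ = pd.read_sql_query('''select  zipcode, lat, lng,
                        bounds_west, bounds_east, bounds_north, bounds_south from
                        simple_zipcode''', conn)
brd_pdf = spark.sparkContext.broadcast(pdf_)
conn.close()


@udf('string')
def get_zip_b(lat, lng):
    # Unpickles the broadcast DataFrame on the worker; this is where the error is raised.
    pdf = brd_pdf.value
    # Keep only zipcodes whose bounding box contains the point.
    out = pdf[(np.array(pdf["bounds_north"]) >= lat) &
              (np.array(pdf["bounds_south"]) <= lat) &
              (np.array(pdf["bounds_west"]) <= lng) &
              (np.array(pdf["bounds_east"]) >= lng)]
    if len(out):
        # Among the candidates, pick the zipcode whose center is closest to the point.
        min_index = np.argmin((np.array(out["lat"]) - lat)**2 + (np.array(out["lng"]) - lng)**2)
        zip_ = str(out["zipcode"].iloc[min_index])
    else:
        zip_ = 'bad'
    return zip_

df = df.withColumn('zipcode', get_zip_b(col("latitude"), col("longitude")))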

Below is the traceback, where "line 102, in get_zip_b" refers to pdf = brd_pdf.value:

21/08/02 06:18:19 WARN TaskSetManager: Lost task 12.0 in stage 7.0 (TID 1814, ip-10-22-17-94.pclc0.merkle.local, executor 6): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
    return lambda *a: f(*a)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/util.py", line 121, in wrapper
    return f(*args, **kwargs)
  File "/mnt/var/lib/hadoop/steps/s-1IBFS0SYWA19Z/Mobile_ID_process_center.py", line 102, in get_zip_b
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/broadcast.py", line 146, in value
    self._value = self.load_from_path(self._path)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/broadcast.py", line 123, in load_from_path
    return self.load(f)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/broadcast.py", line 129, in load
    return pickle.load(file)
AttributeError: Can't get attribute 'new_block' on <module 'pandas.core.internals.blocks' from '/mnt/miniconda/lib/python3.9/site-packages/pandas/core/internals/blocks.py'>
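The last frames show pickle.load failing inside the broadcast loader. pickle stores functions and classes by module path and name rather than by value, so the process that reads the data must be able to find the same name in the same module. The following is a minimal sketch of that mechanism using only the standard library; the module name fake_blocks is a hypothetical stand-in and not part of pandas.

```python
import pickle
import sys
import types

# Hypothetical stand-in for a library module such as pandas.core.internals.blocks.
fake = types.ModuleType("fake_blocks")


def new_block():
    return "block"


# Make the function look as if it were defined at the top level of fake_blocks.
new_block.__module__ = "fake_blocks"
new_block.__qualname__ = "new_block"
fake.new_block = new_block
sys.modules["fake_blocks"] = fake

# The payload holds only the reference "fake_blocks.new_block", not the code.
payload = pickle.dumps(fake.new_block)


def load_or_error(data):
    """Unpickle data, returning the error text instead of raising."""
    try:
        return pickle.loads(data)
    except AttributeError as exc:
        return f"AttributeError: {exc}"


before = load_or_error(payload)  # the name still exists, so this resolves

# Simulate a reader whose copy of the module lacks the function.
del fake.new_block
after = load_or_error(payload)   # the lookup now fails
print(after)
```

If the same thing happens with a real DataFrame, it would mean the side that wrote the pickle has a pandas that defines new_block while the side that reads it does not.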

Some observations and my thought process:

1. After some searching online, the AttributeError in pyspark seems to be caused by mismatched pandas versions between the driver and the workers?

2. But I ran the same code on two different datasets: one worked without any errors and the other didn't. That seems very strange and nondeterministic, and it suggests the error may not be caused by mismatched pandas versions; otherwise neither dataset would have succeeded.

3. I then ran the same code on the successful dataset again, this time with a different Spark configuration (spark.driver.memory changed from 2048M to 4192m), and it threw the AttributeError.

4. In conclusion, I think the AttributeError has something to do with the driver. But I can't tell from the error message how they are related, or how to fix it: AttributeError: Can't get attribute 'new_block' on <module 'pandas.core.internals.blocks'.
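One way to test the version-mismatch hypothesis from point 1 directly is to ask the Python workers which pandas they import and compare that with the driver. The sketch below is an illustration only: the helper names executor_pandas_versions and mismatched_versions are made up for this example, and a small job like this is not guaranteed to reach every executor.

```python
def executor_pandas_versions(spark, num_tasks=16):
    """Return the distinct pandas versions reported by Python workers.

    `spark` is an existing SparkSession. The tasks may not land on every
    executor, so treat the result as a sample rather than a full inventory.
    """
    def _version(_):
        import pandas  # imported inside the task, so it runs on the worker
        return pandas.__version__

    sc = spark.sparkContext
    versions = sc.parallelize(range(num_tasks), num_tasks).map(_version).collect()
    return sorted(set(versions))


def mismatched_versions(driver_version, executor_versions):
    """Return the executor versions that differ from the driver's version."""
    return [v for v in executor_versions if v != driver_version]


# Usage on a live cluster (not executed here):
#   import pandas as pd
#   seen = executor_pandas_versions(spark)
#   print(pd.__version__, seen, mismatched_versions(pd.__version__, seen))
```

A non-empty result from mismatched_versions would support the mismatch explanation; an empty one would only show that the sampled workers agree with the driver.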

Recommended answer

I had the same error with pandas 1.3.2 on the server and 1.2 on my client. Downgrading pandas to 1.2 solved the problem.
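Where aligning the pandas versions is not immediately possible, a different approach (not the one described in this answer) would be to broadcast plain Python rows instead of a DataFrame, so that the pickled payload does not reference pandas internals. The sketch below assumes each row is a tuple in the same column order as the SQL query in the question; the function name nearest_zip is hypothetical, and missing bounds are assumed to be NaN rather than None.

```python
def nearest_zip(rows, lat, lng, default="bad"):
    """Return the zipcode whose bounding box contains (lat, lng) and whose
    center is closest to it, or `default` when no box contains the point.

    Each row is (zipcode, lat, lng, bounds_west, bounds_east,
    bounds_north, bounds_south).
    """
    best_zip, best_dist = default, None
    for zipcode, z_lat, z_lng, west, east, north, south in rows:
        if south <= lat <= north and west <= lng <= east:
            dist = (z_lat - lat) ** 2 + (z_lng - lng) ** 2
            # Strict comparison keeps the first row on ties.
            if best_dist is None or dist < best_dist:
                best_zip, best_dist = str(zipcode), dist
    return best_zip


# Usage inside the job (not executed here), assuming pdf_ from the question:
#   rows = [tuple(r) for r in pdf_.values.tolist()]
#   brd_rows = spark.sparkContext.broadcast(rows)
#   and inside the UDF: nearest_zip(brd_rows.value, lat, lng)
```

This gives up vectorized filtering in exchange for a payload made of built-in types only, so it may be slower per row than the DataFrame version.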


07-30 10:20