I want to create an empty DataFrame from an existing Spark DataFrame. I have PyArrow support enabled in the Spark conf. When I try to create the empty DataFrame from an empty RDD together with the existing DataFrame's schema, I get a java.lang.NegativeArraySizeException. Here is the complete code to reproduce the error:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
                    .config("spark.sql.execution.arrow.enabled", "true") \
                    .getOrCreate()
df = spark.createDataFrame(["10", "11", "13"], "string").toDF("age")
empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema)
empty_pandas_df = empty_df.toPandas()


Here is the full stack trace:

/conda_env/lib/python3.6/site-packages/pyarrow/__init__.py:157: UserWarning: pyarrow.open_stream is deprecated, please use pyarrow.ipc.open_stream
  warnings.warn("pyarrow.open_stream is deprecated, please use "
/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py:2139: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.fallback.enabled' does not have an effect on failures in the middle of computation.
  An error occurred while calling o349.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
    at org.apache.spark.api.python.PythonServer.getResult(PythonRDD.scala:874)
    at org.apache.spark.api.python.PythonServer.getResult(PythonRDD.scala:870)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NegativeArraySizeException
    at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3293)
    at org.apache.spark.sql.Dataset$$anonfun$collectAsArrowToPython$1$$anonfun$apply$17.apply(Dataset.scala:3287)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply$mcV$sp(PythonRDD.scala:456)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply(PythonRDD.scala:456)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7$$anonfun$apply$3.apply(PythonRDD.scala:456)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7.apply(PythonRDD.scala:457)
    at org.apache.spark.api.python.PythonRDD$$anonfun$7.apply(PythonRDD.scala:453)
    at org.apache.spark.api.python.SocketFuncServer.handleConnection(PythonRDD.scala:994)
    at org.apache.spark.api.python.SocketFuncServer.handleConnection(PythonRDD.scala:988)
    at org.apache.spark.api.python.PythonServer$$anonfun$11$$anonfun$apply$9.apply(PythonRDD.scala:853)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.api.python.PythonServer$$anonfun$11.apply(PythonRDD.scala:853)
    at org.apache.spark.api.python.PythonServer$$anonfun$11.apply(PythonRDD.scala:852)
    at org.apache.spark.api.python.PythonServer$$anon$1.run(PythonRDD.scala:908)

  warnings.warn(msg)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-18-61602774c141> in <module>
----> 1 empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema)

/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py in toPandas(self)
   2120                         _check_dataframe_localize_timestamps
   2121                     import pyarrow
-> 2122                     batches = self._collectAsArrow()
   2123                     if len(batches) > 0:
   2124                         table = pyarrow.Table.from_batches(batches)

/conda_env/lib/python3.6/site-packages/pyspark/sql/dataframe.py in _collectAsArrow(self)
   2182                 return list(_load_from_socket((port, auth_secret), ArrowStreamSerializer()))
   2183             finally:
-> 2184                 jsocket_auth_server.getResult()  # Join serving thread and raise any exceptions
   2185
   2186     ##########################################################################################

/conda_env/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/conda_env/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/conda_env/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(


The error disappears when I disable PyArrow:

spark.conf.set("spark.sql.execution.arrow.enabled","false")
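If disabling Arrow globally is undesirable, the setting can also be flipped just around the problematic conversion and then restored. A minimal sketch, assuming a conf object with the same get/set interface as spark.conf (the helper name is hypothetical, not from the question):

```python
def with_arrow_disabled(conf, fn):
    """Run fn() with spark.sql.execution.arrow.enabled set to "false",
    then restore the previous value of the setting."""
    key = "spark.sql.execution.arrow.enabled"
    previous = conf.get(key, "true")
    conf.set(key, "false")
    try:
        return fn()
    finally:
        conf.set(key, previous)

# Usage against a real session would look like:
#   pdf = with_arrow_disabled(spark.conf, empty_df.toPandas)
```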


Is this a known issue in PySpark, or is it related to PyArrow?

N.B.: the error is only reproducible with PySpark >= 2.4.4.

Best Answer

A workaround is to collect the RDD and build the pandas DataFrame from the result, as shown below.
(Another issue in your code: the ':' characters should be replaced with ','.)

from pyspark.sql import SparkSession
import pandas as pd


spark = SparkSession.builder.config("spark.sql.execution.arrow.enabled", "true").getOrCreate()

df = spark.createDataFrame(["10", "11", "13"], "string").toDF("age")

empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df.schema, verifySchema=True)

# Bypass toPandas()/Arrow entirely: collect the (empty) rows on the driver
# and build the pandas DataFrame from the resulting list.
rows = empty_df.collect()
empty_pandas_df = pd.DataFrame(rows)

print(empty_pandas_df)
df.show()


Output:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/09/22 11:08:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Empty DataFrame
Columns: []
Index: []
+---+
|age|
+---+
| 10|
| 11|
| 13|
+---+
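Note that building a pandas DataFrame from an empty collect() result loses the column names, which is why the output above shows Columns: []. A pandas-only sketch of preserving them by passing the Spark DataFrame's columns explicitly (the literal ["age"] below stands in for empty_df.columns):

```python
import pandas as pd

rows = []  # what empty_df.collect() returns for an empty DataFrame

# Without column hints, pandas produces a frame with no columns at all.
pdf = pd.DataFrame(rows)
print(list(pdf.columns))  # []

# Passing the source columns keeps the schema's column names.
pdf = pd.DataFrame(rows, columns=["age"])  # in practice: empty_df.columns
print(list(pdf.columns))  # ['age']
```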

On "python - Spark refuses to create an empty dataframe when using pyarrow", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/58014063/
