I'm new to PySpark and not sure why the following works fine:
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType,DoubleType
print (sq_lmi.printSchema())
def calc_V1(row):
    a = row.SQFT_ID
    V1 = a
    return V1
calc_V1_udf = udf(calc_V1, DoubleType()) #register the function and its return type
new_df = sq_lmi.withColumn("V1", calc_V1_udf(struct([sq_lmi[x] for x in sq_lmi.columns]))) #apply - the struct is needed to send the entire row
new_df.select('V1').show(5)
Output:
root
|-- ID: integer (nullable = true)
|-- LMI_HMT: string (nullable = true)
|-- SQFT: integer (nullable = true)
|-- SQFT_ID: double (nullable = true)
None
+-------------------+
| V1|
+-------------------+
| 29.335526315789473|
| 20.689655172413794|
| 22.97872340425532|
| 23.776223776223777|
|0.18512170037709977|
+-------------------+
only showing top 5 rows
However, if I do anything to the value before returning it (add 5.0, multiply by 5.0, or add/multiply another column of the row), an error occurs:
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType,DoubleType
print (sq_lmi.printSchema())
def calc_V1(row):
    a = row.SQFT_ID * 5.0
    V1 = a
    return V1
calc_V1_udf = udf(calc_V1, DoubleType()) #register the function and its return type
new_df = sq_lmi.withColumn("V1", calc_V1_udf(struct([sq_lmi[x] for x in sq_lmi.columns]))) #apply - the struct is needed to send the entire row
new_df.select('V1').show(5)
root
|-- ID: integer (nullable = true)
|-- LMI_HMT: string (nullable = true)
|-- SQFT: integer (nullable = true)
|-- SQFT_ID: double (nullable = true)
None
Py4JJavaErrorTraceback (most recent call last)
<ipython-input-62-d6c37eed3db5> in <module>()
16
17 new_df = sq_lmi.withColumn("V1", calc_V1_udf(struct([sq_lmi[x] for x in sq_lmi.columns]))) #apply - the struct is needed to send the entire row
---> 18 new_df.select('V1').show(5)
/usr/local/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate)
285 +---+-----+
286 """
--> 287 print(self._jdf.showString(n, truncate))
288
289 def __repr__(self):
/usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o1884.showString.
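For context, the underlying Python behavior can be reproduced without Spark at all: returning a value unchanged works even when it is None, but doing arithmetic on None raises a TypeError inside the Python worker, which Spark then surfaces as an opaque Py4JJavaError. A minimal plain-Python illustration (no Spark objects involved):

```python
# A value that stands in for a null SQFT_ID coming out of a Spark Row.
value = None

# Returning the value as-is never fails -- the UDF just hands None back
# to Spark, which stores it as a null in the DoubleType column.
result = value

# Arithmetic on None, however, raises TypeError; Spark wraps this Python
# exception in the Py4JJavaError shown in the traceback above.
try:
    result = value * 5.0
except TypeError as e:
    print("TypeError:", e)
```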
Best answer
Most likely you have null values and you are not checking for them.
Try something like this:
def calc_V1(row):
    if row is not None and row.SQFT_ID is not None:
        a = row.SQFT_ID * 5.0
        V1 = a
        return V1
    else:
        return None
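The guard logic can be exercised without a Spark cluster by using a plain namedtuple as a hypothetical stand-in for a Spark Row (the `Row` type and the sample values below are illustrative, not from the original data):

```python
from collections import namedtuple

# Hypothetical stand-in for a Spark Row, just to test the guard logic.
Row = namedtuple("Row", ["SQFT_ID"])

def calc_V1(row):
    # Guard against a missing row and against a null SQFT_ID value
    # before doing any arithmetic on it.
    if row is not None and row.SQFT_ID is not None:
        return row.SQFT_ID * 5.0
    return None

print(calc_V1(Row(SQFT_ID=2.0)))   # 10.0
print(calc_V1(Row(SQFT_ID=None)))  # None
```

Note that for a simple calculation like this one, a native column expression such as `sq_lmi.withColumn("V1", sq_lmi["SQFT_ID"] * 5.0)` avoids the UDF entirely; Spark's built-in arithmetic propagates nulls automatically, so no explicit guard is needed.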
Regarding "python - PySpark row function", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/44850067/