Question
So what I am trying to do is simply to convert the fields year, month, day, hour, and minute (which are of type integer, as seen below) into a string type.
So I have a dataframe df_src of type:
<class 'pyspark.sql.dataframe.DataFrame'>
and here is its schema:
root
|-- src_ip: string (nullable = true)
|-- year: integer (nullable = true)
|-- month: integer (nullable = true)
|-- day: integer (nullable = true)
|-- hour: integer (nullable = true)
|-- minute: integer (nullable = true)
I also declared a function earlier:
def parse_df_to_string(year, month, day, hour=0, minute=0):
    # Build a zero-padded 'YYYY-MM-DD HH:MM:SS' string; seconds are fixed at 0.
    second = 0
    return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(year, month, day, hour, minute, second)
And I also did a test, and it works like a charm:
print parse_df_to_string(2016, 10, 15, 21)
print type(parse_df_to_string(2016, 10, 15, 21))
2016-10-15 21:00:00
<type 'str'>
So I also did something similar in the Spark API with a udf:
from pyspark.sql.functions import udf
u_parse_df_to_string = udf(parse_df_to_string)
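As a side note, the return type of the udf can also be declared explicitly; StringType() happens to be the default when no type is given. A minimal equivalent sketch:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Equivalent declaration with an explicit return type
# (StringType is what udf defaults to when none is supplied):
u_parse_df_to_string = udf(parse_df_to_string, StringType())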
And finally, this query:
df_src.select('*',
    u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute'])
).show()
results in:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-126-770b587e10e6> in <module>()
25 # Could not make this part wor..
26 df_src.select('*',
---> 27 u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute'])
28 ).show()
/opt/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
285 +---+-----+
286 """
--> 287 print(self._jdf.showString(n, truncate))
288
289 def __repr__(self):
/opt/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
931 answer = self.gateway_client.send_command(command)
932 return_value = get_return_value(
--> 933 answer, self.gateway_client, self.target_id, self.name)
934
935 for temp_arg in temp_args:
/opt/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
...
Py4JJavaError: An error occurred while calling o5074.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: parse_df_to_string(input[1, int, true], input[2, int, true], input[3, int, true], input[4, int, true], input[5, int, true])
at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224)
at org.apache.spark.sql.execution.python.PythonUDF.doGenCode(PythonUDF.scala:27)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:101)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:101)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:740)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$generateExpressions$1.apply(CodeGenerator.scala:740)
...
I tried many things; I tried calling the method with only one parameter and argument, but it did not help.
One way it did work, though, is by creating a new dataframe with a new column, as follows:
from pyspark.sql.functions import concat, col, lit

df_src_grp_hr_d = df_src.select('*', concat(
    col("year"),
    lit("-"),
    col("month"),
    lit("-"),
    col("day"),
    lit(" "),
    col("hour"),
    lit(":0")).alias('time'))
where, after that, I could cast the column to timestamp:
df_src_grp_hr_to_timestamp = df_src_grp_hr_d.select(
    df_src_grp_hr_d['src_ip'],
    df_src_grp_hr_d['year'],
    df_src_grp_hr_d['month'],
    df_src_grp_hr_d['day'],
    df_src_grp_hr_d['hour'],
    df_src_grp_hr_d['time'].cast('timestamp'))
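As an aside, here is a sketch of an alternative that avoids hand-concatenating separators: format_string zero-pads each integer field so the string already matches the yyyy-MM-dd HH:mm:ss layout that cast('timestamp') parses reliably. The name df_ts is hypothetical, not from the original post.
from pyspark.sql.functions import format_string, col

# Zero-pad each field into a 'yyyy-MM-dd HH:mm:ss' string, then cast it;
# seconds are fixed at 00, as in parse_df_to_string above.
df_ts = df_src.select('*', format_string(
    "%04d-%02d-%02d %02d:%02d:00",
    col("year"), col("month"), col("day"), col("hour"), col("minute")
).cast("timestamp").alias("time"))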
Accepted answer
Alright, I think I understand the problem. The cause is that my dataframe simply had a lot of data loaded in memory, causing the show() action to fail.
What I realized is that what is causing the exception:
Py4JJavaError: An error occurred while calling o2108.showString.
: java.lang.UnsupportedOperationException: Cannot evaluate expression:
is indeed the df.show() action.
I could confirm that by executing the code snippet from: Convert pyspark string to date format
from datetime import datetime
from pyspark.sql.functions import col, udf, unix_timestamp
from pyspark.sql.types import DateType

# Creation of a dummy dataframe:
df1 = sqlContext.createDataFrame([("11/25/1991", "11/24/1991", "11/30/1991"),
                                  ("11/25/1391", "11/24/1992", "11/30/1992")], schema=['first', 'second', 'third'])

# Setting a user-defined function:
# This function converts the string cell into a date
# (lowercase '%m' is the month; uppercase '%M' would parse minutes):
func = udf(lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())
df = df1.withColumn('test', func(col('first')))
df.show()
df.printSchema()
which worked! But it still did not work with my dataframe df_src.
The cause is that I am loading a lot of data into memory from my database server (over 8-9 million rows), and it seems that Spark is unable to execute the udf when .show() (which displays 20 entries by default) is called on the results loaded into the dataframe.
Even if show(n=1) is called, the same exception is thrown.
But if printSchema() is called, you will see that the new column has indeed been added.
One way to see whether the new column was added is simply to call the action print dataFrame.take(10) instead.
Finally, one way to make it work is to assign the result to a new dataframe, and not call .show() when calling the udf in a select():
df_to_string = df_src.select('*',
    u_parse_df_to_string(df_src['year'], df_src['month'], df_src['day'], df_src['hour'], df_src['minute'])
)
Then cache it:
df_to_string.cache()
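Note that cache() is lazy, so nothing is materialized until an action runs. A minimal sketch, where count() is just one convenient action to force it:
df_to_string.count()    # triggers execution so the cached result is actually built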
Now .show() can be called with no issues:
df_to_string.show()