我正在通过PySpark探索Spark Streaming,并尝试在transform
中使用take
函数时遇到错误。
我可以通过sortBy
和DStream
成功地对transform
使用pprint
和结果。
author_counts_sorted_dstream = author_counts_dstream.transform\
(lambda foo:foo\
.sortBy(lambda x:x[0].lower())\
.sortBy(lambda x:x[1],ascending=False))
author_counts_sorted_dstream.pprint()
但是,如果我按照相同的模式使用
take
并尝试对其进行pprint
:top_five = author_counts_sorted_dstream.transform\
(lambda rdd:rdd.take(5))
top_five.pprint()
工作因
Py4JJavaError: An error occurred while calling o25.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/usr/local/spark/python/pyspark/streaming/util.py", line 67, in call
return r._jrdd
AttributeError: 'list' object has no attribute '_jrdd'
您可以在the notebook here中查看完整的代码和输出。
我究竟做错了什么?
最佳答案
传递给transform
的函数应从RDD
转换为RDD
。如果使用诸如take
之类的操作,则必须将结果转换回RDD
:
sc: SparkContext = ...
author_counts_sorted_dstream.transform(
lambda rdd: sc.parallelize(rdd.take(5))
)
相反,使用的
RDD.sortBy
是转换(返回RDD),因此不需要进一步的并行化。附带说明以下功能:
lambda foo: foo \
.sortBy(lambda x:x[0].lower()) \
.sortBy(lambda x:x[1], ascending=False)
没有多大意义。请记住,Spark按随机排序排序,因此不稳定。如果要按多个字段排序,则应使用如下组合键:
lambda x: (x[0].lower(), -x[1])