Question
My current Java/Spark Unit Test approach works (detailed here) by instantiating a SparkContext using "local" and running unit tests using JUnit.
The code has to be organized to do I/O in one function and then call another with multiple RDDs.
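(For illustration only, that separation might look roughly like the following PySpark sketch; the function and path names are made up and are not taken from the original code.)

# Sketch of the organization described above: keep file I/O in a thin wrapper and put
# the logic in a pure RDD-to-RDD function, so tests can feed it sc.parallelize() data.
# All names here are illustrative.
def transform(records_rdd):
    """Pure transformation: takes an RDD, returns an RDD, touches no files."""
    return records_rdd.filter(lambda line: line.strip()).map(lambda line: line.lower())


def run_job(sc, input_path, output_path):
    """I/O wrapper: read, delegate to transform(), write."""
    raw = sc.textFile(input_path)
    transform(raw).saveAsTextFile(output_path)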
This works great. I have a highly tested data transformation written in Java + Spark.
Can I do the same with Python?
How would I run Spark unit tests with Python?
Answer
I'd recommend using py.test as well. py.test makes it easy to create a reusable SparkContext test fixture and use it to write concise test functions. You can also specialize fixtures (to create a StreamingContext, for example) and use one or more of them in your tests.
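For context, such fixtures typically live in a conftest.py so every test module can request them by name. Below is a minimal sketch, assuming pyspark is importable (e.g. via SPARK_HOME or findspark); the helper name quiet_py4j and the fixture bodies are illustrative rather than the blog post's exact code:

# conftest.py -- a minimal sketch of reusable py.test fixtures for PySpark.
import logging

import pytest
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext


def quiet_py4j():
    """Silence the chatty py4j logger while tests run."""
    logging.getLogger("py4j").setLevel(logging.WARN)


@pytest.fixture(scope="session")
def spark_context(request):
    """Create a local SparkContext once and share it across the test session."""
    conf = SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing")
    sc = SparkContext(conf=conf)
    request.addfinalizer(lambda: sc.stop())  # tear down when the session ends
    quiet_py4j()
    return sc


@pytest.fixture(scope="session")
def streaming_context(spark_context, request):
    """Specialized fixture: wrap the shared SparkContext in a StreamingContext."""
    ssc = StreamingContext(spark_context, 1)  # 1-second batch interval
    request.addfinalizer(lambda: ssc.stop(stopSparkContext=False))
    return ssc

With a conftest.py like this in place, running py.test in the project directory picks up the fixtures automatically.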
I wrote a blog post on Medium on this topic:
https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b
Here is an excerpt from the post:
import pytest

import wordcount  # the module under test in the blog post

pytestmark = pytest.mark.usefixtures("spark_context")


def test_do_word_counts(spark_context):
    """ test word counting

    Args:
        spark_context: test fixture SparkContext
    """
    test_input = [
        ' hello spark ',
        ' hello again spark spark'
    ]
    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)
    expected_results = {'hello': 2, 'spark': 3, 'again': 1}
    assert results == expected_results
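The excerpt shows only the test; the transformation itself lives in a separate wordcount module that is not reproduced here. A minimal sketch of what do_word_counts could look like, consistent with the expected results above (an assumption, not the post's actual implementation):

# wordcount.py -- an illustrative sketch, not the blog post's actual code.
def do_word_counts(lines_rdd):
    """Count words in an RDD of text lines and return a plain dict for easy asserts."""
    counts = (
        lines_rdd
        .flatMap(lambda line: line.split())   # tokenize on whitespace
        .map(lambda word: (word, 1))          # pair each word with a count of 1
        .reduceByKey(lambda a, b: a + b)      # sum the counts per word
    )
    return counts.collectAsMap()              # materialize as {word: count}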