Question
My current Java/Spark unit test approach works (detailed here) by instantiating a SparkContext using "local" and running unit tests using JUnit.
The code has to be organized so that all I/O happens in one function, which then calls another function with multiple RDDs.
This works great. I have a highly tested data transformation written in Java + Spark.
Can I do the same with Python?
How would I run Spark unit tests with Python?
Answer
I'd recommend using py.test as well. py.test makes it easy to create reusable SparkContext test fixtures and use them to write concise test functions. You can also specialize fixtures (to create a StreamingContext, for example) and use one or more of them in your tests.
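For reference, a reusable fixture along these lines can live in conftest.py. This is a minimal sketch with my own naming and configuration choices, not the exact code from the post; a StreamingContext fixture can be built the same way on top of it:

# conftest.py -- minimal sketch of a reusable SparkContext fixture
# (names and options here are illustrative)
import logging

import pytest
from pyspark import SparkConf, SparkContext

@pytest.fixture(scope="session")
def spark_context(request):
    """Session-scoped fixture: one local SparkContext shared by all tests."""
    # quiet the very chatty py4j logger so test output stays readable
    logging.getLogger("py4j").setLevel(logging.WARN)

    conf = SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing")
    sc = SparkContext(conf=conf)

    # stop the context when the test session ends
    request.addfinalizer(lambda: sc.stop())
    return sc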
I wrote a blog post on Medium on this topic:
https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b
Here's a snippet from the post:
import pytest

import wordcount  # module under test

pytestmark = pytest.mark.usefixtures("spark_context")

def test_do_word_counts(spark_context):
    """ test word counting
    Args:
        spark_context: test fixture SparkContext
    """
    test_input = [
        ' hello spark ',
        ' hello again spark spark'
    ]
    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)
    expected_results = {'hello': 2, 'spark': 3, 'again': 1}
    assert results == expected_results
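The wordcount.do_word_counts function under test isn't shown in the snippet. A plausible implementation (my sketch, not the code from the post) is a pure RDD-in, dict-out transformation, which is exactly the I/O-free style the question describes: all file reading and writing stays outside the function, so a test can feed it an in-memory RDD built with parallelize:

# wordcount.py -- hypothetical sketch of the transformation being tested;
# the real module from the post may differ
def do_word_counts(lines):
    """Count word occurrences across an RDD of text lines."""
    counts = (lines.flatMap(lambda line: line.split())
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    return dict(counts.collect())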