本文介绍了如何对 PySpark 程序进行单元测试?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我当前的 Java/Spark 单元测试方法通过使用本地"实例化 SparkContext 并运行使用 JUnit 进行单元测试.

My current Java/Spark Unit Test approach works (detailed here) by instantiating a SparkContext using "local" and running unit tests using JUnit.

必须组织代码以在一个函数中执行 I/O,然后使用多个 RDD 调用另一个函数.

The code has to be organized to do I/O in one function and then call another with multiple RDDs.

这很好用.我有一个用 Java + Spark 编写的经过高度测试的数据转换.

This works great. I have a highly tested data transformation written in Java + Spark.

我可以用 Python 做同样的事情吗?

Can I do the same with Python?

我将如何使用 Python 运行 Spark 单元测试?

How would I run Spark unit tests with Python?

推荐答案

我也推荐使用 py.test.py.test 可以轻松创建可重用的 SparkContext 测试装置并使用它来编写简洁的测试函数.您还可以专门化设备(例如创建一个 StreamingContext)并在您的测试中使用它们中的一个或多个.

I'd recommend using py.test as well. py.test makes it easy to create re-usable SparkContext test fixtures and use it to write concise test functions. You can also specialize fixtures (to create a StreamingContext for example) and use one or more of them in your tests.

我在 Medium 上写了一篇关于这个主题的博文:

I wrote a blog post on Medium on this topic:

https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b

这是帖子的片段:

pytestmark = pytest.mark.usefixtures("spark_context")
def test_do_word_counts(spark_context):
    """ test word couting
    Args:
       spark_context: test fixture SparkContext
    """
    test_input = [
        ' hello spark ',
        ' hello again spark spark'
    ]

    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)

    expected_results = {'hello':2, 'spark':3, 'again':1}
    assert results == expected_results

这篇关于如何对 PySpark 程序进行单元测试?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-05 08:30