本文介绍了如何在 Spark 2.0+ 中编写单元测试?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我一直在尝试寻找一种合理的方法来使用 JUnit 测试框架来测试 SparkSession.尽管 SparkContext 似乎有很好的示例,但我无法弄清楚如何获得适用于 SparkSession 的相应示例,即使它在内部的多个地方使用spark-testing-base.我很乐意尝试一个不使用 spark-testing-base 的解决方案,如果它不是真正正确的方法.

I've been trying to find a reasonable way to test SparkSession with the JUnit testing framework. While there seem to be good examples for SparkContext, I couldn't figure out how to get a corresponding example working for SparkSession, even though it is used in several places internally in spark-testing-base. I'd be happy to try a solution that doesn't use spark-testing-base as well if it isn't really the right way to go here.

简单的测试用例(完整的 MWE 项目,带有 build.sbt):

Simple test case (complete MWE project with build.sbt):

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite

import org.apache.spark.sql.SparkSession


class SessionTest extends FunSuite with DataFrameSuiteBase {

  implicit val sparkImpl: SparkSession = spark

  @Test
  def simpleLookupTest {

    val homeDir = System.getProperty("user.home")
    val training = spark.read.format("libsvm")
      .load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
    println("completed simple lookup test")
  }

}

使用 JUnit 运行它的结果是负载线上的 NPE:

The result of running this with JUnit is an NPE at the load line:

java.lang.NullPointerException
    at SessionTest.simpleLookupTest(SessionTest.scala:16)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

注意加载的文件是否存在无关紧要;在正确配置的 SparkSession 中,将抛出更合理的错误.

Note it shouldn't matter that the file being loaded exists or not; in a properly configured SparkSession, a more sensible error will be thrown.

推荐答案

感谢您提出这个悬而未决的问题.出于某种原因,当谈到 Spark 时,每个人都沉迷于分析,以至于忘记了过去 15 年左右出现的伟大软件工程实践.这就是我们在课程中重点讨论测试和持续集成(以及 DevOps 等其他内容)的原因.

Thank you for putting this outstanding question out there. For some reason, when it comes to Spark, everyone gets so caught up in the analytics that they forget about the great software engineering practices that emerged the last 15 years or so. This is why we make it a point to discuss testing and continuous integration (among other things like DevOps) in our course.

术语简介

true 单元测试意味着您可以完全控制测试中的每个组件.不能与数据库、REST 调用、文件系统甚至系统时钟交互;正如 Gerard Mezaros 在 xUnit 测试模式中所说的那样,一切都必须加倍"(例如模拟、存根等).我知道这看起来像语义,但它真的很重要.未能理解这一点是您在持续集成中看到间歇性测试失败的一个主要原因.

A true unit test means you have complete control over every component in the test. There can be no interaction with databases, REST calls, file systems, or even the system clock; everything has to be "doubled" (e.g. mocked, stubbed, etc) as Gerard Mezaros puts it in xUnit Test Patterns. I know this seems like semantics, but it really matters. Failing to understand this is one major reason why you see intermittent test failures in continuous integration.

我们仍然可以进行单元测试

因此,根据这种理解,对 RDD 进行单元测试是不可能的.但是,在开发分析时仍然有单元测试的地方.

So given this understanding, unit testing an RDD is impossible. However, there is still a place for unit testing when developing analytics.

考虑一个简单的操作:

rdd.map(foo).map(bar)

这里的foobar 是简单的函数.这些可以以正常方式进行单元测试,并且应该尽可能多地使用极端情况.毕竟,他们为什么要关心从哪里获取输入是来自测试装置还是 RDD?

Here foo and bar are simple functions. Those can be unit tested in the normal way, and they should be with as many corner cases as you can muster. After all, why do they care where they are getting their inputs from whether it is a test fixture or an RDD?

不要忘记 Spark Shell

这不是测试本身,但在这些早期阶段,您还应该在 Spark shell 中进行试验,以确定您的转换,尤其是您的方法的后果.例如,您可以使用toDebugStringexplainglom 等许多不同的函数检查物理和逻辑查询计划、分区策略和保存情况以及数据状态showprintSchema 等等.我会让你探索这些.

This isn't testing per se, but in these early stages you also should be experimenting in the Spark shell to figure out your transformations and especially the consequences of your approach. For example, you can examine physical and logical query plans, partitioning strategy and preservation, and the state of your data with many different functions like toDebugString, explain, glom, show, printSchema, and so on. I will let you explore those.

您还可以在 Spark shell 和您的测试中将您的 master 设置为 local[2],以识别仅在您开始分发工作后可能出现的任何问题.

You can also set your master to local[2] in the Spark shell and in your tests to identify any problems that may only arise once you start to distribute work.

使用 Spark 进行集成测试

现在是有趣的东西.

为了集成测试 Spark 在你对你的辅助函数和 RDD/DataFrame 转换逻辑的质量有信心后,它是做一些事情很关键(无论构建工具和测试框架如何):

In order to integration test Spark after you feel confident in the quality of your helper functions and RDD/DataFrame transformation logic, it is critical to do a few things (regardless of build tool and test framework):

  • 增加 JVM 内存.
  • 启用分叉但禁用并行执行.
  • 使用您的测试框架将您的 Spark 集成测试累积到套件中,并在所有测试之前初始化 SparkContext 并在所有测试之后停止它.
  • Increase JVM memory.
  • Enable forking but disable parallel execution.
  • Use your test framework to accumulate your Spark integration tests into suites, and initialize the SparkContext before all tests and stop it after all tests.

使用 ScalaTest,您可以混合使用 BeforeAndAfterAll(我通常更喜欢)或 BeforeAndAfterEach,就像 @ShankarKoirala 那样初始化和拆除 Spark 工件.我知道这是一个合理的例外,但我真的不喜欢那些你必须使用的可变 var.

With ScalaTest, you can mix in BeforeAndAfterAll (which I prefer generally) or BeforeAndAfterEachas @ShankarKoirala does to initialize and tear down Spark artifacts. I know this is a reasonable place to make an exception, but I really don't like those mutable vars you have to use though.

贷款模式

另一种方法是使用贷款模式.

例如(使用 ScalaTest):

For example (using ScalaTest):

class MySpec extends WordSpec with Matchers with SparkContextSetup {
  "My analytics" should {
    "calculate the right thing" in withSparkContext { (sparkContext) =>
      val data = Seq(...)
      val rdd = sparkContext.parallelize(data)
      val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

      total shouldBe 1000
    }
  }
}

trait SparkContextSetup {
  def withSparkContext(testMethod: (SparkContext) => Any) {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark test")
    val sparkContext = new SparkContext(conf)
    try {
      testMethod(sparkContext)
    }
    finally sparkContext.stop()
  }
}

如您所见,贷款模式利用高阶函数将 SparkContext 借给测试,然后在测试完成后将其处理掉.

As you can see, the Loan Pattern makes use of higher-order functions to "loan" the SparkContext to the test and then to dispose of it after it's done.

面向苦难的编程(感谢 Nathan)

这完全是一个偏好问题,但我更喜欢使用贷款模式,并在引入另一个框架之前尽可能地自己连接东西.除了试图保持轻量级之外,框架有时还会添加许多魔法",使调试测试失败难以推理.所以我采用了Suffering-Oriented Programming 方法——我避免添加新的框架,直到没有它的痛苦难以承受.但同样,这取决于您.

It is totally a matter of preference, but I prefer to use the Loan Pattern and wire things up myself as long as I can before bringing in another framework. Aside from just trying to stay lightweight, frameworks sometimes add a lot of "magic" that makes debugging test failures hard to reason about. So I take a Suffering-Oriented Programming approach--where I avoid adding a new framework until the pain of not having it is too much to bear. But again, this is up to you.

该替代框架的最佳选择当然是 spark-testing-base 作为@ShankarKoirala 提到.在这种情况下,上面的测试将如下所示:

The best choice for that alternate framework is of course spark-testing-base as @ShankarKoirala mentioned. In that case, the test above would look like this:

class MySpec extends WordSpec with Matchers with SharedSparkContext {
      "My analytics" should {
        "calculate the right thing" in {
          val data = Seq(...)
          val rdd = sc.parallelize(data)
          val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

          total shouldBe 1000
        }
      }
 }

请注意,我无需为处理 SparkContext 做任何事情.SharedSparkContext 给了我所有这些——用 sc 作为 SparkContext——免费的.就我个人而言,我不会仅仅为了这个目的引入这种依赖关系,因为贷款模式正是我所需要的.此外,由于分布式系统会发生如此多的不可预测性,因此当持续集成中出现问题时,不得不追踪第三方库源代码中发生的神奇事情可能会非常痛苦.

Note how I didn't have to do anything to deal with the SparkContext. SharedSparkContext gave me all that--with sc as the SparkContext--for free. Personally though I wouldn't bring in this dependency for just this purpose since the Loan Pattern does exactly what I need for that. Also, with so much unpredictability that happens with distributed systems, it can be a real pain to have to trace through the magic that happens in the source code of a third-party library when things go wrong in continuous integration.

现在 spark-testing-base 真正闪耀的是基于 Hadoop 的帮助程序,例如 HDFSClusterLikeYARNClusterLike.将这些特征混合在一起可以真正为您节省很多设置的痛苦.它的另一个亮点是 Scalacheck 之类的属性和生成器——当然,假设您了解属性如何基于测试的工作及其有用的原因.但同样,在我的分析和测试达到这种复杂程度之前,我个人会推迟使用它.

Now where spark-testing-base really shines is with the Hadoop-based helpers like HDFSClusterLike and YARNClusterLike. Mixing those traits in can really save you a lot of setup pain. Another place where it shines is with the Scalacheck-like properties and generators--assuming of course you understand how property-based testing works and why it is useful. But again, I would personally hold off on using it until my analytics and my tests reach that level of sophistication.

只有西斯才能处理绝对值."-- 欧比旺·克诺比

当然,您也不必选择其中之一.也许您可以将贷款模式方法用于大多数测试,而 spark-testing-base 仅用于少数更严格的测试.选择不是二元的;两者都可以.

Of course, you don't have to choose one or the other either. Perhaps you could use the Loan Pattern approach for most of your tests and spark-testing-base only for a few, more rigorous tests. The choice isn't binary; you can do both.

使用 Spark Streaming 进行集成测试

最后,我想展示一个带有内存值的 SparkStreaming 集成测试设置的片段,如果没有 spark-testing-base:

Finally, I would just like to present a snippet of what a SparkStreaming integration test setup with in-memory values might look like without spark-testing-base:

val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream = streamingContext.queueStream(strings)
strings += rdd

这比看起来简单.它实际上只是将数据序列转换成一个队列,以提供给 DStream.其中大部分实际上只是与 Spark API 一起使用的样板设置.无论如何,您可以将其与 StreamingSuiteBase 进行比较 spark-testing-base 来决定你喜欢哪个.

This is simpler than it looks. It really just turns a sequence of data into a queue to feed to the DStream. Most of it is really just boilerplate setup that works with the Spark APIs. Regardless, you can compare this with StreamingSuiteBase as found in spark-testing-base to decide which you prefer.

这可能是我有史以来最长的帖子,所以我会把它留在这里.我希望其他人能提出其他想法,通过改进所有其他应用程序开发的相同敏捷软件工程实践来帮助提高我们的分析质量.

This might be my longest post ever, so I will leave it here. I hope others chime in with other ideas to help improve the quality of our analytics with the same agile software engineering practices that have improved all other application development.

为无耻的插件道歉,你可以看看我们的课程使用 Apache Spark 进行分析,我们在其中讨论了很多这些想法以及更多内容.我们希望尽快有一个在线版本.

And with apologies for the shameless plug, you can check out our course Analytics with Apache Spark, where we address a lot of these ideas and more. We hope to have an online version soon.

这篇关于如何在 Spark 2.0+ 中编写单元测试?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-29 12:57
查看更多