只有满怀自信的人,能在任何地方都怀有自信,沉浸在生活中,并认识自己的意志。

前言

最近公司有一个生产的小集群,专门用于运行spark作业。但是偶尔会因为nn或dn压力过大而导致作业checkpoint操作失败进而导致spark 流任务失败。本篇记录从应用层面对spark作业进行优化,进而达到优化集群的作用。

集群使用情况

有数据的目录以及使用情况如下:

对于.sparkStaging目录,不经常变动,只需要优化其大小即可。

对于 checkpoint目录,频繁性增删,从生成周期和保留策略两方面去考虑。

 .sparkStaging目录优化

对于/user/hadoop/.sparkStaging下文件,是spark任务依赖文件,可以将jar包上传到指定目录下,避免或减少了jar包的重复上传,进而减少任务的等待时间。

可以在spark的配置文件spark-defaults.conf配置如下内容:

spark.yarn.archive=hdfs://hdfscluster/user/hadoop/jars
spark.yarn.preserve.staging.files=false

参数说明

checkpoint优化

首先了解一下 checkpoint文件代表的含义。

checkpoint文件说明

  • offsets 目录 - 预先记录日志,记录每个批次中存在的偏移量。为了确保给定的批次将始终包含相同的数据,我们在进行任何处理之前将其写入此日志。因此,该日志中的第N个记录指示当前正在处理的数据,第N-1个条目指示哪些偏移已持久地提交给sink。

  • commits 目录 - 记录已完成的批次ID的日志。这用于检查批处理是否已完全处理,并且其输出已提交给接收器,因此无需再次处理。(例如)在重新启动过程中使用,以帮助识别接下来要运行的批处理。

  • metadata 文件 - 与整个查询关联的元数据,只有一个 StreamingQuery 唯一ID

  • sources目录 - 保存起始offset信息

下面从两个方面来优化checkpoint。

第一,从触发checkpoint机制方面考虑

trigger的机制

Trigger是用于指示 StreamingQuery 多久生成一次结果的策略。

Trigger有三个实现类,分别为:

  • OneTimeTrigger - A Trigger that processes only one batch of data in a streaming query then terminates the query.

  • ProcessingTime - A trigger that runs a query periodically based on the processing time. If interval is 0, the query will run as fast as possible.by default,trigger is ProcessingTime, and interval=0

  • ContinuousTrigger - A Trigger that continuously processes streaming data, asynchronously checkpointing at the specified interval.

可以为 ProcessingTime 指定一个时间 或者使用 指定时间的ContinuousTrigger ,固定生成checkpoint的周期,避免checkpoint生成过于频繁,减轻多任务下小集群的nn的压力

第二,从checkpoint保留机制考虑。

保留机制

spark.sql.streaming.minBatchesToRetain - 必须保留并使其可恢复的最小批次数,默认为 100

可以调小保留的batch的次数,比如调小到 20,这样 checkpoint 小文件数量整体可以减少到原来的 20%

checkpoint 参数验证

主要验证trigger机制保留机制

验证trigger机制

未设置trigger效果

未设置trigger前,spark structured streaming 的查询batch提交的周期截图如下:

每一个batch的query任务的提交是毫无周期规律可寻。

设置trigger代码

trigger效果

设置trigger代码后效果截图如下:

每一个batch的query任务的提交是有规律可寻的,即每隔5s提交一次代码,即trigger设置生效

注意,如果消息不能马上被消费,消息会有积压,structured streaming 目前并无与spark streaming效果等同的背压机制,为防止单批次query查询的数据源数据量过大,避免程序出现数据倾斜或者无法挽回的OutOfMemory错误,可以通过 maxOffsetsPerTrigger 参数来设置单个批次允许抓取的最大消息条数。

使用案例如下:

spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "xxx:9092")
    .option("subscribe", "test-name")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 1)
    .option("group.id", "2")
    .option("auto.offset.reset", "earliest")
    .load()

验证保留机制

默认保留机制效果

spark任务提交参数

#!/bin/bash
spark-submit \
--class zd.Example \
--master yarn \
--deploy-mode client \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3,org.apache.kafka:kafka-clients:2.0.0 \
--repositories http://maven.aliyun.com/nexus/content/groups/public/ \
/root/spark-test-1.0-SNAPSHOT.jar

如下图,offsets和commits最终最少各保留100个文件。

修改保留策略

通过修改任务提交参数来进一步修改checkpoint的保留策略。

添加 --conf spark.sql.streaming.minBatchesToRetain=2 ,完整脚本如下:

#!/bin/bash
spark-submit \
--class zd.Example \
--master yarn \
--deploy-mode client \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3,org.apache.kafka:kafka-clients:2.0.0 \
--repositories http://maven.aliyun.com/nexus/content/groups/public/ \
--conf spark.sql.streaming.minBatchesToRetain=2 \
/root/spark-test-1.0-SNAPSHOT.jar

修改后保留策略效果

修改后保留策略截图如下:

即 checkpoint的保留策略参数设置生效

总结

综上,可以通过设置 trigger 来控制每一个batch的query提交的时间间隔,可以通过设置checkpoint文件最少保留batch的大小来减少checkpoint小文件的保留个数。

参照

  1. https://github.com/apache/spark/blob/master/docs/running-on-yarn.md
  2. https://blog.csdn.net/lm709409753/article/details/85250859
  3. https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
  4. https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
  5. https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
  6. https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
  7. https://github.com/apache/spark/blob/v2.4.3/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
04-25 05:16