Memory issues with Spark Structured Streaming

Problem description

I'm facing memory issues when running a Structured Streaming query with aggregation and partitioning in Spark 2.2.0:

session
    .readStream()
    .schema(inputSchema)
    // Tab-delimited input with quoting turned off
    .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
    .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
    .csv("s3://test-bucket/input")
    .as(Encoders.bean(TestRecord.class))
    .flatMap(mf, Encoders.bean(TestRecord.class))
    // Deduplication keeps every (testId, testName) key seen so far in the state store
    .dropDuplicates("testId", "testName")
    // "yyyy" is the calendar year; "YYYY" is the week-based year and can be off around New Year
    .withColumn("year", functions.date_format(functions.col("testTimestamp").cast(DataTypes.DateType), "yyyy"))
    .writeStream()
    .option("path", "s3://test-bucket/output")
    .option("checkpointLocation", "s3://test-bucket/checkpoint")
    .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
    .partitionBy("year")
    .format("parquet")
    .outputMode(OutputMode.Append())
    .queryName("test-stream")
    .start();

During testing I noticed that the amount of used memory increases each time new data arrives, and eventually the executors exit with code 137:

ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1520214726510_0001_01_000003 on host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal

I created a heap dump and found that most of the memory is used by org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider, which is referenced from StateStore.

At first glance this looks normal, since that is how Spark keeps aggregation keys in memory. However, I ran my test by renaming files in the source folder so that Spark would pick them up again. Since the input records are the same, all further rows should be rejected as duplicates and memory consumption should not increase, but it does.

Moreover, GC time took more than 30% of the total processing time.

Here is a heap dump taken from an executor running with less memory than in the screenshots above, because when I tried to create a dump from that executor, the Java process terminated partway through.

Solution

Migrating my comment from SPARK-23682, the issue that the asker of this question also filed.

The HDFS-backed state store provider caches multiple versions of the state in memory, 100 versions by default, which is excessive. This is addressed by SPARK-24717, after which only two versions of the state are kept in memory (the current one for replay and the new one for update). The patch will be available in Spark 2.4.0.
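
To make the effect of the retained versions concrete, here is a minimal toy model in plain Java. It is not Spark's implementation: the class `VersionedStateCacheSketch` and its methods are hypothetical names, and the model assumes that every retained version holds its own full map of keys. It replays the same keys in every batch (as in the renamed-files test from the question) and counts how many map entries stay in memory.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only (hypothetical names, not Spark code): a state cache that
// keeps the last N committed versions, each as its own full map of keys.
class VersionedStateCacheSketch {
    private final int maxVersionsInMemory;
    private final Deque<Map<String, Long>> versions = new ArrayDeque<>();

    VersionedStateCacheSketch(int maxVersionsInMemory) {
        this.maxVersionsInMemory = maxVersionsInMemory;
    }

    // Commit one micro-batch: copy the latest version, apply the batch keys,
    // then evict the oldest versions beyond the retention limit.
    void commitBatch(Iterable<String> keys) {
        Map<String, Long> next = versions.isEmpty()
                ? new HashMap<>()
                : new HashMap<>(versions.peekLast());
        for (String key : keys) {
            next.merge(key, 1L, Long::sum); // count how often each key was seen
        }
        versions.addLast(next);
        while (versions.size() > maxVersionsInMemory) {
            versions.removeFirst();
        }
    }

    // Number of distinct keys in the latest version (the "logical" state size).
    int distinctKeys() {
        return versions.isEmpty() ? 0 : versions.peekLast().size();
    }

    // Total map entries held across all retained versions.
    long retainedEntries() {
        long total = 0;
        for (Map<String, Long> version : versions) {
            total += version.size();
        }
        return total;
    }

    // Feed the same keys in every batch and report the retained entry count.
    static long simulate(int maxVersions, int batches, int keysPerBatch) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < keysPerBatch; i++) {
            keys.add("key-" + i);
        }
        VersionedStateCacheSketch cache = new VersionedStateCacheSketch(maxVersions);
        for (int b = 0; b < batches; b++) {
            cache.commitBatch(keys);
        }
        return cache.retainedEntries();
    }

    public static void main(String[] args) {
        System.out.println(simulate(100, 150, 1000)); // prints 100000
        System.out.println(simulate(2, 150, 1000));   // prints 2000
    }
}
```

In this model the number of distinct keys never grows past 1000, yet retaining 100 versions holds fifty times as many entries as retaining two. As far as I know, Spark 2.4.0 and later expose the in-memory limit as the `spark.sql.streaming.maxBatchesToRetainInMemory` setting; check the configuration documentation for your version before relying on that name.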

