Problem Description
I'm still fairly new to Spark, but I have been able to create the Spark app I need to reprocess data from our SQL Server using JDBC drivers (we are removing expensive stored procedures). The app loads a few tables from SQL Server via JDBC into dataframes, then I do a few joins, a group, and a filter, and finally reinsert the results into a different table via JDBC. All of this executes just fine on Spark EMR in Amazon Web Services on an m3.xlarge with 2 cores, in around a minute.
My question is the following: 1. right now I have 1 master and 2 core nodes on the cluster, but every time I launch a new step, it seems from what I can see in the history server that only 1 executor is being used: 2 executors are listed, the driver with no usage at all, and an executor with id 1 processing around 1410 tasks. And I'm completely unsure how to proceed.
Also, this is specific to AWS, but I didn't want to post 2 questions as they are somewhat related: is there any way I can run 2 steps at the same time? Meaning, can I have 2 spark-submits of this process running concurrently, as we run this process many times a day (it processes client data)? I know I can launch a new cluster with the step, but I want the processing to be fast, and just launching a new cluster takes too long. Thanks!
Recommended Answer
For your first question:
I am not sure if this is the case for you, but something similar happened to us and maybe it can help.
If you are reading from the JDBC source using sqlContext.read.format("jdbc").load() (or similar), the resulting dataframe is not partitioned by default. So, if that is your case, applying transformations to the resulting dataframe without partitioning it first will result in only one executor being able to process it. If that is not your case, the following solution will probably not solve your problem.
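As a quick check, here is a minimal sketch of what such an unpartitioned read looks like and how to confirm the problem; the JDBC URL, table, and credentials are placeholders, not the original poster's setup:

// Hypothetical connection details; substitute your own JDBC URL and table.
val url = "jdbc:sqlserver://myhost:1433;databaseName=mydb"

// Default JDBC read: no partitioning options, so Spark opens a single
// connection and produces a single-partition dataframe.
val df = sqlContext.read.format("jdbc")
  .option("url", url)
  .option("dbtable", "dbo.my_table")
  .option("user", "user")
  .option("password", "password")
  .load()

// If this prints 1, every downstream transformation runs on one executor
// until the data is repartitioned.
println(df.rdd.partitions.length)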
So, our solution was to create a numeric column with values from 1 to 32 (our desired number of partitions) in the data and use it as the partitioning column by setting the partitioning options of the JDBC reader (please check this link):
// <connection options> elided in the original answer: JDBC url, dbtable, user, password, etc.
val connectionOptions = Map[String, String] (... <connection options> ...)

// Split the read into 32 partitions over the value range of the numeric column.
val options = connectionOptions ++ Map[String, String] (
  "partitionColumn" -> "column name",
  "lowerBound"      -> "1",
  "upperBound"      -> "32",
  "numPartitions"   -> "32"
)

val df = sqlContext.read.format("jdbc").options(options).load()
So, with this approach, not only could the reading task be processed in parallel (which really improved performance and avoided OOM errors), but the resulting dataframe was partitioned, and all subsequent transformations were processed in parallel as well.
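For completeness, here is one sketch of how such a 1-to-32 column might be produced when the source table does not already have a suitable numeric key: push the bucketing into a dbtable subquery so SQL Server computes it at read time. The table name, key column, and bucket expression below are assumptions for illustration only.

// Hypothetical: derive a bucket column (1..32) on the SQL Server side so it
// can serve as the JDBC partitioning column. ABS(CHECKSUM(...)) is a T-SQL
// construct; adjust the expression to your own key and schema.
val bucketedTable =
  """(SELECT t.*,
    |        (ABS(CHECKSUM(t.id)) % 32) + 1 AS part_bucket
    |   FROM dbo.my_table t) AS src""".stripMargin

val bucketedOptions = connectionOptions ++ Map[String, String] (
  "dbtable"         -> bucketedTable,
  "partitionColumn" -> "part_bucket",
  "lowerBound"      -> "1",
  "upperBound"      -> "32",
  "numPartitions"   -> "32"
)

val bucketedDf = sqlContext.read.format("jdbc").options(bucketedOptions).load()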
Hope it helps.