我必须在Azure的Hdinsight群集上运行一个简单的单词计数。我已经使用hadoop和spark创建了一个集群,并且我已经有了包含代码的jar文件,但我不知道如何设置集群以及在Azure上启动Spark的正确代码行的问题,我想尝试不同的节点组合(workers,2-4-8)来查看程序的扩展方式。
每次我在yarn-client模式下以spark-submit方式启动应用程序时,它都能正常工作,但始终使用2个执行器和1个核心在3分钟左右的时间内输入1gb输入文本文件,如果我设置了更多的执行器和更多的核心,他也会进行设置,他不使用它,所以我认为问题出在RDD上,它没有以正确的模式拆分输入文件,因为它仅创建了从2个工作节点开始的2个任务,而其他节点保持不 Activity 状态。
用sbt包创建的jar文件。
启动Spark的命令:
spark-submit --class "SimpleApp" --master yarn-client --num-executors 2 simpleapp_2.10-1.0.jar
字数代码:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import java.io._
import org.apache.hadoop.fs
import org.apache.spark.rdd.RDD
object SimpleApp {
def main(args: Array[String]){
//settingsparkcontext
val conf = new SparkConf().setAppName("SimpleApp")
val sc = new SparkContext(conf)
//settingthewordtosearch
val word = "word"
//settingtime
val now = System.nanoTime
//settingtheinputfile
val input = sc.textFile("wasb://xxx@storage.blob.core.windows.net/dizionario1gb.txt")
//wordlookup
val splittedLines = input.map(line=>line.split(""))
val find = System.nanoTime
val tot = splittedLines.map(x => x.equals(word)).count()
val w=(System.nanoTime-find)/1000000
val rw=(System.nanoTime-now)/1000000
//reportingtheresultofexecutioninatxtfile
val writer = new FileWriter("D:\\Users\\user\\Desktop\\File\\output.txt",true)
try {
writer.write("Word found "+tot+" time total "+rw+" mstimesearch "+w+" time read "+(rw-w)+"\n")
}
finally writer.close()
//terminatingthesparkserver
sc.stop()
}}
最佳答案
并行度
“除非为每个操作设置足够高的并行度,否则群集将不会得到充分利用。Spark会根据文件的大小自动设置要在每个文件上运行的“映射”任务的数量(尽管您可以通过SparkContext的可选参数进行控制) .textFile等),您可以将并行级别作为第二个参数传递(请参见spark.PairRDDFunctions文档),或设置config属性spark.default.parallelism来更改默认值。通常,我们建议每个2-3个任务集群中的CPU核心。”
来源:
https://spark.apache.org/docs/1.3.1/tuning.html