Considering a case where I am streaming through an RDD with foreachPartitions in order to perform some sums into an accumulator variable, I would like to know whether it will perform better than the foreach method because of a higher level of parallelism.
Best Answer
foreach and foreachPartitions are actions.
foreach(function): Unit
Note: modifying variables other than accumulators outside of foreach() may result in undefined behavior. See Understanding closures for more details.
Example:
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
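To make the closure note above concrete, here is a minimal sketch (mirroring the counter example from the Spark documentation's discussion of closures; the RDD contents are illustrative): mutating a plain driver-side variable inside foreach only updates each executor's copy of the closure, so the driver may still see the old value in cluster mode, whereas an accumulator is merged back correctly.

var counter = 0
val rdd = sc.parallelize(Seq(1, 2, 3, 4))

// Wrong: each executor mutates its own copy of counter; the driver-side value is undefined in cluster mode
rdd.foreach(x => counter += x)
println("Counter value: " + counter)

// Supported: an accumulator is aggregated back to the driver after the action completes
val sum = sc.longAccumulator("sum")
rdd.foreach(x => sum.add(x))
println("Accumulator value: " + sum.value)  // 10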
foreachPartition(function): Unit
Example of foreachPartition usage:

/**
 * Insert into a database using foreachPartition.
 *
 * @param sqlDatabaseConnectionString
 * @param sqlTableName
 */
def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = {
  // numPartitions = number of simultaneous DB connections you plan to allow:
  // dataFrame.repartition(numofpartitionsyouwant)
  val tableHeader: String = dataFrame.columns.mkString(",")
  dataFrame.foreachPartition { partition =>
    // Note: one connection per partition (a better way is to use a connection pool)
    val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString)
    // A batch size of 1000 is used since some databases, e.g. Azure SQL, cannot use a batch size larger than 1000
    partition.grouped(1000).foreach { group =>
      val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder()
      group.foreach { record =>
        insertString.append("('" + record.mkString(",") + "'),")
      }
      sqlExecutorConnection.createStatement()
        .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES " + insertString.stripSuffix(","))
    }
    sqlExecutorConnection.close() // close the connection so that connections won't be exhausted
  }
}
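A hypothetical call site for the method above (the JDBC connection string and table name are placeholders, and dataFrame is assumed to be a DataFrame already in scope, as in the method body):

// all values below are placeholders
val connectionString = "jdbc:sqlserver://<server>:1433;database=<db>;user=<user>;password=<password>"
insertToTable(connectionString, "my_table")  // one DB connection is opened per partition of dataFrame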
- Example 2: usage of foreachPartition with Spark Streaming (DStreams) and a Kafka producer
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // Only once per partition: you can safely share a thread-safe Kafka producer instance
    val producer = createKafkaProducer()
    partitionOfRecords.foreach { message =>
      producer.send(message)
    }
    producer.close()
  }
}
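createKafkaProducer() is not defined in the snippet; a minimal sketch of what it could look like with the plain kafka-clients API is below. The broker address and String serializers are assumptions, and producer.send() then expects a ProducerRecord[String, String].

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

def createKafkaProducer(): KafkaProducer[String, String] = {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")  // assumed broker address
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  new KafkaProducer[String, String](props)
}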
Both give the same accumulator result for a simple sum, as the following test cases show:

test("Foreach-Spark") {
  import spark.implicits._
  var accum = sc.longAccumulator
  sc.parallelize(Seq(1, 2, 3)).foreach(x => accum.add(x))
  assert(accum.value == 6L)
}

test("ForeachPartition-Spark") {
  import spark.implicits._
  var accum = sc.longAccumulator
  sc.parallelize(Seq(1, 2, 3)).foreachPartition(x => x.foreach(accum.add(_)))
  assert(accum.value == 6L)
}
Conclusion:

Both foreach and foreachPartition run one task per partition, so for a cheap per-element operation such as adding to an accumulator the degree of parallelism, and hence the performance, is essentially the same. foreachPartition wins when there is an expensive per-partition setup cost (a database connection, a Kafka producer, and so on), because that setup is paid once per partition instead of once per element.

Rule of thumb: use foreachPartition when you need to initialize a costly resource and amortize it over all the records in a partition; use foreach for simple per-element side effects.
Also... see map vs mapPartitions, which follow a similar concept, but they are transformations.
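For reference, a minimal sketch of that distinction (the RDD contents and the "setup" string are illustrative): map applies the function once per element, while mapPartitions receives one iterator per partition, so per-partition setup is paid only once; both are lazy transformations until an action runs.

val rdd = sc.parallelize(1 to 10, 2)

// map: the function runs once per element
val doubled = rdd.map(_ * 2)

// mapPartitions: the function runs once per partition and gets an iterator,
// so expensive setup (stood in for by this string) happens once per partition
val tagged = rdd.mapPartitions { iter =>
  val setup = "expensive-setup-done-once-per-partition"
  iter.map(x => (setup, x * 2))
}

// Both are transformations (lazy); nothing executes until an action such as collect()
doubled.collect()
tagged.collect()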