Consider the case where I am looping through foreachPartition in order to perform some summation on an accumulator variable. Compared with the foreach method, will the RDD give better performance because of the higher degree of parallelism?

Best Answer

foreach and foreachPartition are both actions.

foreach(function): Unit

Runs a function on each element of the dataset. This is usually done for side effects, such as updating an accumulator or interacting with external storage systems.

Note: modifying variables other than accumulators outside of foreach() may result in undefined behavior. See Understanding closures for more details.
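
As a quick illustration of that note, here is a minimal sketch of the pitfall (assuming a running spark-shell with its sc): a plain local variable is captured into each task's closure, so executor-side updates never reach the driver.

var counter = 0
// Each task mutates its own deserialized copy of `counter`;
// on a cluster the driver-side value is not updated reliably.
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => counter += x)
println(counter) // not guaranteed to be 10; use a LongAccumulator instead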

Example:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10
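
To answer the question directly: the same sum can also be written with foreachPartition (described next). A sketch in the same spark-shell style; the printed output and res numbering are illustrative:

scala> val accum2 = sc.longAccumulator("My Accumulator 2")
accum2: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 1, name: Some(My Accumulator 2), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreachPartition(iter => iter.foreach(x => accum2.add(x)))

scala> accum2.value
res3: Long = 10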

foreachPartition(function):单位



foreachPartition示例的用法:

  • Example 1: use one database connection per partition (set up inside each partition block); a sample usage in Scala:

    import java.sql.{Connection, DriverManager}

    /**
      * Insert into a database using foreachPartition.
      *
      * @param sqlDatabaseConnectionString JDBC connection string
      * @param sqlTableName                target table name
      */
    def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = {

      // numPartitions = number of simultaneous DB connections you plan to allow;
      // dataFrame and numPartitions are assumed to be in scope.
      val repartitionedDf = dataFrame.repartition(numPartitions)

      val tableHeader: String = repartitionedDf.columns.mkString(",")
      repartitionedDf.foreachPartition { partition =>
        // Note: one connection per partition (a better approach is to use a connection pool)
        val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString)
        // A batch size of 1000 is used since some databases (for example Azure SQL)
        // cannot handle batches of more than 1000 rows.
        partition.grouped(1000).foreach { group =>
          val insertString = new scala.collection.mutable.StringBuilder()
          group.foreach { record =>
            insertString.append("('" + record.mkString(",") + "'),")
          }

          sqlExecutorConnection.createStatement()
            .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
              + insertString.stripSuffix(","))
        }

        sqlExecutorConnection.close() // close the connection so that connections won't be exhausted
      }
    }
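
    A hypothetical call site for the method above (the connection string and table name are illustrative placeholders):

    // Hypothetical values; adjust to your environment.
    val connectionString =
      "jdbc:sqlserver://myserver.example.com:1433;database=mydb;user=me;password=secret"
    insertToTable(connectionString, "dbo.MyTable")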
    
  • Example 2: usage of foreachPartition with Spark Streaming (DStreams) and a Kafka producer:

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // Create the producer only once per partition; a thread-safe Kafka
        // producer instance can safely be shared.
        val producer = createKafkaProducer()
        partitionOfRecords.foreach { message =>
          producer.send(message)
        }
        producer.close()
      }
    }
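
    The snippet assumes a createKafkaProducer() helper. A minimal sketch using the standard Kafka client (the broker address is a placeholder, and the stream is assumed to carry ProducerRecord[String, String] elements):

    import java.util.Properties
    import org.apache.kafka.clients.producer.KafkaProducer

    // Hypothetical helper; configure for your cluster.
    def createKafkaProducer(): KafkaProducer[String, String] = {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092") // placeholder broker
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      new KafkaProducer[String, String](props)
    }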

    Example 3: using an accumulator with foreach and foreachPartition (Scala test cases):

    test("Foreach-Spark") {
      import spark.implicits._
      val accum = sc.longAccumulator
      sc.parallelize(Seq(1, 2, 3)).foreach(x => accum.add(x))
      assert(accum.value == 6L)
    }

    test("Foreach partition-Spark") {
      import spark.implicits._
      val accum = sc.longAccumulator
      sc.parallelize(Seq(1, 2, 3)).foreachPartition(x => x.foreach(accum.add(_)))
      assert(accum.value == 6L)
    }

    Conclusion:

    foreachPartition operates at the level of whole partitions, so it has a clear edge over foreach whenever there is per-partition setup cost to amortize.

    Rule of thumb:

    Use foreachPartition when you access costly resources such as database connections or Kafka producers, initializing one per partition rather than one per element as foreach would. When it comes to accumulators, you can measure the performance with the test methods above; foreachPartition should work faster in the accumulator case as well.
    Also... see map vs mapPartitions, which follow a similar concept, but those are transformations.
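
    For reference, a minimal sketch contrasting the two transformations (assuming an existing SparkContext sc):

    val rdd = sc.parallelize(1 to 8, numSlices = 2)

    // map: the function is invoked once per element.
    val doubled = rdd.map(_ * 2)

    // mapPartitions: the function is invoked once per partition and receives an
    // Iterator, so any expensive setup can happen once per partition.
    val doubledPerPartition = rdd.mapPartitions { iter =>
      // (hypothetical) expensive per-partition setup would go here
      iter.map(_ * 2)
    }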
