This article covers how to fix a "Task not serializable" error when writing records from a Spark DataFrame to DynamoDB through the AWS Java SDK.

Problem description

Here is the code snippet:

val client = AmazonDynamoDBClientBuilder.standard.withRegion(Regions.the_region).withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("access_key", "secret_key"))).build()
val dynamoDB = new DynamoDB(client)
val table = dynamoDB.getTable("tbl_name")

def putItem(email: String, name: String): Unit = {
    val item = new Item().withPrimaryKey("email", email).withNumber("ts", System.currentTimeMillis).withString("name", name)
    table.putItem(item)
}

spark.sql("""
select
    email,
    name
from db.hive_table_name
""").rdd.repartition(40).map(row => putItem(row.getString(0), row.getString(1))).collect()

I intend to write every record to the DynamoDB table via the Java SDK provided by AWS, but it fails with the following error:

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.map(RDD.scala:369)
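The trace shows that the failure comes from ClosureCleaner.ensureSerializable: before running map, Spark Java-serializes the function so it can be sent to the executors, and any non-serializable object the function captures makes that step fail. The sketch below reproduces the check outside Spark, assuming Scala 2.12 or later. FakeTable is a hypothetical stand-in for the SDK's Table (which, like the real one, does not implement java.io.Serializable), and isSerializable only approximates Spark's check with a plain java.io.ObjectOutputStream:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for an SDK object such as a DynamoDB Table:
// it deliberately does NOT extend java.io.Serializable.
class FakeTable(val name: String) {
  val written = scala.collection.mutable.ArrayBuffer.empty[String]
  def putItem(item: String): Unit = written += item
}

object ClosureSerializationDemo {
  // Approximates Spark's check: try to Java-serialize the function value.
  def isSerializable(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Captures a driver-side FakeTable, like the map(...) in the question.
  def capturingClosure(table: FakeTable): Iterator[String] => Unit =
    iter => iter.foreach(item => table.putItem(item))

  // Captures only a String and builds the table inside the function,
  // like the foreachPartition approach does with the DynamoDB client.
  def perPartitionClosure(tableName: String): Iterator[String] => Int =
    iter => {
      val table = new FakeTable(tableName)
      iter.foreach(item => table.putItem(item))
      table.written.size
    }

  def main(args: Array[String]): Unit = {
    val driverTable = new FakeTable("tbl_name")
    println(isSerializable(capturingClosure(driverTable)))   // false
    println(isSerializable(perPartitionClosure("tbl_name"))) // true
  }
}
```

The only difference between the two functions is what they capture: a live client object versus the plain String needed to build one.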

How can I adjust the code so that the DynamoDB and Table objects are created once per partition, taking advantage of the Spark job's parallelism? Thanks!

Recommended answer

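Instead of map plus collect, I would use foreachPartition (query below is the SQL string from the question). The original version fails because Spark has to serialize the function passed to map in order to ship it to the executors, and that function calls putItem, which uses the driver-side table; the AWS SDK's DynamoDB client and Table objects are not serializable. Creating them inside foreachPartition builds them on the executor, once per partition, so nothing non-serializable has to leave the driver: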

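import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.regions.Regions
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}

spark.sql(query).rdd.repartition(40).foreachPartition(iter => {
  // Built on the executor, once per partition, so no non-serializable object is shipped from the driver
  val client = AmazonDynamoDBClientBuilder.standard.withRegion(Regions.the_region)
    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("access_key", "secret_key"))).build()
  val dynamoDB = new DynamoDB(client)
  val table = dynamoDB.getTable("tbl_name")
  // Write through the partition-local table; calling the driver-side putItem would capture the outer table again
  iter.foreach { row =>
    table.putItem(new Item().withPrimaryKey("email", row.getString(0))
      .withNumber("ts", System.currentTimeMillis).withString("name", row.getString(1)))
  }
  client.shutdown()
})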
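To illustrate the cost side of this pattern, here is a local sketch with no Spark or AWS dependency. FakeClientFactory, FakeClient and the foreachPartition helper are hypothetical stand-ins, not Spark or SDK APIs; the helper only imitates RDD.foreachPartition by calling the function once per partition iterator, and the factory counts how many clients get built:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the SDK client/Table pair.
class FakeClient(val tableName: String) {
  val items = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
  def putItem(email: String, name: String): Unit = items += ((email, name))
  def shutdown(): Unit = () // no-op in this sketch
}

// Hypothetical factory that counts how many clients are created.
object FakeClientFactory {
  val created = new AtomicInteger(0)
  def newTable(name: String): FakeClient = {
    created.incrementAndGet()
    new FakeClient(name)
  }
}

object PerPartitionDemo {
  // Local imitation of rdd.foreachPartition: run f once per partition's iterator.
  def foreachPartition[T](partitions: Seq[Seq[T]])(f: Iterator[T] => Unit): Unit =
    partitions.foreach(p => f(p.iterator))

  def main(args: Array[String]): Unit = {
    // 3 hypothetical partitions holding 5 (email, name) rows in total
    val partitions = Seq(
      Seq("a@x.com" -> "A", "b@x.com" -> "B"),
      Seq("c@x.com" -> "C"),
      Seq("d@x.com" -> "D", "e@x.com" -> "E"))
    foreachPartition(partitions) { iter =>
      val table = FakeClientFactory.newTable("tbl_name") // one client per partition
      iter.foreach { case (email, name) => table.putItem(email, name) }
      table.shutdown()
    }
    println(FakeClientFactory.created.get) // 3: one per partition, not one per row
  }
}
```

Applied to the answer above, the same reasoning gives one client per partition task (40 with repartition(40)) regardless of how many rows are written, while the executors process those partitions in parallel.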


10-22 08:36