This article describes how to fix the "Task not serializable" error that occurs when trying to write records from a Spark DataFrame to DynamoDB via the AWS Java SDK. It should be a useful reference for anyone hitting the same problem; readers who need it can follow along below.
Problem description
Here is the code snippet:
// The client, DynamoDB and Table handles are created once on the driver
val client = AmazonDynamoDBClientBuilder.standard.withRegion(Regions.the_region)
  .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("access_key", "secret_key")))
  .build()
val dynamoDB = new DynamoDB(client)
val table = dynamoDB.getTable("tbl_name")

// Writes one row as a DynamoDB item; closes over the driver-side table
def putItem(email: String, name: String): Unit = {
  val item = new Item().withPrimaryKey("email", email).withNumber("ts", System.currentTimeMillis).withString("name", name)
  table.putItem(item)
}
spark.sql("""
select
email,
name
from db.hive_table_name
""").rdd.repartition(40).map(row => putItem(row.getString(0), row.getString(1))).collect()
I intend to write every record to the DynamoDB table via the Java SDK provided by AWS, but it fails with the error below:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.map(RDD.scala:369)
How could I adjust the code so that the DynamoDB and Table objects are created per partition, in order to take advantage of the parallelism of the Spark job? Thanks!
Recommended answer
Instead of map and collect, I would use foreachPartition:
spark.sql(query).rdd.repartition(40).foreachPartition(iter => {
  // Build the non-serializable client, DynamoDB and Table objects inside each partition
  val client = AmazonDynamoDBClientBuilder.standard.withRegion(Regions.the_region)
    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("access_key", "secret_key")))
    .build()
  val dynamoDB = new DynamoDB(client)
  val table = dynamoDB.getTable("tbl_name")
  // putItem must use this partition-local table rather than the one defined on the driver
  def putItem(email: String, name: String): Unit = {
    val item = new Item().withPrimaryKey("email", email).withNumber("ts", System.currentTimeMillis).withString("name", name)
    table.putItem(item)
  }
  iter.foreach(row => putItem(row.getString(0), row.getString(1)))
})
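If each partition holds many rows, the same per-partition pattern can also be combined with DynamoDB's BatchWriteItem API to cut down on round trips. The following is only a minimal sketch under the question's assumptions (the tbl_name table, the email/ts/name attributes, and the placeholder credentials and region); it is not part of the original answer. BatchWriteItem accepts at most 25 items per request, so the rows are grouped accordingly.
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.regions.Regions
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item, TableWriteItems}

// Sketch only: a batched variant of the answer above, reusing the question's placeholders
spark.sql(query).rdd.repartition(40).foreachPartition(iter => {
  // One client per partition, exactly as in the recommended answer
  val client = AmazonDynamoDBClientBuilder.standard.withRegion(Regions.the_region)
    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("access_key", "secret_key")))
    .build()
  val dynamoDB = new DynamoDB(client)

  // DynamoDB allows at most 25 items per BatchWriteItem request
  iter.grouped(25).foreach { rows =>
    val items = rows.map(row =>
      new Item().withPrimaryKey("email", row.getString(0))
        .withNumber("ts", System.currentTimeMillis)
        .withString("name", row.getString(1)))
    var outcome = dynamoDB.batchWriteItem(new TableWriteItems("tbl_name").withItemsToPut(items: _*))
    // Retry items that DynamoDB reported as unprocessed (e.g. due to throttling)
    while (!outcome.getUnprocessedItems.isEmpty) {
      outcome = dynamoDB.batchWriteItemUnprocessed(outcome.getUnprocessedItems)
    }
  }
})
Whether single putItem calls or batches are used, the key point is the same: the AWS client objects are never captured in a closure that Spark has to serialize.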
This concludes the article on the "Task not serializable" error when trying to write records from a Spark DataFrame to DynamoDB via the Java SDK. We hope the recommended answer is helpful, and thank you for your continued support!