Question
In my Spark Streaming app I have many I/O operations, for example against Codis, HBase, and other external systems. I want to make sure there is exactly one connection pool in each executor — how can I do this elegantly? Right now I have implemented several static classes scattered around, which is hard to manage. Should I centralize them into one class, something like xxContext (similar in spirit to SparkContext), and do I need to broadcast it? I know broadcasting works well for large read-only datasets, but what about these connection pools? Java or Scala answers are both acceptable.
Answer
foreachPartition is the best fit. Sample code snippet:
val dstream = ...
dstream.foreachRDD { rdd =>
  // Loop through each partition in the RDD
  rdd.foreachPartition { partitionOfRecords =>
    // 1. Create (or obtain) the connection object/pool for Codis, HBase, etc.

    // 2. Use this loop if you need record-level control within the partition
    partitionOfRecords.foreach { record =>
      // Write each record to the external system
    }

    // 3. Alternatively, batch-insert the whole partition if the connector supports it
  }
  // Use approach 2 or 3 depending on your requirements
}
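To get exactly one pool per executor, you do not need to broadcast anything: a connection pool is generally not serializable, so the usual approach is to hold it in a lazily initialized singleton that each executor JVM creates on first use. A minimal sketch of that pattern — the `ConnectionPools` name and the queue stand-in are illustrative assumptions, not a real client API:

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical per-JVM pool holder (illustrative; substitute your real client).
// A Scala `object` is initialized lazily and at most once per JVM, so every
// task running in the same executor sees the same pool instance.
object ConnectionPools {
  // Stand-in for a real pool such as a JedisPool or an HBase Connection.
  lazy val pool: ConcurrentLinkedQueue[String] = {
    // This block executes only on first access in each executor JVM.
    new ConcurrentLinkedQueue[String]()
  }
}
```

Inside `rdd.foreachPartition { ... }` you would simply reference `ConnectionPools.pool`; the object is instantiated on the executor the first time a task touches it, and all subsequent tasks in that JVM reuse the same instance.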
See also: "Design Patterns for using foreachRDD" in the Spark Streaming programming guide.