Problem Description
I have a Spark job in Structured Streaming that consumes data from Kafka and saves it to InfluxDB. I have implemented the connection pooling mechanism as follows:
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

import org.influxdb.{InfluxDB, InfluxDBFactory}

object InfluxConnectionPool {
  // Blocking queue holding the pooled InfluxDB connections
  val queue = new LinkedBlockingQueue[InfluxDB]()

  // Fill the pool with connections up to MAX_POOL_SIZE
  def initialize(database: String): Unit = {
    while (!isConnectionPoolFull) {
      queue.put(createNewConnection(database))
    }
  }

  private def isConnectionPoolFull: Boolean = {
    val MAX_POOL_SIZE = 1000
    queue.size >= MAX_POOL_SIZE
  }

  // Take a connection from the pool, or return null when the pool is empty
  def getConnectionFromPool: InfluxDB = {
    if (queue.size > 0) {
      queue.take()
    } else {
      System.err.println("InfluxDB connection limit reached.")
      null
    }
  }

  private def createNewConnection(database: String) = {
    val influxDBUrl = "..."
    val influxDB = InfluxDBFactory.connect(...)
    influxDB.enableBatch(10, 100, TimeUnit.MILLISECONDS)
    influxDB.setDatabase(database)
    influxDB.setRetentionPolicy(database + "_rp")
    influxDB
  }

  // Put a borrowed connection back into the pool
  def returnConnectionToPool(connection: InfluxDB): Unit = {
    queue.put(connection)
  }
}
In my Spark job, I do the following:
def run(): Unit = {
  val spark = SparkSession
    .builder
    .appName("ETL JOB")
    .master("local[4]")
    .getOrCreate()

  ...

  // This is where I create connection pool
  InfluxConnectionPool.initialize("dbname")

  val sdvWriter = new ForeachWriter[record] {
    var influxDB: InfluxDB = _

    def open(partitionId: Long, version: Long): Boolean = {
      influxDB = InfluxConnectionPool.getConnectionFromPool
      true
    }

    def process(record: record) = {
      // this is where I use the connection object and save the data
      MyService.saveData(influxDB, record.topic, record.value)
      InfluxConnectionPool.returnConnectionToPool(influxDB)
    }

    def close(errorOrNull: Throwable): Unit = {
    }
  }

  import spark.implicits._
  import org.apache.spark.sql.functions._

  //Read data from kafka
  val kafkaStreamingDF = spark
    .readStream
    ....

  val sdvQuery = kafkaStreamingDF
    .writeStream
    .foreach(sdvWriter)
    .start()
}
But when I run the job, I get the following exception:
18/05/07 00:00:43 ERROR StreamExecution: Query [id = 6af3c096-7158-40d9-9523-13a6bffccbb8, runId = 3b620d11-9b93-462b-9929-ccd2b1ae9027] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 8, 192.168.222.5, executor 1): java.lang.NullPointerException
at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:332)
at com.abc.telemetry.app.influxdb.InfluxConnectionPool$.returnConnectionToPool(InfluxConnectionPool.scala:47)
at com.abc.telemetry.app.ETLappSave$$anon$1.process(ETLappSave.scala:55)
at com.abc.telemetry.app.ETLappSave$$anon$1.process(ETLappSave.scala:46)
at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:53)
at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:49)
The NPE occurs when the connection is returned to the connection pool in queue.put(connection). What am I missing here? Any help is appreciated.
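For context, java.util.concurrent.LinkedBlockingQueue does not permit null elements, so put throws a NullPointerException when handed a null. Below is a minimal standalone sketch, separate from the Spark job and with made-up names, that just illustrates that behavior:

```scala
import java.util.concurrent.LinkedBlockingQueue

object NullPutSketch {
  def main(args: Array[String]): Unit = {
    // LinkedBlockingQueue rejects nulls; this mirrors the put() frame in the stack trace above
    val queue = new LinkedBlockingQueue[String]()
    try {
      queue.put(null)
    } catch {
      case e: NullPointerException => println(s"put(null) was rejected: $e")
    }
  }
}
```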
P.S.: In the regular DStreams approach, I did this with the foreachPartition method (roughly as sketched below). I am not sure how to do connection reuse/pooling with Structured Streaming.
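For comparison, here is a minimal sketch of what that DStreams pattern typically looks like; the stream value `stream`, its element type, and the reuse of MyService.saveData and InfluxConnectionPool are assumptions carried over from the code above rather than the original job:

```scala
// Hypothetical DStreams version: the pool is only touched inside foreachPartition,
// i.e. in the executor JVM that processes each partition.
stream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val influxDB = InfluxConnectionPool.getConnectionFromPool
    partition.foreach { rec =>
      MyService.saveData(influxDB, rec.topic, rec.value)
    }
    InfluxConnectionPool.returnConnectionToPool(influxDB)
  }
}
```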
Recommended Answer
I am using ForeachWriter for Redis similarly, where the pool is referenced only inside process. Your writer would look something like this:
def open(partitionId: Long, version: Long): Boolean = {
  true
}

def process(record: record) = {
  // get the connection from the pool inside process, use it, and return it
  val influxDB = InfluxConnectionPool.getConnectionFromPool
  // this is where I use the connection object and save the data
  MyService.saveData(influxDB, record.topic, record.value)
  InfluxConnectionPool.returnConnectionToPool(influxDB)
}
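Putting that together with the writer from the question, a complete sketch of a ForeachWriter that only touches the pool inside process could look like the following; this is my composition under the question's names (record, MyService.saveData, InfluxConnectionPool), not the answer's verbatim code:

```scala
val sdvWriter = new ForeachWriter[record] {

  def open(partitionId: Long, version: Long): Boolean = {
    // no connection is acquired here; just signal that the partition should be processed
    true
  }

  def process(record: record): Unit = {
    // borrow a connection, write the record, and give the connection back
    val influxDB = InfluxConnectionPool.getConnectionFromPool
    MyService.saveData(influxDB, record.topic, record.value)
    InfluxConnectionPool.returnConnectionToPool(influxDB)
  }

  def close(errorOrNull: Throwable): Unit = {
    // nothing to release here, since connections are returned in process
  }
}
```

Note, as an assumption on my part rather than something the answer states: InfluxConnectionPool is a plain Scala object, so the pool exists per JVM, and it would also need to be initialized in the executor JVMs for getConnectionFromPool to return a non-null connection there.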