Problem description
I'm reading data from Kafka (100,000 lines per second) using Spark Structured Streaming, and I'm trying to insert all of the data into HBase.
I'm on Cloudera Hadoop 2.6 and I'm using Spark 2.3.
I tried something like this:
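eventhubs.writeStream
  .foreach(new MyHBaseWriter)
  .option("checkpointLocation", checkpointDir)
  .start()
  .awaitTermination()
MyHBaseWriter looks like this:
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.Row
import scala.util.parsing.json.JSON

class MyHBaseWriter extends HBaseForeachWriter[Row] {
  override val tableName: String = "hbase-table-name"

  // Placeholder column family / qualifier names; replace them with your own
  private val columnFamilyName = "cf"
  private val columnName = "col"

  override def toPut(record: Row): Put = {
    // Parse the JSON payload in the first column; fall back to an empty map if it is not a JSON object
    val data: Map[String, Any] = JSON.parseFull(record.getString(0)) match {
      case Some(m: Map[String, Any] @unchecked) => m
      case _ => Map.empty
    }
    val key = data.getOrElse("key", "").toString
    val value = data.getOrElse("val", "").toString // `val` is a Scala keyword, so it cannot name a variable
    val p = new Put(Bytes.toBytes(key))
    // Add columns ...
    p.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(columnName), Bytes.toBytes(value))
    p
  }
}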
And the HBaseForeachWriter trait looks like this:
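import java.util.concurrent.ExecutorService
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.security.User
import org.apache.spark.sql.ForeachWriter

trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {
  val tableName: String
  def pool: Option[ExecutorService] = None
  def user: Option[User] = None

  private var hTable: Table = _
  private var connection: Connection = _

  // Called for each partition and epoch: open the connection and the table
  override def open(partitionId: Long, version: Long): Boolean = {
    connection = createConnection()
    hTable = getHTable(connection)
    true
  }

  def createConnection(): Connection = {
    // I create the HBase connection here. Assumption: hbase-site.xml is on the classpath;
    // a null pool/user lets HBase fall back to its own defaults.
    ConnectionFactory.createConnection(HBaseConfiguration.create(), pool.orNull, user.orNull)
  }

  def getHTable(connection: Connection): Table =
    connection.getTable(TableName.valueOf(tableName))

  // A separate Table.put call for every single row: this is the slow part
  override def process(record: RECORD): Unit = {
    val put = toPut(record)
    hTable.put(put)
  }

  // Release resources; the null checks guard against a failed open()
  override def close(errorOrNull: Throwable): Unit = {
    if (hTable != null) hTable.close()
    if (connection != null) connection.close()
  }

  def toPut(record: RECORD): Put
}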
So here I'm doing a put line by line, and even if I allow 20 executors with 4 cores each, the data isn't inserted into HBase immediately. So what I need to do is a bulk load, but I'm struggling because everything I find on the internet does it with RDDs and Map/Reduce.
Recommended answer
What I understand is that the problem is the slow rate of record ingestion into HBase. I have a few suggestions for you.
1) hbase.client.write.buffer
The property below may help you.
hbase.client.write.buffer
Description: Default size of the BufferedMutator write buffer in bytes. A bigger buffer takes more memory (on both the client and server side, since the server instantiates the passed write buffer to process it), but a larger buffer size reduces the number of RPCs made. For an estimate of server-side memory used, evaluate hbase.client.write.buffer * hbase.regionserver.handler.count
Default: 2097152 (about 2 MB)
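As a rough illustration of the server-side estimate in the description, here is the arithmetic for the default buffer and for a 10 MB buffer. The handler count of 30 is an assumed value for illustration only; read hbase.regionserver.handler.count from your own cluster's configuration.

```scala
object WriteBufferEstimate {
  // Default hbase.client.write.buffer value in bytes, from the description above
  val defaultWriteBufferBytes: Long = 2097152L

  // Server-side estimate from the description:
  // hbase.client.write.buffer * hbase.regionserver.handler.count
  def serverSideEstimate(writeBufferBytes: Long, handlerCount: Int): Long =
    writeBufferBytes * handlerCount

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024
    val handlerCount = 30 // assumed value; check hbase.regionserver.handler.count on your cluster
    for (bufferBytes <- Seq(defaultWriteBufferBytes, 10 * mb)) {
      val estimate = serverSideEstimate(bufferBytes, handlerCount)
      println(s"buffer ${bufferBytes / mb} MB -> about ${estimate / mb} MB on the server")
    }
  }
}
```

Under that assumption the estimate grows from about 60 MB to about 300 MB, which is the memory trade-off the description warns about.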
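I prefer foreachBatch (see the Spark docs; it is similar to foreachPartition in Spark core) rather than foreach. Note that foreachBatch was added in Spark 2.4, so on Spark 2.3 you would either upgrade or keep foreach and batch the puts inside the writer.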
Also, in your HBase writer that extends ForeachWriter:
In open, initialize an ArrayList of Put.
In process, add each Put to that ArrayList instead of writing it right away.
In close, call table.put(listOfPuts) and then reset the ArrayList once the table has been updated.
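A minimal sketch of this open/process/close pattern, in plain Scala so it runs without a cluster. SimplePut, PutSink and ListBufferingWriter are hypothetical stand-ins for HBase's Put, Table and your ForeachWriter subclass; in the real writer, PutSink.put would be table.put(listOfPuts).

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for HBase's Put and Table so the sketch runs without a cluster
final case class SimplePut(rowKey: String, value: String)

trait PutSink {
  def put(puts: Seq[SimplePut]): Unit // in the real writer: table.put(listOfPuts)
}

// Follows the open/process/close lifecycle of Spark's ForeachWriter
class ListBufferingWriter(sink: PutSink) {
  private val puts = ArrayBuffer.empty[SimplePut]

  def open(partitionId: Long, version: Long): Boolean = {
    puts.clear() // start every partition/epoch with an empty list
    true
  }

  def process(record: SimplePut): Unit =
    puts += record // buffer the put instead of writing it immediately

  def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null && puts.nonEmpty) sink.put(puts.toList) // one bulk write
    puts.clear() // reset the list once the table has been updated
  }
}

object ListBufferingDemo {
  def main(args: Array[String]): Unit = {
    val callSizes = ArrayBuffer.empty[Int]
    val writer = new ListBufferingWriter(new PutSink {
      def put(puts: Seq[SimplePut]): Unit = callSizes += puts.size
    })
    writer.open(0L, 0L)
    (1 to 1000).foreach(i => writer.process(SimplePut(s"row-$i", s"v$i")))
    writer.close(null)
    println(s"sink calls: ${callSizes.size}, puts in the call: ${callSizes.head}")
  }
}
```

Skipping the write when close receives an error is a choice made in this sketch: Spark reprocesses the failed epoch, and re-running the same puts on a retry just rewrites the same cells. Buffering a whole partition keeps it in executor memory, so very large partitions may still need a size limit.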
Basically, what this does: the write buffer mentioned above fills up to 2 MB, and then it is flushed into the HBase table. Until then, the records won't appear in the HBase table.
You can increase that to 10 MB and so on. This reduces the number of RPCs, and large chunks of data are flushed into the HBase table at once.
A flushCommits into the HBase table is triggered when the write buffer fills up.
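To make the buffer-size reasoning concrete, the sketch below models a client-side write buffer that flushes whenever the buffered bytes reach a threshold, and counts flushes for 2 MB and 10 MB buffers. SizeBoundedBuffer is hypothetical, not the HBase API (the real BufferedMutator sizes mutations by their estimated heap size, not raw payload bytes), and the 1 KB record size is an assumption.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of a client-side write buffer (not the HBase API): mutations
// accumulate until their total size reaches the threshold, then all are flushed together.
class SizeBoundedBuffer(thresholdBytes: Long, flush: Seq[Array[Byte]] => Unit) {
  private val pending = ArrayBuffer.empty[Array[Byte]]
  private var pendingBytes = 0L

  def add(mutation: Array[Byte]): Unit = {
    pending += mutation
    pendingBytes += mutation.length
    if (pendingBytes >= thresholdBytes) flushNow() // buffer full: one flush instead of many writes
  }

  // Also called at the end (e.g. from close()) so a partially filled buffer is not lost
  def flushNow(): Unit =
    if (pending.nonEmpty) {
      flush(pending.toList)
      pending.clear()
      pendingBytes = 0L
    }
}

object SizeBoundedDemo {
  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024
    val payload = new Array[Byte](1024) // assumed 1 KB per record
    for (bufferBytes <- Seq(2 * mb, 10 * mb)) {
      var flushes = 0
      val buffer = new SizeBoundedBuffer(bufferBytes, _ => flushes += 1)
      (1 to 100000).foreach(_ => buffer.add(payload)) // 100,000 records, as in the question
      buffer.flushNow()
      println(s"buffer ${bufferBytes / mb} MB -> $flushes flushes")
    }
  }
}
```

With these assumptions it prints 49 flushes for the 2 MB buffer and 10 flushes for the 10 MB buffer, so the larger buffer cuts the number of flush rounds roughly five-fold.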
Example code: in my answer
2) Switch off the WAL. You can switch off the WAL (write-ahead log); the danger is that there is no recovery, but it will speed up writes if you don't need to recover the data.
How to switch it off: https://hbase.apache.org/1.1/apidocs/org/apache/hadoop/hbase/client/Put.html#setWriteToWAL(boolean) (this method is deprecated; its replacement is setDurability(Durability.SKIP_WAL)).
Basic architecture and a link for further study:
http://hbase.apache.org/book.html#perf.writing
As I mentioned, a list of puts is a good way. Below is an example of the old way of doing it, from before Structured Streaming (foreachPartition with a list of puts), where foreachPartition operates once per partition rather than once per row.
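import java.util
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.{DataFrame, Row}

def writeHbase(mydataframe: DataFrame, hbaseZookeeperQuorum: String, hbaseTableName: String): Unit = {
  val columnFamilyName: String = "c"
  // Runs once per partition; the explicit parameter type avoids overload ambiguity on Scala 2.12
  mydataframe.foreachPartition((rows: Iterator[Row]) => {
    val puts = new util.ArrayList[Put]
    rows.foreach(row => {
      val key = row.getAs[String]("rowKey")
      val p = new Put(Bytes.toBytes(key))
      val columnX = row.getAs[Double]("x")
      val columnY = row.getAs[Long]("y")
      p.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes("x"), Bytes.toBytes(columnX))
      p.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes("y"), Bytes.toBytes(columnY))
      puts.add(p)
    })
    // HBaseUtil.putRows is the author's own helper (not shown, not part of the HBase API);
    // it is assumed to write the whole list of puts for this partition in one go
    HBaseUtil.putRows(hbaseZookeeperQuorum, hbaseTableName, puts)
  })
}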
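The point that foreachPartition works per partition rather than per row can be checked with a small plain-Scala simulation that counts write calls. Record and the two write functions are hypothetical; 80 partitions is an assumption (for example, one per core for 20 executors with 4 cores each), and 1,250 rows per partition add up to the 100,000 rows mentioned in the question.

```scala
object PartitionBatchingDemo {
  // Hypothetical stand-in for a Spark Row with the answer's rowKey, x and y columns
  final case class Record(rowKey: String, x: Double, y: Long)

  // foreach style: one write call for every row
  def writePerRow(partitions: Seq[Iterator[Record]], write: Seq[Record] => Unit): Unit =
    partitions.foreach(rows => rows.foreach(r => write(Seq(r))))

  // foreachPartition style: collect the partition into one list, then write it once
  def writePerPartition(partitions: Seq[Iterator[Record]], write: Seq[Record] => Unit): Unit =
    partitions.foreach { rows =>
      val puts = rows.toList
      if (puts.nonEmpty) write(puts)
    }

  def main(args: Array[String]): Unit = {
    // Rebuilt on each call because iterators can only be consumed once
    def partitions: Seq[Iterator[Record]] =
      (0 until 80).map(p => (0 until 1250).iterator.map(i => Record(s"p$p-r$i", i.toDouble, i.toLong)))

    var perRow = 0
    writePerRow(partitions, _ => perRow += 1)
    var perPartition = 0
    writePerPartition(partitions, _ => perPartition += 1)
    println(s"per-row: $perRow calls, per-partition: $perPartition calls")
  }
}
```

It prints 100000 calls for the per-row style and 80 for the per-partition style: the same rows are written, but with far fewer client calls.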
What I feel is that we need to understand the psychology of Spark and HBase to make them an effective pair.