Problem description
The flow in my Spark program is as follows:
Driver --> HBase connection created --> Broadcast the HBase handle
Now, from the executors, we fetch this handle and try to write into HBase.
In the driver program, I create the HBase Configuration object and the Connection object, and then broadcast the connection through the JavaSparkContext as follows:
SparkConf sparkConf = JobConfigHelper.getSparkConfig();
Configuration conf = new Configuration();
UserGroupInformation.setConfiguration(conf);
jsc = new JavaStreamingContext(sparkConf, Durations.milliseconds(Long.parseLong(batchDuration)));
Configuration hconf = HBaseConfiguration.create();
hconf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
hconf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
UserGroupInformation.setConfiguration(hconf);
JavaSparkContext js = jsc.sparkContext();
Connection connection = ConnectionFactory.createConnection(hconf);
connectionbroadcast = js.broadcast(connection);
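Broadcasting the connection makes the driver serialize a live Connection object, and that serialization step is what fails here. A common alternative is to avoid shipping the connection at all. Each executor JVM creates its own connection lazily, the first time a task needs one. The sketch below shows only that lazy, one-per-JVM holder. HBase is not on its classpath, so `FakeConnection` is a hypothetical stand-in for HBase's `Connection`, and `ExecutorConnectionHolder` is a hypothetical name. In a real job the factory would wrap `ConnectionFactory.createConnection(conf)`, which throws `IOException`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ExecutorConnectionHolder {
    // Hypothetical stand-in for org.apache.hadoop.hbase.client.Connection.
    static final class FakeConnection {
        final int id;
        FakeConnection(int id) { this.id = id; }
    }

    // Counts how many connections the factory has created (for illustration only).
    static final AtomicInteger CREATED = new AtomicInteger();

    // Assumption: a real job would wrap ConnectionFactory.createConnection(conf) here
    // and handle its IOException.
    static final Supplier<FakeConnection> FACTORY =
            () -> new FakeConnection(CREATED.incrementAndGet());

    // One connection per JVM; volatile makes the double-checked locking below safe.
    private static volatile FakeConnection connection;

    static FakeConnection get() {
        FakeConnection c = connection;
        if (c == null) {
            synchronized (ExecutorConnectionHolder.class) {
                c = connection;
                if (c == null) {
                    c = FACTORY.get();
                    connection = c;
                }
            }
        }
        return c;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] tasks = new Thread[4];
        for (int i = 0; i < tasks.length; i++) {
            // Each thread plays the role of a task running on the same executor.
            tasks[i] = new Thread(ExecutorConnectionHolder::get);
            tasks[i].start();
        }
        for (Thread t : tasks) t.join();
        System.out.println("connections created: " + CREATED.get()); // prints "connections created: 1"
    }
}
```

The connection lives in a static field, so it is created on the executor when a task first calls `get()`. Nothing connection-related has to be serialized on the driver.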
Inside the call() method on the executor:
Table table = connectionbroadcast.getValue().getTable(TableName.valueOf("gfttsdgn:FRESHHBaseRushi"));
Put p = new Put(Bytes.toBytes("row1"));
p.add(Bytes.toBytes("c1"), Bytes.toBytes("output"), Bytes.toBytes("rohan"));
table.put(p);
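The snippet above also asks for a Table each time call() runs and never closes it. A common pattern is to open one table per partition and close it once the partition is written, for example inside `JavaRDD.foreachPartition`. The sketch below shows only that open-once, close-once shape. `FakeTable` is a hypothetical stand-in for HBase's `Table`. In a real job the table would come from `connection.getTable(...)` on a per-executor connection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PartitionWriterSketch {
    // Hypothetical stand-in for org.apache.hadoop.hbase.client.Table (which is Closeable).
    static final class FakeTable implements AutoCloseable {
        static int opened = 0;
        static int closed = 0;
        final List<String> rows = new ArrayList<>();
        FakeTable() { opened++; }
        void put(String rowKey) { rows.add(rowKey); }
        @Override public void close() { closed++; }
    }

    // Shaped like a foreachPartition body: one table per partition, closed when done.
    static int writePartition(Iterator<String> records) {
        int written = 0;
        try (FakeTable table = new FakeTable()) {
            while (records.hasNext()) {
                table.put(records.next());
                written++;
            }
        }
        return written;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("row1", "row2"),
                Arrays.asList("row3"));
        int total = 0;
        for (List<String> p : partitions) total += writePartition(p.iterator());
        // prints "3 rows, 2 tables opened, 2 closed"
        System.out.println(total + " rows, " + FakeTable.opened + " tables opened, "
                + FakeTable.closed + " closed");
    }
}
```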
I get the following exception when running in yarn-client mode:
17/03/02 09:19:38 ERROR yarn.ApplicationMaster: User class threw exception: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (org.apache.hadoop.conf.Configuration)
conf (org.apache.hadoop.hbase.client.RpcRetryingCallerFactory)
rpcCallerFactory (org.apache.hadoop.hbase.client.AsyncProcess)
asyncProcess (org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation)
com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (org.apache.hadoop.conf.Configuration)
conf (org.apache.hadoop.hbase.client.RpcRetryingCallerFactory)
rpcCallerFactory (org.apache.hadoop.hbase.client.AsyncProcess)
asyncProcess (org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation)
	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
	at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:194)
	at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
	at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
	at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
	at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1337)
	at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:639)
	at com.citi.fresh.core.driver.FreshDriver.main(FreshDriver.java:178)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
Caused by: java.util.ConcurrentModificationException
	at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
	at java.util.Vector$Itr.next(Vector.java:1133)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
	... 28 more
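The "Caused by" section points at the fail-fast iterator of `java.util.Vector`. Kryo's `CollectionSerializer` was iterating the class loader's `classes` vector, and the vector changed underneath it. The sketch below reproduces that mechanism using only JDK classes, and it runs on a single thread. In the real job, the modification probably came from another thread loading a class during serialization. That is an inference from the trace, not something the trace states. `VectorComodificationDemo` is a hypothetical name.

```java
import java.util.ConcurrentModificationException;
import java.util.Vector;

public class VectorComodificationDemo {
    // Returns true if appending to the vector while iterating it trips the fail-fast check.
    static boolean iterateWhileAppending() {
        Vector<String> classes = new Vector<>();
        classes.add("A");
        classes.add("B");
        try {
            for (String name : classes) {
                // Simulates a class loader registering a newly loaded class mid-iteration.
                classes.add(name + "$Loaded");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            // Thrown by the iterator's next() once it sees the modification count changed.
            return true;
        }
    }

    public static void main(String[] args) {
        // prints "ConcurrentModificationException thrown: true"
        System.out.println("ConcurrentModificationException thrown: " + iterateWhileAppending());
    }
}
```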
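Solution
I can see that you are trying to bulk-put data into HBase using Spark. As @jojo_Berlin explained, your HBase Configuration is not thread-safe. The serialization trace shows why: broadcasting the Connection makes Kryo walk from the connection through AsyncProcess and RpcRetryingCallerFactory into the Configuration and its class loader. The class loader's internal list of classes (a Vector) changed while Kryo was iterating over it. However, you can easily achieve the bulk put by using SparkOnHBase: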
Configuration conf = HBaseConfiguration.create();
conf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
hbaseContext.bulkPut(rdd, TableName.valueOf("gfttsdgn:FRESHHBaseRushi"), new PutFunction(), true);
Where your 'put' function is:
public static class PutFunction implements Function<String, Put> {
    public Put call(String v) throws Exception {
        Put put = new Put(Bytes.toBytes(v));
        put.add(Bytes.toBytes("c1"), Bytes.toBytes("output"), Bytes.toBytes("rohan"));
        return put;
    }
}
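PutFunction is a pure mapping: each RDD element becomes the row key, and the family `c1`, qualifier `output` and value `rohan` are fixed. Because of that, the mapping logic can be checked without a cluster. The minimal sketch below uses plain JDK types. `CellSpec` is a hypothetical stand-in for a single-cell HBase `Put`, and `java.util.function.Function` replaces Spark's `Function` interface. It relies on the fact that HBase's `Bytes.toBytes(String)` encodes as UTF-8.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class PutMappingSketch {
    // Hypothetical stand-in for an HBase Put holding a single cell.
    static final class CellSpec {
        final byte[] row, family, qualifier, value;
        CellSpec(byte[] row, byte[] family, byte[] qualifier, byte[] value) {
            this.row = row;
            this.family = family;
            this.qualifier = qualifier;
            this.value = value;
        }
    }

    // UTF-8 encoding, the same encoding Bytes.toBytes(String) uses.
    static byte[] utf8(String s) { return s.getBytes(StandardCharsets.UTF_8); }

    // Same mapping as PutFunction: element -> row key; family, qualifier and value are fixed.
    static final Function<String, CellSpec> TO_CELL =
            v -> new CellSpec(utf8(v), utf8("c1"), utf8("output"), utf8("rohan"));

    public static void main(String[] args) {
        CellSpec cell = TO_CELL.apply("row1");
        // prints "row1 -> c1:output = rohan"
        System.out.println(new String(cell.row, StandardCharsets.UTF_8) + " -> "
                + new String(cell.family, StandardCharsets.UTF_8) + ":"
                + new String(cell.qualifier, StandardCharsets.UTF_8) + " = "
                + new String(cell.value, StandardCharsets.UTF_8));
    }
}
```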