Problem description

We are trying to execute a simple piece of Scala code in the Spark shell to retrieve data from HBase.
The Hadoop environment is Kerberos-enabled, and we have made sure to run kinit.
Steps to invoke the Spark shell:
MASTER=yarn-client
DRIVER_CLASSPATH="/opt/cloudera/parcels/CDH/lib/hbase/lib/*"
DRIVER_LIBRARY_PATH="/opt/cloudera/parcels/CDH/lib/hadoop/lib/native"
spark-shell --driver-class-path "$DRIVER_CLASSPATH" --driver-library-path "$DRIVER_LIBRARY_PATH" --driver-memory 10G --executor-memory 15G --executor-cores 8 --num-executors 3 --master $MASTER
Code:
import org.apache.hadoop.fs._
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io._
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.hbase.util._
import org.apache.spark._

val hc = HBaseConfiguration.create
hc.addResource(new Path("file:///opt/cloudera/parcels/CDH/lib/hbase/conf/hbase-site.xml"))
hc.addResource(new Path("file:///opt/cloudera/parcels/CDH/lib/hbase/conf/core-site.xml"))
hc.set(TableInputFormat.INPUT_TABLE, "poc-customers")
val rdd = sc.newAPIHadoopRDD(hc, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
rdd.count
Here is the error:
org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the location
    at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:308)
    at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:149)
    at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:57)
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
    at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:293)
    at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:268)
    at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:140)
    at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:135)
    at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:888)
    at org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.restart(TableRecordReaderImpl.java:90)
    at org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.initialize(TableRecordReaderImpl.java:167)
    at org.apache.hadoop.hbase.mapreduce.TableRecordReader.initialize(TableRecordReader.java:134)
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase$1.initialize(TableInputFormatBase.java:200)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Could not set up IO Streams to <management-node-server-hostname>/10.118.114.40:60020
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:773)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:881)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:850)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1184)
    at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216)
    at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300)
    at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.get(ClientProtos.java:31865)
    at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1580)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1294)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1126)
    at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:299)
    ... 23 more
Caused by: java.lang.RuntimeException: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$1.run(RpcClientImpl.java:673)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.handleSaslConnectionFailure(RpcClientImpl.java:631)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:739)
    ... 33 more
Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
    at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
    at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:605)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:154)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:731)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:728)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:728)
    ... 33 more
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
    at sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
    at sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:121)
    at sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
    at sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:223)
    at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
    at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
    at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:193)
    ... 42 more
Please note:
- We are able to invoke the HBase shell from the same session and scan records from the same table.
- We are able to run a word count on an HDFS file from the same Spark shell session.
- We are able to execute the above code in local mode.
- We are able to perform other operations from the same spark-shell session, such as:
  a. val admin = new HBaseAdmin(hc)
  b. print(admin.isTableAvailable("poc-customers"))
Looking for help to resolve this issue.
Solution

When the Spark "driver" requests YARN to spawn its "executors" somewhere in the cluster, it uses its local Kerberos TGT -- the one you created with kinit -- to authenticate. Then YARN issues a global delegation token that is shared by all executors to access HDFS and YARN.
Alas, HBase does not support that delegation token. Each executor must re-authenticate to ZooKeeper, and then to the actual HBase RegionServer, with a local TGT.
In a perfect world, you would just need to insert two properties into "spark-defaults.conf", namely spark.yarn.principal and spark.yarn.keytab (creating a keytab to store your password is something you do with the "ktutil" utility).
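For reference, setting that up would look roughly like the sketch below; the principal, realm, keytab path and encryption type are placeholders, not values taken from the question:

# spark-defaults.conf (equivalently, pass --principal and --keytab to spark-submit)
spark.yarn.principal   poc-user@EXAMPLE.COM
spark.yarn.keytab      /home/poc-user/poc-user.keytab

# creating the keytab interactively with MIT Kerberos ktutil; the enctype must match your KDC
$ ktutil
ktutil:  addent -password -p poc-user@EXAMPLE.COM -k 1 -e aes256-cts-hmac-sha1-96
ktutil:  wkt /home/poc-user/poc-user.keytab
ktutil:  quit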
Alas, that feature was built for long-running Streaming jobs that need to renew their HDFS delegation token (typically every 7 days), not for HBase initial authentication. Now, the release notes for Spark 1.6 show a lot of bug fixes related to YARN and Kerberos, so maybe the feature now works out of the box for HBase as well -- but I wouldn't bet on it.
So what is the workaround?
- In the Java code run by the driver, state that the keytab file must be shipped to each executor with an addFile()
- In the Java code run by the executors, explicitly create a Hadoop UserGroupInformation that explicitly gets its own Kerberos TGT from that keytab before connecting to HBase (see the sketch below)
Note that when used that way, the UGI keeps its TGT private -- it does not show up in the ticket cache, so other processes on the same machine cannot reuse it (and, on the other hand, a kinit from another process will not tamper with it).
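Here is a minimal Scala sketch of that workaround for spark-shell. It is not code from the original question or answer: the keytab path, principal and row keys are placeholders; it assumes the HBase 1.0+ client API (ConnectionFactory) and hbase-site.xml are available on the executors' classpath; and it fetches rows with the plain client API inside mapPartitions, because the per-executor login cannot easily be injected into TableInputFormat / newAPIHadoopRDD:

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkFiles

// driver side: ship the (hypothetical) keytab into every executor's working directory
sc.addFile("/home/poc-user/poc-user.keytab")
val principal = "poc-user@EXAMPLE.COM"

// executor side: log in from the shipped keytab, then talk to HBase inside doAs()
val rowKeys = sc.parallelize(Seq("cust-001", "cust-002"))   // sample row keys, for illustration only
val found = rowKeys.mapPartitions { keys =>
  val conf = HBaseConfiguration.create()                    // picks up hbase-site.xml from the executor classpath
  val ugi  = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
               principal, SparkFiles.get("poc-user.keytab"))
  ugi.doAs(new PrivilegedExceptionAction[Iterator[(String, Boolean)]] {
    override def run(): Iterator[(String, Boolean)] = {
      val conn  = ConnectionFactory.createConnection(conf)
      val table = conn.getTable(TableName.valueOf("poc-customers"))
      // materialize the results before closing the connection
      val fetched = keys.map { k =>
        (k, !table.get(new Get(Bytes.toBytes(k))).isEmpty)  // does the row exist?
      }.toList
      table.close(); conn.close()
      fetched.iterator
    }
  })
}
found.collect().foreach(println)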