配置文件的加载是一个难点,在local模式下非常容易,但是submit后一直报找不到文件,后来采用将properties文件放在加载类同一个package下,打包到同一个jar中解决。

Streaming从Spark2X迁移到Spark1.5 summary-LMLPHPStreaming从Spark2X迁移到Spark1.5 summary-LMLPHP

import java.io.{BufferedInputStream , InputStream}
import java.util.Properties /**
* Created by wulei on 2018/4/4.
* Description: 参数初始化公共类
*/
object InitPropertiesUtil extends Serializable {
/**
* get kafka's properties
* @return java.util.Properties
*/
def initKafkaPro: Properties = {
val prop: Properties = new Properties
val in: InputStream = getClass.getResourceAsStream("conf/kafka.properties")
if (in == null){
println("ERROR : kafka.properties init failed in is null")
}
prop.load(new BufferedInputStream(in))
prop
} /**
* get redis's properties
* @return java.util.Properties
*/
def initRedisPro: Properties = {
val prop: Properties = new Properties
val in: InputStream = getClass.getResourceAsStream("conf/redis.properties")
prop.load(new BufferedInputStream(in))
prop
}
}

问题: ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory

java.lang.RuntimeException: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:41)
at nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java:40)
at nl.work.kafkastreamconsumer.phoenix.LinePersister$1.call(LinePersister.java:32)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:999)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: ERROR 103 (08004): Unable to establish connection.
at org.apache.phoenix.exception.SQLExceptionCode$Factory$1.newException(SQLExceptionCode.java:362)
at org.apache.phoenix.exception.SQLExceptionInfo.buildException(SQLExceptionInfo.java:133)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:282)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.access$300(ConnectionQueryServicesImpl.java:166)
at org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQueryServicesImpl.java:1831)
at org.apache.phoenix.query.ConnectionQueryServicesImpl$11.call(ConnectionQueryServicesImpl.java:1810)
at org.apache.phoenix.util.PhoenixContextExecutor.call(PhoenixContextExecutor.java:77)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.init(ConnectionQueryServicesImpl.java:1810)
at org.apache.phoenix.jdbc.PhoenixDriver.getConnectionQueryServices(PhoenixDriver.java:162)
at org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.connect(PhoenixEmbeddedDriver.java:126)
at org.apache.phoenix.jdbc.PhoenixDriver.connect(PhoenixDriver.java:133)
at java.sql.DriverManager.getConnection(DriverManager.java:571)
at java.sql.DriverManager.getConnection(DriverManager.java:233)
at nl.work.kafkastreamconsumer.phoenix.PhoenixConnection.<init>(PhoenixConnection.java:39)
... 25 more
Caused by: java.io.IOException: java.lang.reflect.InvocationTargetException
at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:457)
at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:350)
at org.apache.phoenix.query.HConnectionFactory$HConnectionFactoryImpl.createConnection(HConnectionFactory.java:47)
at org.apache.phoenix.query.ConnectionQueryServicesImpl.openConnection(ConnectionQueryServicesImpl.java:280)
... 36 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedConstructorAccessor8.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.hbase.client.HConnectionManager.createConnection(HConnectionManager.java:455)
... 39 more
Caused by: java.lang.UnsupportedOperationException: Unable to find org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:36)
at org.apache.hadoop.hbase.ipc.RpcControllerFactory.instantiate(RpcControllerFactory.java:56)
at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:769)
at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:689)
... 43 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:191)
at org.apache.hadoop.hbase.util.ReflectionUtils.instantiateWithCustomCtor(ReflectionUtils.java:32)
... 46 more

追踪ClientRpcControllerFactory这个类,将会发现是phoenix-core-4.4.0-HBase-1.0.jar中的,根据报错信息,可以知道也确实HBase的连接出现问题,于是我第一步去找是否将phoenix-core-4.4.0-HBase-1.0.jar这个包拷贝到Spark Client下,有的,

第二,我去找自己的spark-submit jars命令中是否对该包的引用有误,比如路径、逗号符号等,错误依然不在此。嗯! 放弃思考,已超出自己的理解范围了,马上在stackoverflow上找到这个原生Bug

I had have same problem. The reason of problem,Phoenix use a custom rpc controller factory which is a Phoenix-specific one to configure the priorities for index and system catalog table in cluster side. It is called ClientRpcControllerFactory.

In sometimes Phoenix-enabled clusters are used from pure-HBase client applications resulting in ClassNotFoundExceptions in application code or MapReduce jobs. Since hbase configuration is shared between Phoenix-clients and HBase clients, having different configurations at the client side is hard. That's why you get this exception. This problem is fixed by HBASE-14960. If you hbase version older than 2.0.0, 1.2.0, 1.3.0, 0.98.17 You can define your client side rpc controller with this setting in hbase-site.xml:

  <property>
<name>hbase.rpc.controllerfactory.class</name>
<value>org.apache.hadoop.hbase.ipc.RpcControllerFactory</value>
</property>
05-04 11:57