我是spark的强烈爱好者,当然更是有这样的需求。最近在工程实践中搞定了这个问题,发文大家共享:
scala和java的混编主要的问题是原生对象的转换。python的加入需要利用第三方库py4j:
技术解决方案:
a) Pyton调用java原生对象及方法:
经测试jpype对于java原生类型支持不好,而py4j基本支持所有java原生类型,所以采用py4j来实现python和 jvm的交互。
b) Scala模块的运算结果转换成java原生对象:
Scala支持隐式转换,基本不需要编写专门转换的代码,就可以把scala原生类型转换成java原生类型,返回给python client。
具体实现:
JavaServer端负责提供各种业务逻辑模块的接口,接受python client的rpc请求,自动把返回的java原生对象转换为python的原生类型。测试了arraylist和hashmap等类型都能够很好地转换。
下面的代码把复杂的HashMap[String,ArrayList[String]]自动转换成了python的dict[String,list[String]]类型。 ·JVMServer启动: val gatewayServer: GatewayServer = newGatewayServer( JvmServer) gatewayServer.start ·各种复杂的scala原生对象转换成java原生类型,方便python client读取使用: import scala.collection.JavaConversions._ def joinTable(tableRDD:RDD[Array[String]]):util.HashMap[String, util.List[String]] = { val arr = new java.util.ArrayList[String]() val typedRDD = tableRDD.map(record =>(record(0),record(1))) val joinedRdd = typedRDD.join(typedRDD) val retVal = new util.HashMap[String,util.List[String]] joinedRdd.collect.map(x => retVal(x._1)= Seq(x._2._1, x._2._2)) retVal } Python Client实现(python语言): ·安装 py4j:pip install py4j ·编写client: 1.获得javaServer的连接 gateway = JavaGateway().entry_point 3.调用jvm server的readtable方法得到表数据,存入rdd,并且返回rdd的引用: tabRdd = gateway.readTable(accessId, accessKey,odpsUrl,tunnelUrl ) 4.调用jvm server的jointable方法实现一个简单的rdd的join,并返回java原生的hashmap。为了说明对复杂原生类型的支持,这个hashmap的value字段是一个arraylist。 hashMap = gateway.joinTable(tabRdd) print hashMap
注意import和原生类型的混编,下面是我编写spark项目的一段代码:
更多的对象类型转换:
scala.collection.Iterable java.lang.Iterable scala.collection.Iterable java.util.Collection scala.collection.Iterator java.util.{ Iterator, Enumeration } scala.collection.mutable.Buffer java.util.List scala.collection.mutable.Set java.util.Set scala.collection.mutable.Map java.util.{ Map, Dictionary } scala.collection.mutable.ConcurrentMap java.util.concurrent.ConcurrentMap
scala.collection.Seq => java.util.List scala.collection.mutable.Seq => java.util.List scala.collection.Set => java.util.Set scala.collection.Map => java.util.Map java.util.Properties => scala.collection.mutable.Map[String, String]
参考: