我的代码的算法如下
步骤1 。将一个hbase实体数据获取到hBaseRDD
JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
jsc.newAPIHadoopRDD(hbase_conf, TableInputFormat.class,
ImmutableBytesWritable.class, Result.class);
步骤2 。将hBaseRDD转换为rowPairRDD
// in the rowPairRDD the key is hbase's row key, The Row is the hbase's Row data
JavaPairRDD<String, Row> rowPairRDD = hBaseRDD
.mapToPair(***);
dataRDD.repartition(500);
dataRDD.cache();
步骤3 。将rowPairRDD转换为schemaRDD
JavaSchemaRDD schemaRDD = sqlContext.applySchema(rowPairRDD.values(), schema);
schemaRDD.registerTempTable("testentity");
sqlContext.sqlContext().cacheTable("testentity");
步骤4 。使用spark sql做第一个简单的sql查询。
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(jsc);
JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE
column3 = 'value1' ")
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
步骤5 。使用spark sql做第二个简单的sql查询。
JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity
WHERE column3 = 'value2' ")
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
步骤6。 使用spark sql做第三种简单的sql查询。
JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE column3 = 'value3' ");
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();
测试结果如下:
测试用例1 :
当我插入300,000条记录,hbase实体时,然后运行代码。
如果我使用hbase Api进行类似的查询,则只需要2000毫秒。显然,最后2个Spark sql查询比hbase api查询要快得多。
我相信第一个spark sql查询花费大量时间从hbase加载数据。
因此,第一个查询比最后两个查询要慢得多。我认为结果是预期的
测试用例2 :
当我插入400,000条记录时。 hbase实体,然后运行代码。
如果我使用hbase Api进行类似的查询,则只需3500毫秒。显然,这3个Spark sql查询比hbase api查询要慢得多。
并且最后两个spark sql查询也非常慢,并且性能类似于第一个查询,为什么?如何调整性能?
最佳答案
我怀疑您正在尝试缓存比分配给Spark实例更多的数据。我将尝试分解完全相同的查询的每次执行中发生的事情。
首先,Spark中的所有内容都是懒惰的。这意味着,当您调用rdd.cache()
时,除非您对RDD进行了某些操作,否则实际上没有任何 react 。
首先查询
第二/第三查询
现在,Spark将尝试缓存尽可能多的RDD。如果无法缓存整个内容,则可能会遇到一些严重的问题。如果缓存之前的步骤之一导致随机播放,则尤其如此。您可能会在每个后续查询的第一个查询中重复步骤1-3。那不是理想的。
要查看是否没有完全缓存RDD,请转到Spark Web UI(如果在本地独立模式下为
http://localhost:4040
),然后查找RDD存储/持久性信息。确保其为100%。编辑(每条评论):
我不能肯定地说为什么您用
spark.executor.memory=1G
达到了最大限制,但是我将添加一些有关缓存的更多相关信息。spark.storage.memoryFraction=0.6
或60%。因此,您实际上只得到1GB * 0.6
。 Object
元数据时会有相当大的开销。您可以change the default persistence level。 调用任何操作都将导致RDD被缓存。只是做这个
scala> rdd.cache
scala> rdd.count
现在已被缓存。
关于java - Spark SQL性能,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/27646171/