我的代码的算法如下
步骤1 。将一个hbase实体数据获取到hBaseRDD

      JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
                 jsc.newAPIHadoopRDD(hbase_conf,  TableInputFormat.class,
                 ImmutableBytesWritable.class, Result.class);

步骤2 。将hBaseRDD转换为rowPairRDD
     // in the rowPairRDD the key is hbase's row key, The Row is the hbase's Row data
     JavaPairRDD<String, Row> rowPairRDD = hBaseRDD
                            .mapToPair(***);
    dataRDD.repartition(500);
        dataRDD.cache();

步骤3 。将rowPairRDD转换为schemaRDD
            JavaSchemaRDD schemaRDD =   sqlContext.applySchema(rowPairRDD.values(), schema);
            schemaRDD.registerTempTable("testentity");
           sqlContext.sqlContext().cacheTable("testentity");

步骤4 。使用spark sql做第一个简单的sql查询。
   JavaSQLContext  sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(jsc);
    JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE
             column3 = 'value1' ")
     List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();

步骤5 。使用spark sql做第二个简单的sql查询。
JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity
                                     WHERE column3 = 'value2' ")
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();

步骤6。 使用spark sql做第三种简单的sql查询。
JavaSchemaRDD retRDD=sqlContext.sql("SELECT column1, column2 FROM testentity WHERE column3 = 'value3' ");
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();

测试结果如下:

测试用例1 :

当我插入300,000条记录,hbase实体时,然后运行代码。
  • 第一个查询需要60407毫秒
  • 第二个查询需要838毫秒
  • 3td查询需要792毫秒

  • 如果我使用hbase Api进行类似的查询,则只需要2000毫秒。显然,最后2个Spark sql查询比hbase api查询要快得多。
    我相信第一个spark sql查询花费大量时间从hbase加载数据。
    因此,第一个查询比最后两个查询要慢得多。我认为结果是预期的

    测试用例2 :

    当我插入400,000条记录时。 hbase实体,然后运行代码。
  • 第一个查询需要87213毫秒
  • 第二个查询需要83238毫秒
  • 3td查询需要82092毫秒

  • 如果我使用hbase Api进行类似的查询,则只需3500毫秒。显然,这3个Spark sql查询比hbase api查询要慢得多。
    并且最后两个spark sql查询也非常慢,并且性能类似于第一个查询,为什么?如何调整性能?

    最佳答案

    我怀疑您正在尝试缓存比分配给Spark实例更多的数据。我将尝试分解完全相同的查询的每次执行中发生的事情。

    首先,Spark中的所有内容都是懒惰的。这意味着,当您调用rdd.cache()时,除非您对RDD进行了某些操作,否则实际上没有任何 react 。

    首先查询

  • 完整的HBase扫描(缓慢)
  • 增加分区数量(导致混洗,缓慢)
  • 数据实际上已缓存到内存中,因为Spark很懒(有点慢)
  • 在谓词(快速)上应用
  • 收集结果

  • 第二/第三查询
  • 完整的内存扫描(快速)
  • 在谓词(快速)上应用
  • 收集结果

  • 现在,Spark将尝试缓存尽可能多的RDD。如果无法缓存整个内容,则可能会遇到一些严重的问题。如果缓存之前的步骤之一导致随机播放,则尤其如此。您可能会在每个后续查询的第一个查询中重复步骤1-3。那不是理想的。

    要查看是否没有完全缓存RDD,请转到Spark Web UI(如果在本地独立模式下为http://localhost:4040),然后查找RDD存储/持久性信息。确保其为100%。

    编辑(每条评论):



    我不能肯定地说为什么您用spark.executor.memory=1G达到了最大限制,但是我将添加一些有关缓存的更多相关信息。
  • Spark仅将执行程序的堆内存的一部分分配给缓存。默认情况下,这是 spark.storage.memoryFraction=0.6 或60%。因此,您实际上只得到1GB * 0.6
  • HBase中使用的总空间可能与在Spark中进行缓存时占用的总堆空间不同。默认情况下,Spark在存储在内存中时不会序列化Java对象。因此,在存储Java Object元数据时会有相当大的开销。您可以change the default persistence level



  • 调用任何操作都将导致RDD被缓存。只是做这个
    scala> rdd.cache
    scala> rdd.count
    

    现在已被缓存。

    关于java - Spark SQL性能,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/27646171/

    10-11 22:31
    查看更多