This article describes how to approach Spark feature-vector transformation when the input is pre-sorted. It should be a useful reference for anyone facing the same problem.

Problem description

I have some data on HDFS in tab-separated files that look like this:

 label | user_id | feature
------------------------------
  pos  | 111     | www.abc.com
  pos  | 111     | www.xyz.com
  pos  | 111     | firefox
  pos  | 222     | www.example.com
  pos  | 222     | www.xyz.com
  pos  | 222     | ie
  neg  | 333     | www.jkl.com
  neg  | 333     | www.xyz.com
  neg  | 333     | chrome

I need to transform it into one feature vector per user_id in order to train an org.apache.spark.ml.classification.NaiveBayes model.

My current approach is essentially:


  1. Load the raw data into a DataFrame.

  2. Index the features with a StringIndexer.

  3. Drop down to the RDD, group by user_id, and map the feature indices into a sparse vector.

The kicker is this... the data is already pre-sorted by user_id. What is the best way to take advantage of that? It pains me to think about how much unnecessary work is probably happening.

In case a little code helps in understanding my current approach, here is the essence of the map:

  val featurization = (vals: (String, Iterable[Row])) => {
    // create a Seq of all the feature indices
    // Note: the indexing was done in a previous step not shown here
    val seq = vals._2.map(x => (x.getDouble(1).toInt, 1.0D)).toSeq

    // create the sparse vector
    val featureVector = Vectors.sparse(maxIndex, seq)

    // convert the string label into a Double
    val label = if (vals._2.head.getString(2) == "pos") 1.0 else 0.0

    (label, vals._1, featureVector)
  }

  d.rdd
    .groupBy(_.getString(1))
    .map(featurization)
    .toDF("label", "user_id", "features")


Solution

Let's start with the general question of whether the pre-sorting helps at all.

It depends. If the operation you apply can benefit from map-side aggregation, then you can gain quite a lot from having presorted data without any further intervention in your code: data sharing the same key should be located on the same partitions and can be aggregated locally before the shuffle.
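
For illustration, here is a minimal self-contained sketch (hypothetical data and a local master, not part of the original question) contrasting an operation that uses map-side combining with one that does not; with presorted, co-located keys, the first variant shuffles far fewer records:

  import org.apache.spark.{SparkConf, SparkContext}

  // hypothetical standalone example
  val sc = new SparkContext(
    new SparkConf().setAppName("mapSideCombineDemo").setMaster("local[2]"))

  val pairs = sc.parallelize(Seq(
    ("111", "www.abc.com"), ("111", "www.xyz.com"),
    ("222", "www.example.com"), ("222", "www.xyz.com")))

  // reduceByKey combines values locally on each partition before the
  // shuffle, so keys that already share a partition barely move
  val counts = pairs.mapValues(_ => 1).reduceByKey(_ + _)

  // groupByKey, by contrast, ships every record across the network first
  val groups = pairs.groupByKey()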

Unfortunately it won't help much in this particular scenario. Even if you enable map-side aggregation (groupByKey doesn't use it, so you'd need a custom implementation) or aggregate over feature vectors (you'll find some examples in my answer to SPARK DataFrame: custom aggregation function to sum a column of Vectors), there is not much to gain. You can save some work here and there, but you still have to transfer all the indices between nodes.
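
To make that trade-off concrete, here is a rough sketch (my own, not from the referenced answer) that aggregates feature indices per key with aggregateByKey, which does get map-side combining; the SparkContext sc, maxIndex, and the (user_id, index) pairs are stand-ins for what the question's indexing step would produce:

  import org.apache.spark.mllib.linalg.Vectors

  val maxIndex = 1000  // hypothetical feature-space size from the indexing step
  val indexed = sc.parallelize(Seq(("111", 0), ("111", 5), ("222", 5)))

  val vectors = indexed
    .aggregateByKey(Set.empty[Int])(_ + _, _ ++ _)  // map-side combine applies here
    .mapValues(ix => Vectors.sparse(maxIndex, ix.toSeq.sorted.map(i => (i, 1.0))))

Duplicate indices get merged locally, but every distinct index still crosses the network, which is why the savings are limited.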

If you want to gain more, you'll have to do a little more work. I can see two basic ways to leverage the existing order:

  1. Use a custom Hadoop input format to yield complete records (label, id, all features) instead of reading the data line by line. If your data has a fixed number of lines per id, you could even try NLineInputFormat and apply mapPartitions to aggregate the records afterwards (see the first sketch after this list).

    This is definitely the more verbose solution, but it requires no additional shuffling in Spark.

  2. Read the data as usual but use a custom partitioner for groupBy. As far as I can tell a RangePartitioner should work just fine, but to be sure you can try the following procedure (see the second sketch after this list):

    • use mapPartitionsWithIndex to find the minimum / maximum id on each partition
    • create a partitioner which keeps minimum <= ids < maximum on the current (i-th) partition and pushes the maximum to partition i + 1
    • use this partitioner for groupBy

    This is probably the friendlier solution, but it requires at least some shuffling. If the expected number of records to move is low (<< #records-per-partition), you can even handle it without a shuffle using mapPartitions and broadcast*, although in practice having the data partitioned can be more useful and cheaper to obtain.


* You can use an approach similar to this: http://stackoverflow.com/a/33072089/1560062
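
A rough sketch of option 1, assuming exactly three lines per user_id as in the sample data; the path, the lines-per-split value, and the parsing are hypothetical, and a SparkContext sc is assumed:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat

  // any multiple of 3 keeps whole 3-line records inside a single split
  val conf = new Configuration()
  conf.setInt("mapreduce.input.lineinputformat.linespermap", 3000)

  val lines = sc.newAPIHadoopFile(
    "hdfs:///path/to/data",  // hypothetical path
    classOf[NLineInputFormat],
    classOf[LongWritable],
    classOf[Text],
    conf
  ).map(_._2.toString)

  // every partition now holds complete records, so they can be folded
  // locally with no shuffle at all
  val records = lines.mapPartitions { it =>
    it.grouped(3).map { rec =>
      val parts = rec.map(_.split("\t"))
      (parts.head(0), parts.head(1), parts.map(_(2)))  // (label, id, features)
    }
  }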
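
And a sketch of option 2, building on the question's DataFrame d. This variant uses the first id seen on each partition as the boundary (one possible realization of the minimum/maximum idea above) and assumes the presorted input has no empty partitions:

  import org.apache.spark.Partitioner
  import org.apache.spark.sql.Row

  // 1. first user_id on each partition of the presorted input
  val firstIds = d.rdd
    .mapPartitionsWithIndex { (i, it) =>
      if (it.hasNext) Iterator((i, it.next().getString(1))) else Iterator.empty
    }
    .collect().sortBy(_._1).map(_._2)

  // 2. keep each id range on its original partition; an id equal to the
  //    next partition's first id is pushed forward, so no key is split
  class PresortedPartitioner(bounds: Array[String], parts: Int)
      extends Partitioner {
    def numPartitions: Int = parts
    def getPartition(key: Any): Int =
      math.max(bounds.lastIndexWhere(_ <= key.asInstanceOf[String]), 0)
  }

  // 3. group with the custom partitioner; almost no data should move
  val partitioner = new PresortedPartitioner(firstIds, d.rdd.partitions.length)
  val grouped = d.rdd.groupBy((r: Row) => r.getString(1), partitioner)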

That concludes this article on Spark feature-vector transformation on pre-sorted input. I hope the answer above is helpful.
