import os
import copy
import codecs
import operator
import re
from math import log
from pyspark.sql import SQLContext,Row
from pyspark.mllib.regression import LabeledPoint
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.mllib.linalg import SparseVector, DenseVector
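# sc and sqlContext are assumed to come from an interactive pyspark shell (Spark 1.x);
# a standalone job would have to build them from SparkConf/SparkContext/HiveContext explicitly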

#===================================================================================================
#=============================== feature selection for tags, computed with DataFrames =============

df1=sqlContext.sql("select * from zhangb.isntalled_app_sample")

# overall sample count and counts per class (installed = '1' / '0')
cnt_samp=df1.count()
cnt_samp1=df1.filter(df1.installed=='1').count()
cnt_samp0=df1.filter(df1.installed=='0').count()

# register df1 as a temp table
df1.registerTempTable("table1")

# explode the applist column and count each pkg per class
df2=sqlContext.sql("select c.pkg,c.installed,count(*) cnt from \
(select b.installed,split(b.pkgs,',')[0] pkg  from \
(select imei,installed,pkgs from table1 a \
lateral view explode(split(a.installed_applist, ';')) t as pkgs) b \
where length(b.pkgs)>0 ) c \
group by c.pkg,c.installed  order by c.pkg,c.installed")
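# each row of df2/table2 is (pkg, installed, cnt), e.g. ('com.example.app', 1, 123)
# (the pkg name and count above are hypothetical, only to illustrate the shape)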

# register df2 as temp table table2
df2.registerTempTable("table2")
#sqlContext.cacheTable("table2")

# pivot the per-class counts: for each pkg, cnt01 = count among installed=0 samples,
# cnt11 = count among installed=1 samples

df3=sqlContext.sql("select h.pkg,(h.sumcnt-h.cnt1) cnt01,h.cnt1 cnt11 \
from(select g.pkg,g.sumcnt,case when g.cnt1 is null then 0 else g.cnt1 end as cnt1 \
from(select f.pkg,f.sumcnt,e.cnt1 \
from(select c.pkg,sum(c.cnt) sumcnt from table2 c group by c.pkg ) f \
left outer join \
(select d.pkg,d.cnt cnt1 from table2 d where d.installed=1) e on f.pkg=e.pkg)g)h")

#sqlContext.uncacheTable("table2")
#df3.cache()

# select pkgs by the class-rate ratio (xsb); also derive cnt00/cnt10, the per-class
# counts of samples that do NOT contain the pkg

df4=df3.withColumn("cnt00",cnt_samp0-df3.cnt01).withColumn("cnt10",cnt_samp1-df3.cnt11).\
withColumn("xsb",((df3.cnt11+1.0)/(cnt_samp1+1.0))/((df3.cnt01+1.0)/(cnt_samp0+1.0)))

###=============********* everything above is the common, model-independent part *************************

# df41=df4.withColumn("ration1",(df4.cnt11+1.0)/(cnt_samp1+1.0)).\
# withColumn("ration0",(df4.cnt01+1.0)/(cnt_samp0+1.0))

# directly keep the clearly significant pkgs, skipping any finer-grained inspection

df5=df4.filter("xsb>=1.2 or xsb<=0.8").filter("cnt01+cnt11>=15")   # ~6265 pkgs remain

def gaivchi(p):
    # p = (pkg, cnt01, cnt11, cnt00, cnt10, xsb); the four counts form a 2x2
    # contingency table of pkg absent/present (rows) vs installed=0/1 (columns)
    a00,a10,a01,a11=int(p[3]),int(p[4]),int(p[1]),int(p[2])
    args=[a00,a10,a01,a11]
    y1,n1,y2,n2=a00,a10,a01,a11
    # chi-square statistic of the 2x2 table
    chi2=((a00*a11-a10*a01)**2.0)*(a00+a10+a01+a11)/((a00+a10)*(a01+a11)*(a00+a01)*(a10+a11))
    # information gain: H(installed) - H(installed | pkg absent/present)
    num=2
    if len(args) % num != 0:
        print "Error len(args)"
    arg0=[args[0]+args[2],args[1]+args[3]]
    total0 = sum(arg0) + 0.0
    result0 = 0.0
    for w in arg0:
        if w == 0:
            result0 += 0
        else:
            result0 += w / total0 * log( w / total0, 2)
    result = 0.0
    total = sum(args)+0.0
    for x in xrange(len(args) / num):
        k = x * num
        total_up = 0.0 + sum(args[k:k + num])
        arg1=args[k:k + num]
        total1 = sum(arg1) + 0.0
        result1 = 0.0
        for i in arg1:
            if i == 0:
                result1 += 0
            else:
                result1 += i / total1 * log( i / total1, 2)
        result += total_up / total *(-result1)
    # Weight of Evidence / Information Value with +1 smoothing; iv2 averages the IV
    # computed with the two classes swapped
    woe00=log(((y1+1.0)/(y1+y2+1.0))/((n1+1.0)/(n1+n2+1.0)))
    iv00=(((y1+1.0)/(y1+y2+1.0))-((n1+1.0)/(n1+n2+1.0)))*woe00
    woe01=log(((y2+1.0)/(y1+y2+1.0))/((n2+1.0)/(n1+n2+1.0)))
    iv01=(((y2+1.0)/(y1+y2+1.0))-((n2+1.0)/(n1+n2+1.0)))*woe01
    iv0=iv00+iv01
    woe10=log(((n1+1.0)/(n1+n2+1.0))/((y1+1.0)/(y1+y2+1.0)))
    woe11=log(((n2+1.0)/(n1+n2+1.0))/((y2+1.0)/(y1+y2+1.0)))
    iv1=((n1+1.0)/(n1+n2+1.0)-((y1+1.0)/(y1+y2+1.0)))*woe10 + ((n2+1.0)/(n1+n2+1.0)-((y2+1.0)/(y2+y2+1.0)))* woe11
    iv2=round((iv0+iv1)/2.0,6)
    return [p[0],p[1],p[2],p[3],p[4],p[5],chi2,-result0-result,iv2]
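# A minimal local sanity check of gaivchi (the counts below are hypothetical, not from the data):
#   sample = ("com.example.app", 10, 30, 1990, 970, 5.63)   # (pkg, cnt01, cnt11, cnt00, cnt10, xsb)
#   gaivchi(sample)   # -> [pkg, cnt01, cnt11, cnt00, cnt10, xsb, chi2, gain, iv]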

df6=sqlContext.createDataFrame(df5.map(gaivchi),["pkg","cnt01","cnt11","cnt00","cnt10","xsb","chi2","gain","iv"])
df6.registerTempTable("e")
df7=sqlContext.sql("select t.pkg,t.xsb,t.chi2,t.gain,t.iv,rank1,rank2,rank3,rank1+rank2+rank3 rank \
from (select pkg,xsb,chi2,gain,iv, \
row_number()over (order by chi2 ) rank1, \
row_number()over (order by gain ) rank2, \
row_number()over (order by iv ) rank3 \
from e ) t order by rank desc")
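# row_number() orders ascending, so pkgs with larger chi2/gain/iv receive larger rank
# values; summing the three ranks and sorting descending favours pkgs that score high
# on all three criteria at once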

# number of pkgs to keep:
number=5000

# take the top-ranked pkgs (head(number) collects them to the driver)
df8=sc.parallelize(df7.head(number)).map(lambda p:p[0])

# only an RDD can be saved this way
df8.saveAsTextFile('hdfs:./installed_pkg')
#===================================================================================================

    

# number of pkgs to keep:
number=10000

# take the top-ranked pkgs (lt5_16 is assumed to be the ranked result from the
# earlier, omitted part of this script)
lt5_17=sc.parallelize(lt5_16.head(number)).map(lambda p:p[0])

# build the result matrix; broadcast the selected pkg list so every executor
# can look it up locally

lt6=sc.broadcast(lt5_17.collect())

def matrix(p):
   # p = (gid, age, list of pkgs); emit [gid, age] followed by a 0/1 indicator
   # for every pkg in the broadcast vocabulary lt6
   temp1=[p[0],p[1]]
   for i in lt6.value:
      if i in p[2]:
         temp1.append(1)
      else:
         temp1.append(0)
   return temp1
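# Minimal sketch of matrix() on a hypothetical record (the names are illustrative):
#   lt6.value == ['com.a', 'com.b', 'com.c']
#   matrix(('gid123', 25, ['com.b']))   # -> ['gid123', 25, 0, 1, 0]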

lt7=data3.filter(lambda p:len(p[2])>0).map(lambda p:(p[0],p[1],(re.split(";|,",p[2]))))

# build the sample matrix: one row per user, [gid, age, 0/1 indicators...]
lt8=lt7.map(matrix)

colname=["gid","age"]+[x for x in lt6.value]

# zip with an index so the column names can be written out with their positions

lt9=sc.parallelize(colname).zipWithIndex()

# turn into a DataFrame; note the index column here is an integer, not a string

lt10=sqlContext.createDataFrame(lt9,["colname","rank_gender"])
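# lt10 then holds rows like ('gid', 0), ('age', 1), ('com.example.app', 2), ...
# (the pkg name is illustrative)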

# save the sample matrix to HDFS
# only an RDD can be saved this way

lt8.saveAsTextFile('hdfs://getui-bi-hadoop/user/zhangb/gender_model_matrix')

# save the DataFrame of column names as a Hive table
lt10.saveAsTable(tableName="gender_model_colname",source="parquet",mode="overwrite")

##### convert the dense 0/1 rows into SparseVectors
def sparse(p):
   # p = [gid, age, 0/1, 0/1, ...]; record only the indices whose value is 1
   vec=[int(x) for x in p[2:]]
   lvec=len(vec)
   #ind=vec.index(1)
   dic1={}
   for i in range(lvec):
      if vec[i]==1:
         dic1[i]=1
   return [p[0],p[1],SparseVector(lvec,dic1)]
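# Minimal sketch of sparse() on a hypothetical dense row:
#   sparse(['gid123', 25, 0, 1, 0, 1])   # -> ['gid123', 25, SparseVector(4, {1: 1.0, 3: 1.0})]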

lt11=lt8.map(sparse)

 
