1、窗口函数需要使用hiveContext,故引入如下包
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.4.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.4.1"
关于hiveContext,需要说明一点,使用hiveContext不是说一定要部署hive,像row_number() over(partition by 。。。。)窗口函数就不用,
另外,在spark-shell里,你看到的sqlContext其实就是HiveContext(这也就是为什么会在运行spark-shell的目录产生一个derby文件derby.log和文件夹metastore_db),
也就是说,你在spark-shell里边可以直接使用窗口函数(注意:真正写的spark app jar包,必须把hive打进去,才能在集群上运行,这点与spark-shell不同)
2、使用窗口函数,取每个mac的第一条记录
sqlContext.read.load(s"hdfs://myspark/logs").registerTempTable("logs")
sql(
s"""select *
from (select mac_address, remote_ip, event_date, country, province, city,
row_number() over(partition by mac_address order by event_date) as rn
from logs where event_date <=$event_date_int) as group_by_mac
where rn =1
""").drop("rn").registerTempTable("mac_first_result")
3、关于where
from logs where event_date <=$event_date_int //建议使用支持filterpushdown的数据格式,如,spark 默认的parquet
4、关于性能
使用窗口函数时,建议需要做cache的,就做下cache,每算一次还是挺花费时间,消耗性能的
5、其他窗口函数,大家自行摸索吧