我必须使用python开发一个Spark脚本,该脚本检查一些日志并验证用户是否在两个事件之间更改了其IP国家。我有一个IP地址范围和相关国家的csv文件保存在HDFS上,如下所示:

startIp, endIp, country
0.0.0.0, 10.0.0.0, Italy
10.0.0.1, 20.0.0.0, England
20.0.0.1, 30.0.0.0, Germany


和一个日志csv文件:

userId, timestamp, ip, event
1, 02-01-17 20:45:18, 10.5.10.3, login
24, 02-01-17 20:46:34, 54.23.16.56, login


我用Spark Dataframe加载了这两个文件,并且已经修改了包含带有lag函数的日志的文件,该函数具有lag函数,并在其中添加了previousIp列。我认为的解决方案是用关联的国家/地区替换ip和previousIp,以便进行比较,并使用dataFrame.filter(“ previousIp”!=“ ip”)。
我的问题是,在Spark中有办法做到这一点吗?就像是:

dataFrame = dataFrame.select("userId", udfConvert("ip",countryDataFrame).alias("ip"), udfConvert("previousIp",countryDataFrame).alias("previousIp"),...)

为了拥有这样的数据框:

userId, timestamp, ip, event, previousIp
1, 02-01-17 20:45:18, England, login, Italy


如果没有,我该如何解决我的问题?谢谢

最佳答案

如果先将IP地址转换为数字,这实际上很容易。您可以编写自己的UDF或使用petrabarus中的代码并注册如下函数:

spark.sql("CREATE TEMPORARY FUNCTION iptolong as 'net.petrabarus.hiveudfs.IPToLong'")


然后将国家csv映射到带有数字的数据框:

>>> ipdb = spark.read.csv('ipdb.csv', header=True).select(
             expr('iptolong(startIp)').alias('ip_from'),
             expr('iptolong(endIp)').alias('ip_to'),
             'country')
>>> ipdb.show()
+---------+---------+-------+
|  ip_from|    ip_to|country|
+---------+---------+-------+
|        0|167772160|  Italy|
|167772161|335544320|England|
|335544321|503316480|Germany|
+---------+---------+-------+


另外,将您的日志数据框映射到数字:

>>> log = spark.createDataFrame([('15.0.0.1',)], ['ip']) \
            .withColumn('ip', expr('iptolong(ip)'))
>>> log.show()
+---------+
|       ip|
+---------+
|251658241|
+---------+


然后,您可以使用between条件加入此数据框:

>>> log.join(broadcast(ipdb), log.ip.between(ipdb.ip_from, ipdb.ip_to)).show()
+---------+---------+---------+-------+
|       ip|  ip_from|    ip_to|country|
+---------+---------+---------+-------+
|251658241|167772161|335544320|England|
+---------+---------+---------+-------+

关于python - 输入中带有Dataframe的Spark Udf函数,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/41577001/

10-16 01:56