问题描述
我有一个关于连接日志的DataFrame,其中包含 Id
, targetIP
, Time
列.此DataFrame中的每个记录都是到一个系统的连接事件.Id表示此连接, targetIP
表示这次的目标IP地址,Time是连接时间.带有值:
I have a DataFrame about connection log with columns Id
, targetIP
, Time
. Every record in this DataFrame is a connection event to one system. Id means this connection, targetIP
means the target IP address this time, Time is the connection time. With Values:
1 | 1 | 192.163.0.1 |
2 | 2 | 192.163.0.2 |
3 | 3 | 192.163.0.1 |
4 | 5 | 192.163.0.1 |
5 | 6 | 192.163.0.2 |
6 | 7 | 192.163.0.2 |
7 | 8 | 192.163.0.2 |
我想在某种情况下创建一个新列:过去2个时间单位中与该时间目标IP地址的连接数.因此,结果DataFrame应该是:
I want to create a new column under some condition: count of connections to this time's target IP address in the past 2 time units. So the result DataFrame should be:
1 | 1 | 192.163.0.1 | 0 |
2 | 2 | 192.163.0.2 | 0 |
3 | 3 | 192.163.0.1 | 1 |
4 | 5 | 192.163.0.1 | 1 |
5 | 6 | 192.163.0.2 | 0 |
6 | 7 | 192.163.0.2 | 1 |
7 | 8 | 192.163.0.2 | 2 |
例如, ID = 7
, targetIP
是 192.163.0.2
在过去2个时间单位(> ID = 5
和 ID = 6
,以及它们的 targetIP
也是 192.163.0.2
.因此,有关 ID = 7
的计数为2.
For example, ID=7
, the targetIP
is 192.163.0.2
Connected to system in past 2 time units, which are ID=5
and ID=6
, and their targetIP
are also 192.163.0.2
. So the count about ID=7
is 2.
期待您的帮助.
推荐答案
您可以在范围介于-2和当前行之间的Window上使用 count
来获取最后2个IP的数量时间单位.
You can use count
over Window bounded with range between - 2 and current row, to get the count of IP in the last 2 time units.
使用Spark SQL,您可以执行以下操作:
Using Spark SQL you can do something like this:
df.createOrReplaceTempView("connection_logs")
df1 = spark.sql("""
SELECT *,
COUNT(*) OVER(PARTITION BY targetIP ORDER BY Time
RANGE BETWEEN 2 PRECEDING AND CURRENT ROW
) -1 AS count
FROM connection_logs
ORDER BY ID
""")
df1.show()
#+---+----+-----------+-----+
#| ID|Time| targetIP|count|
#+---+----+-----------+-----+
#| 1| 1|192.163.0.1| 0|
#| 2| 2|192.163.0.2| 0|
#| 3| 3|192.163.0.1| 1|
#| 4| 5|192.163.0.1| 1|
#| 5| 6|192.163.0.2| 0|
#| 6| 7|192.163.0.2| 1|
#| 7| 8|192.163.0.2| 2|
#+---+----+-----------+-----+
或使用DataFrame API:
Or using DataFrame API:
from pyspark.sql import Window
from pyspark.sql import functions as F
time_unit = lambda x: x
w = Window.partitionBy("targetIP").orderBy(col("Time").cast("int")).rangeBetween(-time_unit(2), 0)
df1 = df.withColumn("count", F.count("*").over(w) - 1).orderBy("ID")
df1.show()
这篇关于在某些情况下如何在Spark DataFrame中创建新列"count"的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!