Problem Description
I have a DataFrame of connection logs with the columns ID, targetIP, and Time. Every record in this DataFrame is a connection event to one system: ID identifies the connection, targetIP is the target IP address of that connection, and Time is the connection time. With values:
ID | Time | targetIP |
---|------|----------|
1 | 1 | 192.163.0.1 |
2 | 2 | 192.163.0.2 |
3 | 3 | 192.163.0.1 |
4 | 5 | 192.163.0.1 |
5 | 6 | 192.163.0.2 |
6 | 7 | 192.163.0.2 |
7 | 8 | 192.163.0.2 |
I want to create a new column under a condition: the count of connections to the current record's target IP address within the past 2 time units. So the result DataFrame should be:
ID | Time | targetIP | count |
---|------|----------|-------|
1 | 1 | 192.163.0.1 | 0 |
2 | 2 | 192.163.0.2 | 0 |
3 | 3 | 192.163.0.1 | 1 |
4 | 5 | 192.163.0.1 | 1 |
5 | 6 | 192.163.0.2 | 0 |
6 | 7 | 192.163.0.2 | 1 |
7 | 8 | 192.163.0.2 | 2 |
For example, take ID=7: its targetIP is 192.163.0.2. Within the past 2 time units there were two connections to the system, ID=5 and ID=6, whose targetIP is also 192.163.0.2. So the count for ID=7 is 2.
Looking forward to your help.
Recommended Answer
You can use count over a Window with a frame of RANGE BETWEEN 2 PRECEDING AND CURRENT ROW to get the count of connections to the same IP within the last 2 time units.
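To make the snippets below runnable end to end, here is a minimal sketch that builds the sample DataFrame from the question (the SparkSession setup is an assumption; adapt it to your environment):

from pyspark.sql import SparkSession

# assumed setup: a local SparkSession; in a real deployment this usually already exists
spark = SparkSession.builder.appName("connection-logs").getOrCreate()

# sample data from the question: (ID, Time, targetIP)
data = [
    (1, 1, "192.163.0.1"),
    (2, 2, "192.163.0.2"),
    (3, 3, "192.163.0.1"),
    (4, 5, "192.163.0.1"),
    (5, 6, "192.163.0.2"),
    (6, 7, "192.163.0.2"),
    (7, 8, "192.163.0.2"),
]
df = spark.createDataFrame(data, ["ID", "Time", "targetIP"])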
Using Spark SQL, you can do something like this:
# register the DataFrame as a SQL temp view
df.createOrReplaceTempView("connection_logs")

df1 = spark.sql("""
    SELECT *,
           COUNT(*) OVER (
               PARTITION BY targetIP
               ORDER BY Time
               RANGE BETWEEN 2 PRECEDING AND CURRENT ROW
           ) - 1 AS count  -- subtract 1 to exclude the current row itself
    FROM connection_logs
    ORDER BY ID
""")
df1.show()
#+---+----+-----------+-----+
#| ID|Time| targetIP|count|
#+---+----+-----------+-----+
#| 1| 1|192.163.0.1| 0|
#| 2| 2|192.163.0.2| 0|
#| 3| 3|192.163.0.1| 1|
#| 4| 5|192.163.0.1| 1|
#| 5| 6|192.163.0.2| 0|
#| 6| 7|192.163.0.2| 1|
#| 7| 8|192.163.0.2| 2|
#+---+----+-----------+-----+
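As a quick sanity check of the frame semantics: for ID=7 (Time=8) in the 192.163.0.2 partition, RANGE BETWEEN 2 PRECEDING AND CURRENT ROW covers Time values in [6, 8], i.e. the rows with ID=5, 6 and 7. COUNT(*) over that frame is 3, and subtracting 1 for the current row itself gives the expected count of 2.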
Or using the DataFrame API:
from pyspark.sql import Window
from pyspark.sql import functions as F

time_unit = lambda x: x  # identity, since Time is already in plain time units

# the frame is based on the ORDER BY values: Time within 2 units up to the current row
w = Window.partitionBy("targetIP").orderBy(F.col("Time").cast("int")).rangeBetween(-time_unit(2), 0)

# subtract 1 to exclude the current row itself from the count
df1 = df.withColumn("count", F.count("*").over(w) - 1).orderBy("ID")
df1.show()
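Note that rangeBetween (like RANGE in the SQL version) is defined on the values of the ORDER BY expression, not on row positions, so gaps in Time are handled correctly. With rowsBetween(-2, 0) the frame would instead cover the 2 preceding physical rows: for ID=4 (Time=5) that would include the row at Time=1, which is outside the 2-unit range, and the count would come out as 2 instead of the expected 1.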