问题描述
我有一个时间段内用户的纬度/经度格式的位置数据集.我想计算这些用户走过的距离.示例数据集:
I have a dataset of locations in Lat/Lon format of users in a time period. I would like to calculate the distance these users traveled. Sample dataset:
|时间戳|用户|纬度|经度||1462838468|49B4361512443A4DA...|39.777982|-7.054599||1462838512|49B4361512443A4DA...|39.777982|-7.054599||1462838389|49B4361512443A4DA...|39.777982|-7.054599||1462838497|49B4361512443A4DA...|39.777982|-7.054599||1465975885|6E9E0581E2A032FD8...|37.118362|-8.205041||1457723815|405C238E25FE0B9E7...|37.177322|-7.426781||1457897289|405C238E25FE0B9E7...|37.177922|-7.447443||1457899229|405C238E25FE0B9E7...|37.177922|-7.447443||1457972626|405C238E25FE0B9E7...|37.18059|-7.46128||1458062553|405C238E25FE0B9E7...|37.177322|-7.426781||1458241825|405C238E25FE0B9E7...|37.178172|-7.444512||1458244457|405C238E25FE0B9E7...|37.178172|-7.444512||1458412513|405C238E25FE0B9E7...|37.177322|-7.426781||1458412292|405C238E25FE0B9E7...|37.177322|-7.426781||1465197963|6E9E0581E2A032FD8...|37.118362|-8.205041||1465202192|6E9E0581E2A032FD8...|37.118362|-8.205041||1465923817|6E9E0581E2A032FD8...|37.118362|-8.205041||1465923766|6E9E0581E2A032FD8...|37.118362|-8.205041||1465923748|6E9E0581E2A032FD8...|37.118362|-8.205041||1465923922|6E9E0581E2A032FD8...|37.118362|-8.205041|
我曾想过使用自定义聚合器函数,但似乎没有 Python 支持.而且操作需要按特定顺序在相邻点上完成,所以我不知道自定义聚合器是否有效.
I have thought of using a custom aggregator function but it seems there is no Python support for this. Moreover the operations need to be done on adjacent points in a specific order, so I don't know if a custom aggregator would work.
我也看过reduceByKey
,但距离函数似乎不能满足运算符要求.
I have also looked at reduceByKey
but the operator requirements don't seem to be met by the distance function.
有没有办法在 Spark 中以高效的方式执行此操作?
Is there a way to perform this operation in an efficient manner in Spark?
推荐答案
这看起来像是窗口函数的工作.假设我们将距离定义为:
It looks like a job for window functions. Assuming we define distance as:
from pyspark.sql.functions import acos, cos, sin, lit, toRadians
def dist(long_x, lat_x, long_y, lat_y):
return acos(
sin(toRadians(lat_x)) * sin(toRadians(lat_y)) +
cos(toRadians(lat_x)) * cos(toRadians(lat_y)) *
cos(toRadians(long_x) - toRadians(long_y))
) * lit(6371.0)
您可以将窗口定义为:
from pyspark.sql.window import Window
w = Window().partitionBy("User").orderBy("Timestamp")
并使用 lag
计算连续观测之间的距离:
and compute distances between consecutive observations using lag
:
from pyspark.sql.functions import lag
df.withColumn("dist", dist(
"longitude", "latitude",
lag("longitude", 1).over(w), lag("latitude", 1).over(w)
).alias("dist"))
之后您可以执行标准聚合.
After that you can perform standard aggregation.
这篇关于如何使用(Py)Spark对数据集中数据点之间的距离求和?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!