Problem description
I have a Spark dataframe sdf with GPS points that looks like this:
import pandas as pd

d = {'user': ['A', 'A', 'A', 'A', 'A', 'A', 'B', 'B', 'B', 'C', 'C', 'C', 'C', 'C', 'A', 'A'],
     'lat': [37.75243634842733, 37.75344580658182, 37.75405656449232, 37.753649393112181, 37.75409897804892, 37.753937806404586, 37.72767062183685, 37.72710631810977, 37.72605407110467, 37.71141865080228, 37.712199505873926, 37.713285899241896, 37.71428740401767, 37.712810604103346, 37.75405656449232, 37.753649393112181],
     'lon': [-122.41924881935118, -122.42006421089171, -122.419216632843, -122.41784334182738, -122.4169099330902, -122.41549372673035, -122.3878937959671, -122.3884356021881, -122.38841414451599, -122.44688630104064, -122.44474053382874, -122.44361400604248, -122.44260549545288, -122.44156479835509, -122.4169099330902, -122.41549372673035],
     'date': ['2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-03', '2018-02-04', '2018-02-04'],
     'radius': [10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10]}
pdf = pd.DataFrame(data=d)
sdf = spark.createDataFrame(pdf)
+----+------------------+-------------------+----------+------+
|user| lat| lon| date|radius|
+----+------------------+-------------------+----------+------+
| A| 37.75243634842733|-122.41924881935118|2018-02-03| 10|
| A| 37.75344580658182|-122.42006421089171|2018-02-03| 10|
| A| 37.75405656449232| -122.419216632843|2018-02-03| 10|
| A|37.753649393112184|-122.41784334182738|2018-02-03| 10|
| A| 37.75409897804892| -122.4169099330902|2018-02-03| 10|
| A|37.753937806404586|-122.41549372673035|2018-02-03| 10|
| B| 37.72767062183685| -122.3878937959671|2018-02-03| 10|
| B| 37.72710631810977| -122.3884356021881|2018-02-03| 10|
| B| 37.72605407110467|-122.38841414451599|2018-02-03| 10|
| C| 37.71141865080228|-122.44688630104064|2018-02-03| 10|
| C|37.712199505873926|-122.44474053382874|2018-02-03| 10|
| C|37.713285899241896|-122.44361400604248|2018-02-03| 10|
| C| 37.71428740401767|-122.44260549545288|2018-02-03| 10|
| C|37.712810604103346|-122.44156479835509|2018-02-03| 10|
| A| 37.75405656449232| -122.4169099330902|2018-02-04| 10|
| A|37.753649393112184|-122.41549372673035|2018-02-04| 10|
+----+------------------+-------------------+----------+------+
Since the Spark dataframe contains different GPS trajectories generated by different users on different days, I want to write a function that loops through this df and feeds the corresponding set of coordinates to the (OSRM) request per date and per user group, rather than all at once.
from typing import Dict, Any, List, Tuple

import pyspark.sql.functions as F
import requests

# Format coordinates into a concatenated string formatted for the OSRM server
def format_coords(df):
    coords = df.agg(F.concat_ws(';', F.collect_list(F.format_string('%f,%f', 'lon', 'lat')))).head()[0]
    return coords

# Format dictionary of additional options to the OSRM request into a concatenated string format.
def format_options(options: Dict[str, str]) -> str:
    options = "&".join([f"{k}={v}" for k, v in options.items()])
    return options

# Format radiuses into a concatenated string formatted for the OSRM server
def format_radiuses(df):
    radiuses = "&radiuses=" + df.agg(F.concat_ws(';', F.collect_list(F.format_string('%d', 'radius')))).head()[0]
    return radiuses

# Make request
def make_request(coords, radiuses, options):
    coords = format_coords(coords)
    radiuses = format_radiuses(radiuses)
    options = format_options(options) if options else ""
    url = f"http://router.project-osrm.org/match/v1/car/{coords}?{options}{radiuses}"
    r = requests.get(url)
    return r.json()
Unfortunately, running the code below returns a TypeError: 'GroupedData' object is not iterable. What am I missing?
output = {}
for trip, g in sdf.groupBy('date', 'user'):
    output[trip] = make_request(coords=sdf[['lat', 'lon']],
                                radiuses=sdf[['radius']],
                                options={'overview': 'full',
                                         'geometries': 'polyline6',
                                         'annotations': 'nodes'})
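For context, the iteration pattern above is what pandas supports, where groupby yields (key, group) pairs. A minimal sketch using the pdf defined earlier illustrates the behavior the loop assumes:

# In pandas, a groupby object is iterable, yielding (key, group) pairs --
# this is the pattern the loop above assumes.
for (date, user), g in pdf.groupby(['date', 'user']):
    print(date, user, len(g))  # e.g. 2018-02-03 A 6

# PySpark's groupBy, by contrast, returns a GroupedData object that only
# exposes aggregation methods (agg, count, ...) and raises a TypeError
# when you try to iterate over it.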
Answer
Unlike pandas, PySpark's groupBy returns a GroupedData object that cannot be iterated over; it only supports aggregation methods. You can instead build the coordinate and radius strings as aggregations after the group by, then collect one row per group:
import pyspark.sql.functions as F
import requests

# Format dictionary of additional OSRM request options into a query-string fragment
def format_options(options):
    options = "&".join([f"{k}={v}" for k, v in options.items()])
    return options

# Build the OSRM match URL from pre-aggregated coordinate and radius strings
def make_request(coords, radiuses, options):
    options = format_options(options) if options else ""
    url = f"http://router.project-osrm.org/match/v1/car/{coords}?{options}&radiuses={radiuses}"
    r = requests.get(url)
    print(url)
    return r.json()

# Aggregate the coordinate and radius strings per (date, user) group,
# then collect one row per group to the driver
coords = sdf.groupBy('date', 'user').agg(
    F.concat_ws(';',
                F.collect_list(F.format_string('%f,%f', 'lon', 'lat'))
                ).alias('coords'),
    F.concat_ws(';',
                F.collect_list(F.format_string('%d', 'radius'))
                ).alias('radius')
).collect()

options = {'overview': 'full', 'geometries': 'polyline6', 'annotations': 'nodes'}
output = {(c[0], c[1]): make_request(c[2], c[3], options) for c in coords}
"""
{('2018-02-03', 'A'): {'code': 'Ok',
'matchings': [{'confidence': 0.374625,
'distance': 325.2,
'duration': 50.6,
'geometry': 'y{h_gAh~znhF}@k[OmFMoFcAea@IeD[uMAYKsDMsDAe@}@u_@g@aTMwFMwFwAqq@',
'legs': [{'annotation': {'nodes': [1974590926,
4763953263,
65359046,
4763953265,
5443374298,
2007343352]},
'distance': 116.7,
'duration': 18.8,
'steps': [],
'summary': '',
'weight': 18.8},
{'annotation': {'nodes': [5443374298,
2007343352,
4763953266,
65359043,
4763953269,
2007343354,
4763953270]},
'distance': 85.6,
'duration': 12.2,
'steps': [],
'summary': '',
'weight': 12.2},
{'annotation': {'nodes': [2007343354,
4763953270,
65334199,
4763953274,
2007343347]},
'distance': 122.9,
'duration': 19.6,
'steps': [],
'summary': '',
'weight': 19.6}],
'weight': 50.6,
'weight_name': 'routability'}],
'tracepoints': [None,
None,
{'alternatives_count': 0,
'distance': 28.078003,
'hint': '20nBh2NdHwA2AAAAOgAAAAwAAAAPAAAAiVMWQq2VIEIAuABB7FgoQTYAAAA6AAAADAAAAA8AAABDRAAACwi0-M0TQALvB7T4yRRAAgEAXwUADb92',
'location': [-122.419189, 37.753805],
'matchings_index': 0,
'name': '23rd Street',
'waypoint_index': 0},
{'alternatives_count': 0,
'distance': 26.825184,
'hint': 'Ew3BBzFbH4AdAAAACwAAAA0AAAAAAAAAIxmmQTSs6kCiuRFBAAAAAB0AAAALAAAADQAAAAAAAABDRAAANg20-CIUQAJNDbT4MRNAAgIAnxAADb92',
'location': [-122.417866, 37.75389],
'matchings_index': 0,
'name': '23rd Street',
'waypoint_index': 1},
{'alternatives_count': 0,
'distance': 16.583412,
'hint': 'DQ3BBxQNwYcqAAAAQwAAABAAAAANAAAA0i_uQb3SOEKKPC9BG1EaQSoAAABDAAAAEAAAAA0AAABDRAAAABG0-F4UQALyELT48xRAAgEAnxAADb92',
'location': [-122.416896, 37.75395],
'matchings_index': 0,
'name': '23rd Street',
'waypoint_index': 2},
{'alternatives_count': 7,
'distance': 10.013916,
'hint': 'Dg3Bh1WcyQBmAAAACAAAABAAAAANAAAAQOKOQg89nkCKPC9BEMcOQWYAAAAIAAAAEAAAAA0AAABDRAAAcha0-KwUQAJ6FrT4UhRAAgEAbwUADb92',
'location': [-122.415502, 37.754028],
'matchings_index': 0,
'name': '23rd Street',
'waypoint_index': 3}]},
('2018-02-03', 'B'): {'code': 'Ok',
'matchings': [{'confidence': 1e-06,
'distance': 270.4,
'duration': 50,
'geometry': 'euu}fAd_~lhFoAlCMTuAvCvC|Bh@`@hXbUnAdADBhDzCzClCXVzZnW\\X~CnC~@qBLWnWej@',
'legs': [{'annotation': {'nodes': [5443147626,
6360865540,
6360865536,
65307580,
6360865535,
6360865539,
6360865531]},
'distance': 84.8,
'duration': 17.8,
'steps': [],
'summary': '',
'weight': 17.8},
{'annotation': {'nodes': [6360865539,
6360865531,
6360865525,
65343521,
6360865527,
6360865529,
6360865523,
6360865520,
65321110,
6360865519,
6360865522,
6376329343]},
'distance': 185.6,
'duration': 32.2,
'steps': [],
'summary': '',
'weight': 32.2}],
'weight': 50,
'weight_name': 'routability'}],
'tracepoints': [{'alternatives_count': 0,
'distance': 11.53267,
'hint': 'ZpfJAOSXyYALAAAArQAAAA4AAAAsAAAAnpH1QDVG8EJWgBdBa2v0QQsAAACtAAAADgAAACwAAABDRAAA_YG0-GOtPwJKgrT4t60_AgIA3wcADb92',
'location': [-122.387971, 37.727587],
'matchings_index': 0,
'name': 'Underwood Avenue',
'waypoint_index': 0},
{'alternatives_count': 0,
'distance': 13.565054,
'hint': 'ZZfJgALywAdPAAAACAAAABMAAAASAAAA7ONaQo4CrUDv7U1BJdFAQU8AAAAIAAAAEwAAABIAAABDRAAArX-0-MerPwIsgLT4gqs_AgIAbw0ADb92',
'location': [-122.388563, 37.727175],
'matchings_index': 0,
'name': 'Jennings Street',
'waypoint_index': 1},
{'alternatives_count': 1,
'distance': 9.601917,
'hint': 'WZfJAP7xwIecAAAAbAAAABEAAAALAAAAdujYQqu4lUJXHD1B9-ruQJwAAABsAAAAEQAAAAsAAABDRAAAAoC0-CCnPwJCgLT4Zqc_AgIAHxMADb92',
'location': [-122.388478, 37.725984],
'matchings_index': 0,
'name': 'Wallace Avenue',
'waypoint_index': 2}]},
('2018-02-03', 'C'): {'code': 'Ok',
'matchings': [{'confidence': 7.3e-05,
'distance': 420.1,
'duration': 64.1,
'geometry': 'kuy|fAbyjphFcBxEmE`FqJkKiBqBuP}Qgc@ie@eAiAcB}ArA_Eb@mAjKkDnBo@fe@mOrw@kW',
'legs': [{'annotation': {'nodes': [5440513673,
5440513674,
5440513675,
65363070,
1229920760,
65307726,
6906452420,
1229920717,
65361047,
1229920749,
554163599,
3978809925]},
'distance': 235.2,
'duration': 37.5,
'steps': [],
'summary': '',
'weight': 40.1},
{'annotation': {'nodes': [554163599,
3978809925,
65345518,
8256268328]},
'distance': 184.9,
'duration': 26.6,
'steps': [],
'summary': '',
'weight': 26.6}],
'weight': 66.7,
'weight_name': 'routability'}],
'tracepoints': [None,
None,
{'alternatives_count': 0,
'distance': 6.968076,
'hint': 'KLvAhyu7wAcAAAAANQAAAAAAAAAkAAAAAAAAAOCMMUEAAAAA_Z1yQQAAAAAbAAAAAAAAACQAAABDRAAAXqiz-GZ1PwKiqLP4hnU_AgAAzxIADb92',
'location': [-122.443682, 37.713254],
'matchings_index': 0,
'name': '',
'waypoint_index': 0},
{'alternatives_count': 0,
'distance': 16.488956,
'hint': '-rrAB_aPyYAJAAAAIgAAAGgAAAAUAAAA2RnSQL_5uUEPjI9CBTlaQQkAAAAiAAAAaAAAABQAAABDRAAARK2z-J95PwKTrLP4b3k_AgEAXxUADb92',
'location': [-122.442428, 37.714335],
'matchings_index': 0,
'name': 'Allison Street',
'waypoint_index': 1},
{'alternatives_count': 1,
'distance': 17.311636,
'hint': '_brAhwC7wAeZAAAANwAAAAAAAAAKAAAAH4vUQgKXFkIAAAAAXtbYQJkAAAA3AAAAAAAAAAoAAABDRAAA6a-z-HlzPwKjsLP4q3M_AgAAHwoADb92',
'location': [-122.441751, 37.712761],
'matchings_index': 0,
'name': 'Allison Street',
'waypoint_index': 2}]},
('2018-02-04', 'A'): {'code': 'Ok',
'matchings': [{'confidence': 0,
'distance': 205.5,
'duration': 46.4,
'geometry': '{di_gAfovnhFg@iTMwFbCMlXmApH[k@iJoB{l@uFH',
'legs': [{'annotation': {'nodes': [2007343354,
4763953270,
65334199,
4763953267,
5443374265,
5443374261,
5443374264,
5443374263,
5544172171]},
'distance': 205.5,
'duration': 46.4,
'steps': [],
'summary': '',
'weight': 46.4}],
'weight': 46.4,
'weight_name': 'routability'}],
'tracepoints': [{'alternatives_count': 0,
'distance': 11.908542,
'hint': 'DQ3BBxQNwYcrAAAAQgAAABAAAAANAAAAkv_wQeJqN0KKPC9BG1EaQSsAAABCAAAAEAAAAA0AAABDRAAA_BC0-F4UQALyELT4yRRAAgEAnxAADb92',
'location': [-122.4169, 37.75395],
'matchings_index': 0,
'name': '23rd Street',
'waypoint_index': 0},
{'alternatives_count': 6,
'distance': 11.065027,
'hint': 'kQ3Bh____38hAAAAIQAAAMMAAAAAAAAApopaQQAAAADsMaJCAAAAACEAAAAhAAAAwwAAAAAAAABDRAAAlxa0-NASQAJ6FrT4MRNAAgIAbxYADb92',
'location': [-122.415465, 37.753552],
'matchings_index': 0,
'name': '',
'waypoint_index': 1}]}}
"""