How to check whether a value at hand is in a particular column of some PySpark dataframe?
Problem description
I have a PySpark dataframe, trips, on which I am performing aggregations. For each PULocationID, I am first computing the average of total_amount, then the number of trips, and finally the number of trips whose DOLocationID is in the DOLocationID column of mtrips, another PySpark dataframe.
I'm including the schemas for both trips and mtrips below.
My current code is as follows, but it is incomplete:
import pyspark.sql.functions as F

cnt_cond = lambda cond: F.sum(F.when(cond, 1).otherwise(0))

(
    trips
    .groupBy('PULocationID', 'DOLocationID')
    .agg(
        F.mean('total_amount').alias('avg_total_amt'),
        F.count('*').alias('trip_count'),
        # This last aggregate is the incomplete part: Column.contains does
        # substring matching, and a column of another dataframe (mtrips)
        # cannot be referenced inside an aggregation on trips.
        cnt_cond(mtrips.DOLocationID.contains(trips.DOLocationID)).alias('trips_to_pop')
    )
    .show(200)
)
trips.printSchema()
# root
# |-- VendorID: integer (nullable = true)
# |-- tpep_pickup_datetime: timestamp (nullable = true)
# |-- tpep_dropoff_datetime: timestamp (nullable = true)
# |-- passenger_count: integer (nullable = true)
# |-- trip_distance: double (nullable = true)
# |-- RatecodeID: integer (nullable = true)
# |-- store_and_fwd_flag: string (nullable = true)
# |-- PULocationID: integer (nullable = true)
# |-- DOLocationID: integer (nullable = true)
# |-- payment_type: integer (nullable = true)
# |-- fare_amount: double (nullable = true)
# |-- extra: double (nullable = true)
# |-- mta_tax: double (nullable = true)
# |-- tip_amount: double (nullable = true)
# |-- tolls_amount: double (nullable = true)
# |-- improvement_surcharge: double (nullable = true)
# |-- total_amount: double (nullable = true)
# |-- congestion_surcharge: double (nullable = true)
mtrips.printSchema()
# root
# |-- DOLocationID: integer (nullable = true)
# |-- pcount: long (nullable = true)
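To make the snippets here runnable in isolation, this is a toy setup matching those schemas, trimmed to the columns the aggregation actually touches. All values are invented and the column types are inferred, which is close enough for the example:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Invented toy rows, limited to the columns the aggregation uses.
trips = spark.createDataFrame(
    [(132, 265, 14.30), (132, 132, 7.55), (138, 265, 21.00), (138, 230, 9.95)],
    ['PULocationID', 'DOLocationID', 'total_amount'],
)

# mtrips holds the drop-off locations being checked against.
mtrips = spark.createDataFrame(
    [(265, 12), (138, 4)],
    ['DOLocationID', 'pcount'],
)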
Recommended answer
Here is the line of code that solved the problem:
from pyspark.sql.functions import col

# Membership test against the collected DOLocationID values of mtrips.
cnt_cond(col('DOLocationID').isin([i['DOLocationID'] for i in mtrips.collect()])).alias('trips_to_pop')
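Putting it together, here is a minimal sketch of the full aggregation with that fix applied. It groups only by PULocationID, per the problem statement (the question's snippet also grouped by DOLocationID, which would change the grain), and selecting the single column before collect() is a small assumption to avoid pulling all of mtrips to the driver:

import pyspark.sql.functions as F

cnt_cond = lambda cond: F.sum(F.when(cond, 1).otherwise(0))

# Pull the lookup column to the driver once; fine while mtrips is small.
popular_ids = [row['DOLocationID'] for row in mtrips.select('DOLocationID').collect()]

(
    trips
    .groupBy('PULocationID')
    .agg(
        F.mean('total_amount').alias('avg_total_amt'),
        F.count('*').alias('trip_count'),
        cnt_cond(F.col('DOLocationID').isin(popular_ids)).alias('trips_to_pop'),
    )
    .show(200)
)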
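As a design note, collect() ships the lookup values to the driver and embeds them in the query plan, which is fine while mtrips is small. For a larger lookup table, one alternative (sketched here; the is_pop helper column is illustrative, not from the original answer) is to flag matches with a left join and count the non-null flags:

import pyspark.sql.functions as F

# Distinct popular drop-offs, tagged with a helper flag.
popular = mtrips.select('DOLocationID').distinct().withColumn('is_pop', F.lit(1))

(
    trips
    .join(popular, on='DOLocationID', how='left')
    .groupBy('PULocationID')
    .agg(
        F.mean('total_amount').alias('avg_total_amt'),
        F.count('*').alias('trip_count'),
        F.count('is_pop').alias('trips_to_pop'),  # count() skips nulls, so misses don't count
    )
    .show(200)
)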