TypeError: 'GroupedData' object is not iterable in pyspark

Question

I'm using Spark 2.0.1 and Python 2.7. I'm running the following code:

# This will return a new DF with all the columns + id
data1 = data.withColumn("id", monotonically_increasing_id()) # Create an integer index
data1.show()

def create_indexes(df,
                   fields=['country', 'state_id', 'airport', 'airport_id']):
    """ Create indexes for the different element ids
        for CMRs. This allows us to select CMRs that match
        a given element and element value very quickly.
    """
    if fields == None:
        print("No fields specified, returning")
        return
    for field in fields:
        if field not in df.columns:
            print('field: ', field, " is not in the data...")
            return
    indexes = {}
    for field in fields:
        print(field)
        res = df.groupby(field)
        index = {label: np.array(vals['id'], np.int32) for label, vals in res}
        indexes[field] = index
    return indexes

# Create indexes. Some of them take a lot of time!
#Changed dom_client_id by gbl_buy_grp_id as it was changed in Line Number 
indexes = create_indexes(data1, fields=['country', 'state_id', 'airport', 'airport_id'])
print type(indexes)

I'm getting the following error message when running this code:

TypeError: 'GroupedData' object is not iterable

Can you please help me solve this issue?

Recommended answer

Unlike a pandas groupby, df.groupby(field) in PySpark returns a GroupedData object, which only describes the grouping and holds no rows to loop over. You have to perform an aggregation on the GroupedData and collect the results before you can iterate over them. For example, to count the items per group: res = df.groupby(field).count().collect()
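Applied to the code in the question, the same idea could look roughly like the sketch below. It is not the original poster's method: it assumes that collecting every id per group with collect_list fits in driver memory, and the helper name build_index and the column alias "ids" are made up for illustration. The ids are sorted because the order in which collect_list gathers values is not guaranteed.

```python
def build_index(rows, field):
    """Turn collected rows into {label: [id, ...]}.

    `rows` is whatever .collect() returned; each row must support
    lookup by column name (pyspark Row objects do, and so do dicts).
    `build_index` and the "ids" column name are hypothetical.
    """
    return {row[field]: sorted(row["ids"]) for row in rows}


def create_indexes(df, fields):
    # Imported here so build_index stays usable without a Spark installation.
    from pyspark.sql import functions as F

    indexes = {}
    for field in fields:
        # agg() turns the GroupedData back into a DataFrame, and collect()
        # brings its rows to the driver, where they can be iterated.
        rows = (
            df.groupBy(field)
            .agg(F.collect_list("id").alias("ids"))
            .collect()
        )
        indexes[field] = build_index(rows, field)
    return indexes
```

With the DataFrame from the question this would be called as indexes = create_indexes(data1, ['country', 'state_id', 'airport', 'airport_id']). If NumPy arrays are needed, each list can be wrapped in np.array afterwards; note that monotonically_increasing_id() produces 64-bit values, so np.int64 is likely a safer dtype than np.int32.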

