According to the accepted answer to "pyspark collect_set or collect_list with groupby", when you apply collect_list to a column, the null values in that column are removed. I have checked, and this is true.


But in my case, I need to keep the null values. How can I achieve this?

I did not find any information on a variant of the collect_list function that does this.
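To pin down what "removed" means, here is a small pure-Python model of the behavior. It is my own illustration, not Spark's implementation, and `collect_list_model` is a hypothetical name. collect_list skips elements that are themselves null. A struct whose `city` field is null is not a null element, so in this model `collect_list(struct('amount', 'city'))` keeps it:

```python
from typing import Any, Iterable, List, Optional


def collect_list_model(values: Iterable[Optional[Any]]) -> List[Any]:
    """Model of Spark's collect_list: null (None) elements are skipped."""
    return [v for v in values if v is not None]


# Collecting a plain nullable column: the null city disappears.
cities = ["Phoenix", None]
print(collect_list_model(cities))  # ['Phoenix']

# Collecting struct('amount', 'city'): each struct is non-null, so both survive,
# and the second one still carries city=None.
structs = [{"amount": 10.0, "city": "Phoenix"}, {"amount": 5.0, "city": None}]
print(collect_list_model(structs))
```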


I have a dataframe df as below:

cId   |  eId  |  amount  |  city
1     |  2    |   20.0   |  Paris
1     |  2    |   30.0   |  Seoul
1     |  3    |   10.0   |  Phoenix
1     |  3    |   5.0    |  null

I want to write this to an Elasticsearch index with the following mapping:

    {
        "mappings": {
            "doc": {
                "properties": {
                    "eId": { "type": "keyword" },
                    "cId": { "type": "keyword" },
                    "transactions": {
                        "type": "nested",
                        "properties": {
                            "amount": { "type": "keyword" },
                            "city": { "type": "keyword" }
                        }
                    }
                }
            }
        }
    }

In order to conform to the nested mapping above, I transformed my df so that for each combination of eId and cId, I have an array of transactions like this:

    from pyspark.sql.functions import collect_list, struct

    df_nested = df.groupBy('eId', 'cId').agg(collect_list(struct('amount', 'city')).alias("transactions"))
    df_nested.printSchema()

    root
     |-- cId: integer (nullable = true)
     |-- eId: integer (nullable = true)
     |-- transactions: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- amount: float (nullable = true)
     |    |    |-- city: string (nullable = true)
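The same groupBy/collect_list step can be modeled in plain Python to show which documents it should produce from the sample data. This is a sketch, and `nest_transactions`, `SourceRow` and `ROWS` are hypothetical names. It keeps the null city as `None` inside its struct, which matches the nullable `city` field in the schema above:

```python
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple

# The sample rows from the table above: (cId, eId, amount, city).
SourceRow = Tuple[int, int, float, Optional[str]]
ROWS: List[SourceRow] = [
    (1, 2, 20.0, "Paris"),
    (1, 2, 30.0, "Seoul"),
    (1, 3, 10.0, "Phoenix"),
    (1, 3, 5.0, None),
]


def nest_transactions(rows: List[SourceRow]) -> List[Dict[str, Any]]:
    """Model of groupBy('eId', 'cId').agg(collect_list(struct('amount', 'city')))."""
    groups: Dict[Tuple[int, int], List[Dict[str, Any]]] = defaultdict(list)
    for c_id, e_id, amount, city in rows:
        # Each struct keeps both fields; a null city stays as None inside it.
        groups[(e_id, c_id)].append({"amount": amount, "city": city})
    # dicts (including defaultdict) preserve insertion order, so groups come out
    # in first-seen order.
    return [
        {"eId": e_id, "cId": c_id, "transactions": txns}
        for (e_id, c_id), txns in groups.items()
    ]


for doc in nest_transactions(ROWS):
    print(doc)
```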

After saving df_nested as a JSON file, these are the JSON records I get:


As you can see, when cId=1 and eId=3, the array element with amount=5.0 has no city attribute, because city was null for that row in my original data (df). The nulls are being removed when I use the collect_list function.
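One thing worth checking: collect_list only drops null elements, so the missing key may come from JSON serialization rather than from collect_list. As far as I know, Spark's JSON generation (the JSON writer and `to_json`) omits struct fields whose value is null by default. Since Spark 3.0 this is controlled by the `ignoreNullFields` JSON option and the `spark.sql.jsonGenerator.ignoreNullFields` setting. That is an assumption about the Spark version in use. The pure-Python model below only illustrates the difference between dropping and keeping null fields, and `to_json_model` is a hypothetical name:

```python
import json
from typing import Any, Dict


def to_json_model(record: Dict[str, Any], ignore_null_fields: bool = True) -> str:
    """Serialize one struct-like dict to compact JSON.

    With ignore_null_fields=True, keys whose value is None are dropped, which
    mimics how Spark's default JSON generation treats null struct fields.
    """
    if ignore_null_fields:
        record = {k: v for k, v in record.items() if v is not None}
    return json.dumps(record, separators=(",", ":"))


txn = {"amount": 5.0, "city": None}
print(to_json_model(txn))                            # {"amount":5.0}
print(to_json_model(txn, ignore_null_fields=False))  # {"amount":5.0,"city":null}
```

If this is the cause, then on Spark 3.0+ something like `df_nested.write.option("ignoreNullFields", "false").json(path)` should keep `"city":null` in the written records. Verify this against your Spark version.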

When I try to write df_nested to Elasticsearch with the above index, it fails because of a schema mismatch. That is why I want to keep the nulls after applying collect_list.
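If the records have already lost the key by the time they are indexed, a client-side fallback is to give every transaction the full set of properties from the nested mapping and fill any missing ones with an explicit null. This is a sketch under that assumption and is not part of Spark or the Elasticsearch connector. `normalize_transactions` and `TRANSACTION_FIELDS` are hypothetical names:

```python
from typing import Any, Dict, Iterable, List

# Property names taken from the nested "transactions" mapping above.
TRANSACTION_FIELDS = ("amount", "city")


def normalize_transactions(
    doc: Dict[str, Any], fields: Iterable[str] = TRANSACTION_FIELDS
) -> Dict[str, Any]:
    """Return a copy of doc in which every transaction has exactly the given fields.

    Missing fields are set to None (serialized as JSON null); keys that are not
    in `fields` are dropped. The input doc is not modified.
    """
    names = tuple(fields)
    txns: List[Dict[str, Any]] = [
        {name: txn.get(name) for name in names} for txn in doc.get("transactions", [])
    ]
    return {**doc, "transactions": txns}


doc = {"eId": 3, "cId": 1,
       "transactions": [{"amount": 10.0, "city": "Phoenix"}, {"amount": 5.0}]}
print(normalize_transactions(doc))
```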


    from pyspark import SparkConf
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as f
    from pyspark.sql.functions import create_map, collect_list, lit, col, to_json

    app_name = "CollList"
    conf = SparkConf().setAppName(app_name)
    spark = SparkSession.builder.appName(app_name).config(conf=conf).enableHiveSupport().getOrCreate()

    df = spark.createDataFrame([[1, 2, 20.0, "Paris"], [1, 2, 30.0, "Seoul"],
        [1, 3, 10.0, "Phoenix"], [1, 3, 5.0, None]],
        ["cId", "eId", "amount", "city"])
    print("Actual data")
    df.show(10, False)

    Actual data
    +---+---+------+-------+
    |cId|eId|amount|city   |
    +---+---+------+-------+
    |1  |2  |20.0  |Paris  |
    |1  |2  |30.0  |Seoul  |
    |1  |3  |10.0  |Phoenix|
    |1  |3  |5.0   |null   |
    +---+---+------+-------+
    # collect_list over to_json(struct(...)): the row whose city is null comes out as {"amount":5.0}
    value_cols = [c for c in df.columns if c not in ('cId', 'eId')]
    df1 = df.groupBy(f.col('city'))\
            .agg(f.collect_list(f.to_json(f.struct(*[f.col(x) for x in value_cols]))).alias('newcol'))
    print("Collect List Data - Missing Null Columns in the list")
    df1.show(10, False)

    Collect List Data - Missing Null Columns in the list
    +-------+----------------------------------+
    |city   |newcol                            |
    +-------+----------------------------------+
    |Phoenix|[{"amount":10.0,"city":"Phoenix"}]|
    |null   |[{"amount":5.0}]                  |
    |Paris  |[{"amount":20.0,"city":"Paris"}]  |
    |Seoul  |[{"amount":30.0,"city":"Seoul"}]  |
    +-------+----------------------------------+
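    # create_map requires all map values to share one type, so each value is cast to string.
    my_list = []
    for x in (c for c in df.columns if c != 'cId' and c != 'eId'):
        my_list.extend([lit(x), col(x).cast('string')])

    grp_by = ["eId", "cId"]
    # Serialize each row's map to JSON, then collect the JSON strings per (eId, cId).
    df_nested = df.withColumn("transactions", to_json(create_map(my_list)))\
                  .groupBy(grp_by)\
                  .agg(collect_list("transactions").alias("transactions"))

    print("collect list after create_map")
    df_nested.show(10, False)

    collect list after create_map
    +---+---+--------------------------------------------------------------------+
    |eId|cId|transactions                                                        |
    +---+---+--------------------------------------------------------------------+
    |2  |1  |[{"amount":"20.0","city":"Paris"}, {"amount":"30.0","city":"Seoul"}]|
    |3  |1  |[{"amount":"10.0","city":"Phoenix"}, {"amount":"5.0","city":null}]  |
    +---+---+--------------------------------------------------------------------+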

