How to sort columns of nested structs alphabetically in PySpark?
I have data with the below schema. I want all of the columns to be sorted alphabetically, and I want it in a PySpark DataFrame.
root
|-- _id: string (nullable = true)
|-- first_name: string (nullable = true)
|-- last_name: string (nullable = true)
|-- address: struct (nullable = true)
| |-- pin: integer (nullable = true)
| |-- city: string (nullable = true)
| |-- street: string (nullable = true)
The below code sorts only the outer columns but not the nested columns.
>>> cols = df.columns
>>> df2=df[sorted(cols)]
>>> df2.printSchema()
The schema after this code looks like
root
|-- _id: string (nullable = true)
|-- address: struct (nullable = true)
| |-- pin: integer (nullable = true)
| |-- city: string (nullable = true)
| |-- street: string (nullable = true)
|-- first_name: string (nullable = true)
|-- last_name: string (nullable = true)
(Since _id starts with an underscore, it appears first.)
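This is standard Python string ordering: sorted() compares code points, and the underscore (U+005F) sorts before every lowercase letter, so:
>>> sorted(['_id', 'first_name', 'last_name', 'address'])
['_id', 'address', 'first_name', 'last_name']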
The schema I want is as below (even the columns inside address should be sorted):
root
|-- _id: string (nullable = true)
|-- address: struct (nullable = true)
| |-- city: string (nullable = true)
| |-- pin: integer (nullable = true)
| |-- street: string (nullable = true)
|-- first_name: string (nullable = true)
|-- last_name: string (nullable = true)
Thanks in advance.
Here is a solution that should work for arbitrarily deeply nested StructTypes and doesn't rely on hard-coding any column names.
To demonstrate, I've created the following slightly more complex schema, where there is a second level of nesting within the address column. Let's suppose that your DataFrame schema were the following:
df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)
# |-- address: struct (nullable = true)
# | |-- pin: integer (nullable = true)
# | |-- city: string (nullable = true)
# | |-- zip: struct (nullable = true)
# | | |-- last4: integer (nullable = true)
# | | |-- first5: integer (nullable = true)
# | |-- street: string (nullable = true)
Notice the address.zip field, which contains 2 out-of-order subfields.
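For anyone who wants to reproduce this, here is one way to build a DataFrame with that schema (the sample values are made up):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()

# Deliberately define the fields out of alphabetical order, matching the schema above
schema = StructType([
    StructField("_id", StringType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("address", StructType([
        StructField("pin", IntegerType()),
        StructField("city", StringType()),
        StructField("zip", StructType([
            StructField("last4", IntegerType()),
            StructField("first5", IntegerType()),
        ])),
        StructField("street", StringType()),
    ])),
])

df = spark.createDataFrame(
    [("1", "Jane", "Doe", (1234, "Springfield", (6789, 12345), "Main St"))],
    schema,
)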
You can define a function that recursively steps through your schema and sorts the fields to build a Spark SQL select expression:
from pyspark.sql.types import StructType, StructField

def schemaToSelectExpr(schema, baseField=""):
    select_cols = []
    # Iterate over this (sub)schema's fields in alphabetical order
    for structField in sorted(schema, key=lambda x: x.name):
        if structField.dataType.typeName() == 'struct':
            # Recurse into each child of the struct (again in sorted order)
            # and rebuild it as a struct(...) SQL expression
            subFields = []
            for fld in sorted(structField.jsonValue()['type']['fields'],
                              key=lambda x: x['name']):
                newStruct = StructType([StructField.fromJson(fld)])
                newBaseField = structField.name
                if baseField:
                    newBaseField = baseField + "." + newBaseField
                subFields.extend(schemaToSelectExpr(newStruct, baseField=newBaseField))
            select_cols.append(
                "struct(" + ",".join(subFields) + ") AS {}".format(structField.name)
            )
        else:
            # Leaf field: qualify it with the dotted path to its parent struct, if any
            if baseField:
                select_cols.append(baseField + "." + structField.name)
            else:
                select_cols.append(structField.name)
    return select_cols
Running this on this DataFrame's schema yields (I've broken the long 'address' string into two lines for readability):
print(schemaToSelectExpr(df.schema))
# ['_id',
# 'struct(address.city,address.pin,address.street,
# struct(address.zip.first5,address.zip.last4) AS zip) AS address',
# 'first_name',
# 'last_name']
Now use selectExpr to sort the columns:
df = df.selectExpr(schemaToSelectExpr(df.schema))
df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- address: struct (nullable = false)
# | |-- city: string (nullable = true)
# | |-- pin: integer (nullable = true)
# | |-- street: string (nullable = true)
# | |-- zip: struct (nullable = false)
# | | |-- first5: integer (nullable = true)
# | | |-- last4: integer (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)
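Two remarks. First, the rebuilt structs come back with nullable = false in the printed schema, even though the originals were nullable = true; that is a side effect of recreating them with struct(...). Second, the same reordering can be written with Column expressions instead of a SQL string. Here is a minimal sketch of that idea (the helper name sortColumns is my own, not part of the answer above), with the same nullable caveat:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType

def sortColumns(schema, prefix=""):
    # Build an alphabetically sorted list of Column expressions,
    # recursing into struct fields to sort their children as well
    cols = []
    for field in sorted(schema, key=lambda f: f.name):
        path = prefix + field.name
        if isinstance(field.dataType, StructType):
            # Rebuild the struct from its sorted children
            cols.append(
                F.struct(*sortColumns(field.dataType, path + ".")).alias(field.name)
            )
        else:
            cols.append(F.col(path).alias(field.name))
    return cols

df2 = df.select(sortColumns(df.schema))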