I am trying to save a pyspark.sql.dataframe.DataFrame in CSV format (it could also be another format, as long as it is easily readable).
So far, I have found a couple of examples of saving a DataFrame. However, I lose information every time I write it.
Dataset example:
# Create an example Pyspark DataFrame
from pyspark.sql import Row
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('A', 'AA', 'mail1', 100000)
employee2 = Employee('B', 'BB', 'mail2', 120000)
employee3 = Employee('C', None, 'mail3', 140000)
employee4 = Employee('D', 'DD', 'mail4', 160000)
employee5 = Employee('E', 'EE', 'mail5', 160000)
department1 = Row(id='123', name='HR')
department2 = Row(id='456', name='OPS')
department3 = Row(id='789', name='FN')
department4 = Row(id='101112', name='DEV')
departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2, employee5])
departmentWithEmployees2 = Row(department=department2, employees=[employee3, employee4])
departmentWithEmployees3 = Row(department=department3, employees=[employee1, employee4, employee3])
departmentWithEmployees4 = Row(department=department4, employees=[employee2, employee3])
departmentsWithEmployees_Seq = [departmentWithEmployees1, departmentWithEmployees2]
dframe = spark.createDataFrame(departmentsWithEmployees_Seq)
In order to save this file as CSV, I first tried this solution:
type(dframe)
Out[]: pyspark.sql.dataframe.DataFrame
dframe.write.csv('junk_mycsv.csv')
Unfortunately, that results in this error:
org.apache.spark.sql.AnalysisException: CSV data source does not support struct<id:string,name:string> data type.;
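(For reference, a quick sketch of mine to list the offending columns via dframe.dtypes; checking for the struct/array/map prefixes is an assumption based on the error above, not something the error message suggests.)
# Sketch: list the columns whose types the CSV writer rejects.
# dframe.dtypes returns (name, type-string) pairs,
# e.g. ('department', 'struct<id:string,name:string>').
complex_cols = [name for name, dtype in dframe.dtypes
                if dtype.startswith(("struct", "array", "map"))]
print(complex_cols)
# ['department', 'employees']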
That is why I tried another approach: converting the Spark DataFrame into a pandas DataFrame and then saving it, as mentioned in this example.
pandas_df = dframe.toPandas()
That works! However, if I display my data, information is missing:
print(pandas_df.head())
department employees
0 (123, HR) [(A, AA, mail1, 100000), (B, BB, mail2, 120000...
1 (456, OPS) [(C, None, mail3, 140000), (D, DD, mail4, 1600...
As you can see in the snapshot below, information is missing, because the data should look like this:
department employees
0 id:123, name:HR firstName: A, lastName: AA, email: mail1, salary: 100000
# Info is missing like 'id', 'name', 'firstName', 'lastName', 'email' etc.
# For the complete expected example, see the screenshot below.
Just for information: I am working in Databricks, with Python.
Therefore, how can I write my data (dframe from the example above) without losing information?
Many thanks in advance!
Edit: Adding a picture for Pault, to show the format of the CSV (and the headers).
Edit 2: Replacing the picture with example CSV output:
After running Pault's code:
from pyspark.sql.functions import to_json
dframe.select(*[to_json(c).alias(c) for c in dframe.columns])\
.repartition(1).write.csv("junk_mycsv.csv", header=True)
The output is not tidy, since most column headers are empty (due to the nested format?). Copying only the first row:
department employees (empty ColName) (empty ColName) (and so on)
{\id\":\"123\" \"name\":\"HR\"}" [{\firstName\":\"A\" \"lastName\":\"AA\" (...)
Your dataframe has the following schema:
dframe.printSchema()
#root
# |-- department: struct (nullable = true)
# | |-- id: string (nullable = true)
# | |-- name: string (nullable = true)
# |-- employees: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- firstName: string (nullable = true)
# | | |-- lastName: string (nullable = true)
# | | |-- email: string (nullable = true)
# | | |-- salary: long (nullable = true)
So the department column is a StructType with two named fields, and the employees column is an array of structs with four named fields. It appears that what you want is to write the data in a format that saves both the key and the value for each record.
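(As a side note, a sketch of my own using the field names from the schema above: if a completely flat CSV would do, you could explode the employees array and promote the nested fields to top-level columns before writing. This repeats each department once per employee, and junk_flat.csv is just a placeholder path.)
from pyspark.sql.functions import explode

# Flatten: one output row per (department, employee) pair.
flat = dframe.select("department.id", "department.name",
                     explode("employees").alias("emp")) \
             .select("id", "name",
                     "emp.firstName", "emp.lastName", "emp.email", "emp.salary")
flat.write.csv("junk_flat.csv", header=True)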
One option is to write the file in JSON format instead of CSV:
dframe.write.json("junk.json")
Which produces the following output:
{"department":{"id":"123","name":"HR"},"employees":[{"firstName":"A","lastName":"AA","email":"mail1","salary":100000},{"firstName":"B","lastName":"BB","email":"mail2","salary":120000},{"firstName":"E","lastName":"EE","email":"mail5","salary":160000}]}
{"department":{"id":"456","name":"OPS"},"employees":[{"firstName":"C","email":"mail3","salary":140000},{"firstName":"D","lastName":"DD","email":"mail4","salary":160000}]}
Or if you wanted to keep it in CSV format, you can use to_json to convert each column to JSON before writing the CSV.
# looping over all columns
# but you can also just limit this to the columns you want to convert
from pyspark.sql.functions import to_json
dframe.select(*[to_json(c).alias(c) for c in dframe.columns])\
.write.csv("junk_mycsv.csv")
This produces the following output:
"{\"id\":\"123\",\"name\":\"HR\"}","[{\"firstName\":\"A\",\"lastName\":\"AA\",\"email\":\"mail1\",\"salary\":100000},{\"firstName\":\"B\",\"lastName\":\"BB\",\"email\":\"mail2\",\"salary\":120000},{\"firstName\":\"E\",\"lastName\":\"EE\",\"email\":\"mail5\",\"salary\":160000}]"
"{\"id\":\"456\",\"name\":\"OPS\"}","[{\"firstName\":\"C\",\"email\":\"mail3\",\"salary\":140000},{\"firstName\":\"D\",\"lastName\":\"DD\",\"email\":\"mail4\",\"salary\":160000}]"
Note that the double-quotes are escaped.
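(And a sketch of the reverse direction, assuming Spark 2.3+, where from_json accepts DDL-style schema strings; _c0 and _c1 are the default column names spark.read.csv assigns because no header was written.)
from pyspark.sql.functions import from_json

csv_back = spark.read.csv("junk_mycsv.csv")
restored = csv_back.select(
    from_json("_c0", "struct<id:string,name:string>").alias("department"),
    from_json("_c1", "array<struct<firstName:string,lastName:string,email:string,salary:bigint>>").alias("employees"))
restored.printSchema()
# Back to the original nested schema.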