Problem Description
I am using Spark SQL with dataframes. I have an input dataframe, and I would like to append (or insert) its rows to a larger dataframe that has more columns. How would I do that?
If this were SQL, I would use INSERT INTO OUTPUT SELECT ... FROM INPUT, but I don't know how to do that with Spark SQL.
Specifically:
var input = sqlContext.createDataFrame(Seq(
  (10L, "Joe Doe", 34),
  (11L, "Jane Doe", 31),
  (12L, "Alice Jones", 25)
)).toDF("id", "name", "age")

var output = sqlContext.createDataFrame(Seq(
  (0L, "Jack Smith", 41, "yes", 1459204800L),
  (1L, "Jane Jones", 22, "no", 1459294200L),
  (2L, "Alice Smith", 31, "", 1459595700L)
)).toDF("id", "name", "age", "init", "ts")
scala> input.show()
+---+-----------+---+
| id|       name|age|
+---+-----------+---+
| 10|    Joe Doe| 34|
| 11|   Jane Doe| 31|
| 12|Alice Jones| 25|
+---+-----------+---+
scala> input.printSchema()
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
scala> output.show()
+---+-----------+---+----+----------+
| id|       name|age|init|        ts|
+---+-----------+---+----+----------+
|  0| Jack Smith| 41| yes|1459204800|
|  1| Jane Jones| 22|  no|1459294200|
|  2|Alice Smith| 31|    |1459595700|
+---+-----------+---+----+----------+
scala> output.printSchema()
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- init: string (nullable = true)
 |-- ts: long (nullable = false)
I would like to append all the rows of input to the end of output. At the same time, I would like to set the init column of output to an empty string '' and the ts column to the current timestamp, e.g. 1461883875L.
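For concreteness, using the example timestamp above, the desired result would look like this (the last three rows come from input):

+---+-----------+---+----+----------+
| id|       name|age|init|        ts|
+---+-----------+---+----+----------+
|  0| Jack Smith| 41| yes|1459204800|
|  1| Jane Jones| 22|  no|1459294200|
|  2|Alice Smith| 31|    |1459595700|
| 10|    Joe Doe| 34|    |1461883875|
| 11|   Jane Doe| 31|    |1461883875|
| 12|Alice Jones| 25|    |1461883875|
+---+-----------+---+----+----------+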
Any help would be greatly appreciated.
Recommended Answer
Spark DataFrames are immutable, so it is not possible to append / insert rows. Instead, you can just add the missing columns and use UNION ALL:
import org.apache.spark.sql.functions.{current_timestamp, lit}
import sqlContext.implicits._ // for the $"..." column syntax

output.unionAll(input.select($"*", lit(""), current_timestamp().cast("long")))
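unionAll matches columns by position, not by name, so the two extra columns must come last, in the same order as in output's schema. An equivalent sketch that names the added columns explicitly (same sqlContext and imports as above; the intermediate name padded is just illustrative):

val padded = input
  .withColumn("init", lit(""))                         // empty string for init
  .withColumn("ts", current_timestamp().cast("long"))  // current epoch seconds for ts
val result = output.unionAll(padded)
result.show()

Note that in Spark 2.x, unionAll is deprecated in favor of union (and, from 2.3, unionByName for name-based matching).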