我正在尝试将功能应用于Spark DataFrame的每一行,如示例所示。

val df = sc.parallelize(
  Seq((1, 2, 0), (0, 0, 1), (0, 0, 0))).toDF("x", "y", "z")
df.show()

产生
+---+---+---+
|  x|  y|  z|
+---+---+---+
|  1|  2|  0|
|  0|  0|  1|
|  0|  0|  0|
+---+---+---+

假设我想对每一行中的值做一些事情,例如将0更改为5。
val b = df.map(row => row.toSeq.map(x => x match{
    case 0 => 5
    case x: Int => x
}))

b.show()
+---------+
|    value|
+---------+
|[1, 2, 5]|
|[5, 5, 1]|
|[5, 5, 5]|
+---------+

该功能有效,但是我现在只有一列,其条目是列表,而不是3列的Ints。我想要我的命名列。

最佳答案

这里有多种方法可以实现:

df.map(row => {
      val size = row.size
      var seq: Seq[Int] = Seq.empty[Int]
      for (a <- 0 to size - 1) {
        val value: Int = row(a).asInstanceOf[Int]
        val newVal: Int = value match {
          case 0 =>
            5
          case _ =>
            value
        }
        seq = seq :+ newVal
      }
      Row.fromSeq(seq)
    })(RowEncoder.apply(df.schema))
 val columns = df.columns
    df.select(
        columns.map(c => when(col(c) === 0, 5).otherwise(col(c)).as(c)): _*)
      .show()
def fun: (Int => Int) = { x =>
      if (x == 0) 5 else x
    }
    val function = udf(fun)
    df.select(function(col("x")).as("x"),
              function(col("y")).as("y"),
              function(col("z")).as("z"))
      .show()
def checkZero(a: Int): Int = if (a == 0) 5 else a

      df.map {
        case Row(a: Int, b: Int, c: Int) =>
          Row(checkZero(a), checkZero(b), checkZero(c))
      } { RowEncoder.apply(df.schema) }
      .show()

09-28 14:22