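When I try to access the elements of a DataFrame row, I get a scala.MatchError (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) exception. The code below counts book pairs, where the count for a pair is the number of readers who have read both books.
Interestingly, the exception only occurs when trainPairs is created by trainDf.join(...). When the same data structure is created inline: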
case class BookPair(book1: Int, book2: Int, cnt: Int, name1: String, name2: String)
val recs = Array(
  BookPair(1, 2, 3, "book1", "book2"),
  BookPair(2, 3, 1, "book2", "book3"),
  BookPair(1, 3, 2, "book1", "book3"),
  BookPair(1, 4, 5, "book1", "book4"),
  BookPair(2, 4, 7, "book2", "book4")
)
the exception does not occur at all!
Full code that produces the exception:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, DataFrame}
import org.apache.spark.sql.functions._
object Scratch {
  case class Book(book: Int, reader: Int, name: String)
  val recs = Array(
    Book(book = 1, reader = 30, name = "book1"),
    Book(book = 2, reader = 10, name = "book2"),
    Book(book = 3, reader = 20, name = "book3"),
    Book(book = 1, reader = 20, name = "book1"),
    Book(book = 1, reader = 10, name = "book1"),
    Book(book = 1, reader = 40, name = "book1"),
    Book(book = 2, reader = 40, name = "book2"),
    Book(book = 1, reader = 100, name = "book1"),
    Book(book = 2, reader = 100, name = "book2"),
    Book(book = 3, reader = 100, name = "book3"),
    Book(book = 4, reader = 100, name = "book4"),
    Book(book = 5, reader = 100, name = "book5"),
    Book(book = 4, reader = 500, name = "book4"),
    Book(book = 1, reader = 510, name = "book1"),
    Book(book = 2, reader = 30, name = "book2"))
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Scratch")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val data = sc.parallelize(recs)
    /**
     * Remove readers with many books:
     * count books by reader
     * and keep the readers whose book count exceeds maxBookCnt
     */
    val maxBookCnt = 4
    val readersWithLotsOfBooksRDD = data
      .map(r => (r.reader, 1))
      .reduceByKey((x, y) => x + y)
      .filter { case (_, x) => x > maxBookCnt }
    readersWithLotsOfBooksRDD.collect()
    val readersWithBooksRDD = data.map(r => (r.reader, (r.book, r.name)))
    readersWithBooksRDD.collect()
    println("*** Records left after removing readers with maxBookCnt > " + maxBookCnt)
    val data2 = readersWithBooksRDD.subtractByKey(readersWithLotsOfBooksRDD)
    data2.foreach(println)
    // *** Prepare train data
    val trainData = data2.map(tuple => tuple match {
      case (reader, v) => Book(reader = reader, book = v._1, name = v._2)
    })
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    val trainDf = trainData.toDF()
    println("*** Creating pairs...")
    val trainPairs = trainDf.join(
        trainDf.select($"book" as "r_book", $"reader" as "r_reader", $"name" as "r_name"),
        $"reader" === $"r_reader" and $"book" < $"r_book")
      .groupBy($"book", $"r_book", $"name", $"r_name")
      .agg($"book", $"r_book", count($"reader") as "cnt", $"name", $"r_name")
    trainPairs.registerTempTable("trainPairs")
    println("*** Pairs Schema:")
    trainPairs.printSchema()
    // Order pairs by count
    val pairsSorted = sqlContext.sql("SELECT * FROM trainPairs ORDER BY cnt DESC")
    println("*** Pairs Sorted by Count")
    pairsSorted.show
    // Key pairs by book
    val keyedPairs = trainPairs.rdd.map({ case Row(book1: Int, book2: Int, count: Int, name1: String, name2: String)
      => (book1, (book2, count, name1, name2)) })
    println("*** keyedPairs:")
    keyedPairs.foreach(println)
  }
}
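Any ideas?
Update
zero323 wrote that this throws an exception because the schema of trainPairs does not match the pattern I provided, and that the schema looks like this: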
root
|-- book: integer (nullable = false)
|-- r_book: integer (nullable = false)
|-- name: string (nullable = true)
|-- r_name: string (nullable = true)
|-- book: integer (nullable = false)
|-- r_book: integer (nullable = false)
|-- cnt: long (nullable = false)
|-- name: string (nullable = true)
|-- r_name: string (nullable = true)
OK, but how do I find the full trainPairs schema? And why, when I print the trainPairs schema with trainPairs.printSchema(), do I get only part of this schema:
root
|-- book: integer (nullable = false)
|-- r_book: integer (nullable = false)
|-- cnt: long (nullable = false)
|-- name: string (nullable = true)
|-- r_name: string (nullable = true)
How can I print/find the full trainPairs schema? Besides, matching on
Row(Int, Int, String, String, Int, Int, Long, String, String)
results in the same scala.MatchError!
Best answer
As far as I can see, the exception is caused by the wrong type for the count field of the row. It should be Long rather than Int. So instead of:
// Key pairs by book
val keyedPairs = trainPairs.rdd.map({case Row(book1: Int, book2: Int, count: Int, name1: String, name2:String)
=> (book1,(book2, count, name1, name2))})
the correct code should be:
val keyedPairs = trainPairs.rdd.map({case Row(book1: Int, book2: Int, count: Long, name1: String, name2:String)
=> (book1,(book2, count, name1, name2))})
With that change, everything works as expected.
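To see why the declared type matters, here is a minimal sketch that needs no Spark at all. It assumes a row can be modeled as a plain Seq[Any] of boxed values; the names MatchErrorSketch, keyWithInt, and keyWithLong are made up for illustration and are not part of any Spark API. A typed pattern such as count: Int only matches a boxed Int, so a Long value (which is what the cnt: long column in the printed schema holds) falls through and the match throws. This would also explain why the inline BookPair data does not fail: there cnt is declared as Int.

```scala
// A sketch only: a Seq[Any] stands in for a Spark Row (an assumption made
// for illustration), since typed patterns see the fields as boxed values.
object MatchErrorSketch {
  type Fields = Seq[Any]

  // Mirrors the failing pattern: the third field is declared as Int.
  def keyWithInt(row: Fields): (Int, (Int, Int, String, String)) = row match {
    case Seq(book1: Int, book2: Int, count: Int, name1: String, name2: String) =>
      (book1, (book2, count, name1, name2))
  }

  // Mirrors the corrected pattern: the third field is declared as Long.
  def keyWithLong(row: Fields): (Int, (Int, Long, String, String)) = row match {
    case Seq(book1: Int, book2: Int, count: Long, name1: String, name2: String) =>
      (book1, (book2, count, name1, name2))
  }

  def main(args: Array[String]): Unit = {
    // The count column is a Long, so the third element is 3L rather than 3.
    val aggregated: Fields = Seq(1, 2, 3L, "book1", "book2")

    // The Long pattern matches the aggregated row.
    println(keyWithLong(aggregated))

    // The Int pattern does not match a boxed Long and throws scala.MatchError.
    val attempt = scala.util.Try(keyWithInt(aggregated))
    println(attempt.failed.map(_.getClass.getName))
  }
}
```

The same reasoning applies to any field whose runtime type differs from the one written in the pattern, so it is worth checking each column type in the printSchema() output against the pattern before matching.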