Problem description
I am trying to run the PageRank Basic example in Flink with a small modification (only in how the input file is read; everything else is the same). I am getting a "Task not serializable" error. Below is part of the error output:
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:179)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:171)
Below is my code:
object hpdb {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val maxIterations = 10000
    val DAMPENING_FACTOR: Double = 0.85
    val EPSILON: Double = 0.0001
    val outpath = "/home/vinoth/bigdata/assign10/pagerank.csv"
    val links = env.readCsvFile[Tuple2[Long, Long]]("/home/vinoth/bigdata/assign10/ppi.csv",
      fieldDelimiter = "\t", includedFields = Array(1, 4)).as('sourceId, 'targetId).toDataSet[Link] // source and target
    val pages = env.readCsvFile[Tuple1[Long]]("/home/vinoth/bigdata/assign10/ppi.csv",
      fieldDelimiter = "\t", includedFields = Array(1)).as('pageId).toDataSet[Id] // page id
    val noOfPages = pages.count()
    val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / noOfPages))
    val adjacencyLists = links
      // initialize lists: sourceId is the source id and targetId is the target id
      .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
      // concatenate lists
      .groupBy("sourceId").reduce {
        (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
      }
    // start iteration
    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
      // the output shows the error here
      currentRanks =>
        val newRanks = currentRanks
          // distribute ranks to target pages
          .join(adjacencyLists).where("pageId").equalTo("sourceId") {
            (page, adjacent, out: Collector[Page]) =>
              for (targetId <- adjacent.targetIds) {
                out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
              }
          }
          // collect ranks and sum them up
          .groupBy("pageId").aggregate(SUM, "rank")
          // apply dampening factor
          // the output shows the error here
          .map { p =>
            Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / pages.count()))
          }
        // terminate if no rank update was significant
        val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
          (current, next, out: Collector[Int]) =>
            // check for significant update
            if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
        }
        (newRanks, termination)
    }
    val result = finalRanks
    // emit result
    result.writeAsCsv(outpath, "\n", " ")
    env.execute()
  }
}
Any help in the right direction is highly appreciated. Thank you.
Recommended answer
The problem is that you reference the DataSet pages from within a MapFunction. This is not possible, since a DataSet is only the logical representation of a data flow and cannot be accessed at runtime.
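To make the failure mode concrete, here is a minimal sketch in plain Scala, without Flink. It assumes a Scala version in which function literals are serializable only if everything they capture is (as in Scala 2.12 and later). Handle, capturesHandle, capturesCount and isSerializable are hypothetical names invented for this illustration, with Handle standing in for a non-serializable DataSet. The check loosely mirrors what the ensureSerializable frame in the stack trace suggests: Java-serialize the closure and see whether that fails.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureSketch {
  // Hypothetical stand-in for a DataSet: a handle that is NOT Serializable.
  class Handle(val size: Long)

  // The closure keeps a reference to the handle itself.
  def capturesHandle(h: Handle): Double => Double =
    r => r / h.size

  // The value is read first, so the closure keeps only a plain Long.
  def capturesCount(h: Handle): Double => Double = {
    val n = h.size
    r => r / n
  }

  // Try to Java-serialize the closure; report whether that worked.
  def isSerializable(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val handle = new Handle(4L)
    println(s"captures handle: ${isSerializable(capturesHandle(handle))}")
    println(s"captures count:  ${isSerializable(capturesCount(handle))}")
  }
}
```

Under those assumptions, the closure that holds the handle should fail the check, while the one that holds only the extracted Long should pass. The same distinction applies to referencing pages versus a precomputed count inside the map function.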
To solve this problem, assign the result of pages.count to a variable, val pagesCount = pages.count, and refer to that variable inside your MapFunction.
What pages.count actually does is trigger the execution of the data flow graph so that the number of elements in pages can be counted. The result is then returned to your program.
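The suggested fix could look roughly like the sketch below. It assumes the legacy Flink Scala DataSet API used in the question and a Flink version in which count() and print() trigger execution eagerly. The Page case class, the dampen helper and the in-memory input are stand-ins invented for this example, not the asker's actual types or data. Note that the question's code already computes noOfPages the same way, so that value could be reused instead of introducing a new one.

```scala
import org.apache.flink.api.scala._

// Hypothetical stand-in for the Page type used in the question.
case class Page(pageId: Long, rank: Double)

object PageCountSketch {
  // Pure helper (an assumption of this sketch) that applies the dampening step.
  def dampen(rank: Double, dampening: Double, pagesCount: Long): Double =
    rank * dampening + (1 - dampening) / pagesCount

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val dampening = 0.85

    // In-memory stand-in for the page ids read from the CSV file.
    val pages: DataSet[Long] = env.fromElements(1L, 2L, 3L, 4L)

    // count() runs the data flow up to this point and returns a plain Long
    // to the driver program.
    val pagesCount: Long = pages.count()

    val initial: DataSet[Page] = pages.map(id => Page(id, 1.0 / pagesCount))

    // The closure captures only dampening and pagesCount, both plain values;
    // it no longer references the pages DataSet.
    val damped: DataSet[Page] = initial.map { p =>
      Page(p.pageId, dampen(p.rank, dampening, pagesCount))
    }

    damped.print()
  }
}
```

Because count() is evaluated once in the driver program, the value shipped with the map function is an ordinary number rather than a handle to the data flow.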