scala – 任务不可序列化Flink
发布时间:2020-12-16 18:20:58 所属栏目:安全 来源:网络整理
导读:我试图在flink中进行pagerank基本示例,稍加修改(仅在读取输入文件时,其他一切都是相同的)我得到错误,因为任务不可序列化,下面是输出错误的一部分 atorg.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:179) at org.apache
我试图在flink中进行pagerank基本示例,稍加修改(仅在读取输入文件时,其他一切都是相同的)我得到错误,因为任务不可序列化,下面是输出错误的一部分
以下是我的代码 object hpdb { def main(args: Array[String]) { val env = ExecutionEnvironment.getExecutionEnvironment val maxIterations = 10000 val DAMPENING_FACTOR: Double = 0.85 val EPSILON: Double = 0.0001 val outpath = "/home/vinoth/bigdata/assign10/pagerank.csv" val links = env.readCsvFile[Tuple2[Long,Long]]("/home/vinoth/bigdata/assign10/ppi.csv",fieldDelimiter = "t",includedFields = Array(1,4)).as('sourceId,'targetId).toDataSet[Link]//source and target val pages = env.readCsvFile[Tuple1[Long]]("/home/vinoth/bigdata/assign10/ppi.csv",includedFields = Array(1)).as('pageId).toDataSet[Id]//Pageid val noOfPages = pages.count() val pagesWithRanks = pages.map(p => Page(p.pageId,1.0 / noOfPages)) val adjacencyLists = links // initialize lists ._1 is the source id and ._2 is the traget id .map(e => AdjacencyList(e.sourceId,Array(e.targetId))) // concatenate lists .groupBy("sourceId").reduce { (l1,l2) => AdjacencyList(l1.sourceId,l1.targetIds ++ l2.targetIds) } // start iteration val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) { // **//the output shows error here** currentRanks => val newRanks = currentRanks // distribute ranks to target pages .join(adjacencyLists).where("pageId").equalTo("sourceId") { (page,adjacent,out: Collector[Page]) => for (targetId <- adjacent.targetIds) { out.collect(Page(targetId,page.rank / adjacent.targetIds.length)) } } // collect ranks and sum them up .groupBy("pageId").aggregate(SUM,"rank") // apply dampening factor //**//the output shows error here** .map { p => Page(p.pageId,(p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / pages.count())) } // terminate if no rank update was significant val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") { (current,next,out: Collector[Int]) => // check for significant update if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1) } (newRanks,termination) } val result = finalRanks // emit result result.writeAsCsv(outpath,"n"," ") env.execute() } } 任何正确方向的帮助都受到高度赞赏?谢谢. 解决方法
问题是您从MapFunction中引用DataSet页面.这是不可能的,因为DataSet只是数据流的逻辑表示,不能在运行时访问.
要解决此问题,您需要做的是将val pagesCount = pages.count值赋给变量pagesCount并在MapFunction中引用此变量. pages.count实际上做的是触发数据流图的执行,以便可以计算页中元素的数量.然后结果返回到您的程序. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |