加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何将RDD中的每个元素与RDD中的每个其他元素进行比较

发布时间:2020-12-16 09:54:16 所属栏目:安全 来源:网络整理
导读:我正在尝试使用spark执行K最近邻搜索. 我有一个RDD [Seq [Double]],我正计划返回一个 RDD [(序号[双],SEQ [序号[双]])] 与实际行和邻居列表 val out = data.map(row = { val neighbours = data.top(num = 3)(new Ordering[Seq[Double]] { override def compa
我正在尝试使用spark执行K最近邻搜索.

我有一个RDD [Seq [Double]],我正计划返回一个
RDD [(序号[双],SEQ [序号[双]])]
与实际行和邻居列表

val out = data.map(row => {
    val neighbours = data.top(num = 3)(new Ordering[Seq[Double]] {
      override def compare(a:Seq[Double],b:Seq[Double]) = {
        euclideanDistance(a,row).compare(euclideanDistance(b,row))*(-1)
      }
    })
  (row,neighbours.toSeq)
})

并且它在spark提交时给出以下错误

15/04/29 21:15:39 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2,192.168.1.7): org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver,not inside of other transformations; for example,rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information,see SPARK-5063.

我知道嵌套RDD是不可能的,但我如何执行这样的操作,我可以将RDD中的每个元素与RDD中的每个其他元素进行比较

解决方法

这样的事情应该做到这一点.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
val sco= new SparkContext(conf)

// k is the number of nearest neighbors required 
val k = 3

// generate 5 rows of two-dimensional coordinates
val rows = List.fill(5)(List.fill(2)(Math.random))
val dataRDD = sco.parallelize(rows,1)

// No need for the sqrt as we're just comparing them
def euclidean(a:List[Double],b:List[Double]) = 
 (a zip b) map {case (x:Double,y:Double) => (x-y)*(x-y)} sum

// get all pairs
val pairs = dataRDD.cartesian(dataRDD)

// case class to keep things a bit neater
// the neighbor,and its distance from the current point
case class Entry(neighbor: List[Double],dist:Double)

// map the second element to the element and distance from the first
val pairsWithDist = pairs.map {case (x,y) => (x,Entry(y,euclidean(x,y)))}

// merge a row of pairsWithDist with the ResultRow for this point
def mergeOne(u: List[Entry],v:Entry) = (v::u).sortBy{_.dist}.take(k)

// merge two results from different partitions
def mergeList(u: List[Entry],v:List[Entry]) = (u:::v).sortBy{_.dist}.take(k)

val nearestNeighbors = pairsWithDist
                      .aggregateByKey(List[Entry]())(mergeOne,mergeList)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读