加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 优化Spark作业,必须为每个条目相似度计算每个条目,并为

发布时间:2020-12-16 18:58:52 所属栏目:安全 来源:网络整理
导读:我有一个需要计算基于电影内容的相似性的Spark工作.有46k电影.每部电影由一组SparseVectors表示(每个矢量是电影场之一的特征向量,例如Title,Plot,Genres,Actors等).例如,对于Actors和Genres,向量显示给定的actor是否在电影中存在(1)或不存在(0). 任务是为每
我有一个需要计算基于电影内容的相似性的Spark工作.有46k电影.每部电影由一组SparseVectors表示(每个矢量是电影场之一的特征向量,例如Title,Plot,Genres,Actors等).例如,对于Actors和Genres,向量显示给定的actor是否在电影中存在(1)或不存在(0).

任务是为每部电影找到前10个类似的电影.
我设法在Scala中编写一个脚本来执行所有这些计算并完成工作.它适用于较小的电影集,如1000部电影,但不适用于整个数据集(内存不足等).

我进行此计算的方法是在电影数据集上使用交叉连接.然后通过仅获取movie1_id<的行来减少问题. movie2_id.
此时数据集仍然包含46000 ^ 2/2行,即1058000000.
每行都有大量数据.

然后我计算每一行的相似度得分.在计算相似度之后,我将movie1_id相同的结果分组,并使用取得前N个项目的Window函数(相似于此处描述的方式:Spark get top N highest score results for each (item1,item2,score))通过相似性得分按降序对它们进行排序.

问题是 – 它可以在Spark中更有效地完成吗?例如.无需执行crossJoin?

还有一个问题–Spark如何处理如此庞大的数据帧(1058000000行由多个SparseVectors组成)?是否必须一次将所有这些保留在内存中?或者它是否以某种方式逐个处理这样的数据帧?

我正在使用以下函数来计算电影矢量之间的相似性:

def intersectionCosine(movie1Vec: SparseVector,movie2Vec: SparseVector): Double = {
val a: BSV[Double] = toBreeze(movie1Vec)
val b: BSV[Double] = toBreeze(movie2Vec)

var dot: Double = 0
var offset: Int = 0
while( offset < a.activeSize) {
  val index: Int = a.indexAt(offset)
  val value: Double = a.valueAt(offset)

  dot += value * b(index)
  offset += 1
}

val bReduced: BSV[Double] = new BSV(a.index,a.index.map(i => b(i)),a.index.length)
val maga: Double = magnitude(a)
val magb: Double = magnitude(bReduced)

if (maga == 0 || magb == 0)
  return 0
else
  return dot / (maga * magb)
}

Dataframe中的每一行都包含两个连接的类:

final case class MovieVecData(imdbID: Int,Title: SparseVector,Decade: SparseVector,Plot: SparseVector,Genres: SparseVector,Actors: SparseVector,Countries: SparseVector,Writers: SparseVector,Directors: SparseVector,Productions: SparseVector,Rating: Double
                         )

解决方法

只要你对近似值很好,并且不需要精确的结果(或确切的数字或结果),它就可以更有效地完成.

与我对Efficient string matching in Apache Spark的回答类似,您可以使用LSH,其中:

> BucketedRandomProjectionLSH接近欧几里德距离.
> MinHashLSH近似Jaccard距离.

如果要素空间很小(或可以合理地减少)并且每个类别相对较小,您还可以手动优化代码:

>爆炸特征数组从单个记录生成#features记录.
>按功能自我加入结果,计算距离并过滤掉候选人(当且仅当他们共享特定的分类特征时,才会比较每对记录).
>使用当前代码获取最佳记录.

一个最小的例子是(认为它是一个伪代码):

import org.apache.spark.ml.linalg._

// This is oversimplified. In practice don't assume only sparse scenario
val indices = udf((v: SparseVector) => v.indices)

val df = Seq(
  (1L,Vectors.sparse(1024,Array(1,3,5),Array(1.0,1.0,1.0))),(2L,Array(3,8,12),(3L,(4L,Array(11,21),(5L,Array(21,32),1.0)))
).toDF("id","features")

val possibleMatches = df
  .withColumn("key",explode(indices($"features")))
  .transform(df => df.alias("left").join(df.alias("right"),Seq("key")))

val closeEnough(threshold: Double) = udf((v1: SparseVector,v2: SparseVector) =>  intersectionCosine(v1,v2) > threshold)

possilbeMatches.filter(closeEnough($"left.features",$"right.features")).select($"left.id",$"right.id").distinct

请注意,只有当散列/特征具有足够的选择性(并且最佳稀疏)时,这两种解决方案才值得开销.在上面显示的示例中,您只比较集合{1,2,3}和{4,5}中的行,而不是集合之间的行.

然而,在最坏的情况下(M记录,N个特征),我们可以进行N M2比较,而不是M2

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读