scala – 优化Spark作业,必须为每个条目相似度计算每个条目,并为
我有一个需要计算基于电影内容的相似性的Spark工作.有46k电影.每部电影由一组SparseVectors表示(每个矢量是电影场之一的特征向量,例如Title,Plot,Genres,Actors等).例如,对于Actors和Genres,向量显示给定的actor是否在电影中存在(1)或不存在(0).
任务是为每部电影找到前10个类似的电影. 我进行此计算的方法是在电影数据集上使用交叉连接.然后通过仅获取movie1_id<的行来减少问题. movie2_id. 然后我计算每一行的相似度得分.在计算相似度之后,我将movie1_id相同的结果分组,并使用取得前N个项目的Window函数(相似于此处描述的方式:Spark get top N highest score results for each (item1,item2,score))通过相似性得分按降序对它们进行排序. 问题是 – 它可以在Spark中更有效地完成吗?例如.无需执行crossJoin? 还有一个问题–Spark如何处理如此庞大的数据帧(1058000000行由多个SparseVectors组成)?是否必须一次将所有这些保留在内存中?或者它是否以某种方式逐个处理这样的数据帧? 我正在使用以下函数来计算电影矢量之间的相似性: def intersectionCosine(movie1Vec: SparseVector,movie2Vec: SparseVector): Double = { val a: BSV[Double] = toBreeze(movie1Vec) val b: BSV[Double] = toBreeze(movie2Vec) var dot: Double = 0 var offset: Int = 0 while( offset < a.activeSize) { val index: Int = a.indexAt(offset) val value: Double = a.valueAt(offset) dot += value * b(index) offset += 1 } val bReduced: BSV[Double] = new BSV(a.index,a.index.map(i => b(i)),a.index.length) val maga: Double = magnitude(a) val magb: Double = magnitude(bReduced) if (maga == 0 || magb == 0) return 0 else return dot / (maga * magb) } Dataframe中的每一行都包含两个连接的类: final case class MovieVecData(imdbID: Int,Title: SparseVector,Decade: SparseVector,Plot: SparseVector,Genres: SparseVector,Actors: SparseVector,Countries: SparseVector,Writers: SparseVector,Directors: SparseVector,Productions: SparseVector,Rating: Double ) 解决方法
只要你对近似值很好,并且不需要精确的结果(或确切的数字或结果),它就可以更有效地完成.
与我对Efficient string matching in Apache Spark的回答类似,您可以使用LSH,其中: > 如果要素空间很小(或可以合理地减少)并且每个类别相对较小,您还可以手动优化代码: >爆炸特征数组从单个记录生成#features记录. 一个最小的例子是(认为它是一个伪代码): import org.apache.spark.ml.linalg._ // This is oversimplified. In practice don't assume only sparse scenario val indices = udf((v: SparseVector) => v.indices) val df = Seq( (1L,Vectors.sparse(1024,Array(1,3,5),Array(1.0,1.0,1.0))),(2L,Array(3,8,12),(3L,(4L,Array(11,21),(5L,Array(21,32),1.0))) ).toDF("id","features") val possibleMatches = df .withColumn("key",explode(indices($"features"))) .transform(df => df.alias("left").join(df.alias("right"),Seq("key"))) val closeEnough(threshold: Double) = udf((v1: SparseVector,v2: SparseVector) => intersectionCosine(v1,v2) > threshold) possilbeMatches.filter(closeEnough($"left.features",$"right.features")).select($"left.id",$"right.id").distinct 请注意,只有当散列/特征具有足够的选择性(并且最佳稀疏)时,这两种解决方案才值得开销.在上面显示的示例中,您只比较集合{1,2,3}和{4,5}中的行,而不是集合之间的行. 然而,在最坏的情况下(M记录,N个特征),我们可以进行N M2比较,而不是M2 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |