加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 在Spark 2.0中访问向量列时出现MatchError

发布时间:2020-12-16 10:07:03 所属栏目:安全 来源:网络整理
导读:我正在尝试在 JSON文件上创建LDA模型. 使用JSON文件创建spark上下文: import org.apache.spark.sql.SparkSessionval sparkSession = SparkSession.builder .master("local") .appName("my-spark-app") .config("spark.some.config.option","config-value")
我正在尝试在 JSON文件上创建LDA模型.

使用JSON文件创建spark上下文:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder
  .master("local")
  .appName("my-spark-app")
  .config("spark.some.config.option","config-value")
  .getOrCreate()

 val df = spark.read.json("dbfs:/mnt/JSON6/JSON/sampleDoc.txt")

显示df应显示DataFrame

display(df)

对文本进行标记

import org.apache.spark.ml.feature.RegexTokenizer

// Set params for RegexTokenizer
val tokenizer = new RegexTokenizer()
                .setPattern("[W_]+")
                .setMinTokenLength(4) // Filter away tokens with length < 4
                .setInputCol("text")
                .setOutputCol("tokens")

// Tokenize document
val tokenized_df = tokenizer.transform(df)

这应该显示tokenized_df

display(tokenized_df)

获取停用词

%sh wget http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words > -O /tmp/stopwords

可选:将停用词复制到tmp文件夹

%fs cp file:/tmp/stopwords dbfs:/tmp/stopwords

收集所有的停用词

val stopwords = sc.textFile("/tmp/stopwords").collect()

过滤掉停用词

import org.apache.spark.ml.feature.StopWordsRemover

 // Set params for StopWordsRemover
 val remover = new StopWordsRemover()
                   .setStopWords(stopwords) // This parameter is optional
                   .setInputCol("tokens")
                   .setOutputCol("filtered")

 // Create new DF with Stopwords removed
 val filtered_df = remover.transform(tokenized_df)

显示已过滤的df应验证已删除的停用词

display(filtered_df)

矢量化词的出现频率

import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.sql.Row
 import org.apache.spark.ml.feature.CountVectorizer

 // Set params for CountVectorizer
 val vectorizer = new CountVectorizer()
               .setInputCol("filtered")
               .setOutputCol("features")
               .fit(filtered_df)

验证矢量化器

vectorizer.transform(filtered_df)
           .select("id","text","features","filtered").show()

在此之后,我看到在LDA中拟合此矢量化器的问题.我认为是CountVectorizer的问题是给出稀疏向量,但LDA需要密集向量.仍在努力找出问题所在.

以下是map无法转换的例外情况.

import org.apache.spark.mllib.linalg.Vector
val ldaDF = countVectors.map { 
             case Row(id: String,countVector: Vector) => (id,countVector) 
            }
display(ldaDF)

例外:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4083.0 failed 4 times,most recent failure: Lost task 0.3 in stage 4083.0 (TID 15331,10.209.240.17): scala.MatchError: [0,(1252,[13,17,18,20,30,37,45,50,51,53,63,64,96,101,108,125,174,189,214,221,224,227,238,268,291,309,328,357,362,437,441,455,492,493,511,528,561,613,619,674,764,823,839,980,1098,1143],[1.0,1.0,2.0,3.0,5.0,4.0,1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)

LDA有一个工作样本,没有任何问题

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.clustering.{DistributedLDAModel,LDA}

val a = Vectors.dense(Array(1.0,3.0))
val b = Vectors.dense(Array(3.0,5.0))
val df = Seq((1L,a),(2L,b),a)).toDF

val ldaDF = df.map { case Row(id: Long,countVector) } 

val model = new LDA().setK(3).run(ldaDF.javaRDD)
display(df)

唯一的区别是在第二个片段中我们有一个密集的矩阵.

解决方法

这与稀疏性无关.由于Spark 2.0.0 ML Transformers不再生成o.a.s.mllib.linalg.VectorUDT,而是o.a.s.ml.linalg.VectorUDT,并且本地映射到o.a.s.ml.linalg.Vector的子类.这些与旧的MLLib API不兼容,后者正在向Spark 2.0.0中的弃用方向发展.

您可以使用Vectors.fromML在“old”之间进行转换:

import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.ml.linalg.{Vectors => NewVectors}

OldVectors.fromML(NewVectors.dense(1.0,3.0))
OldVectors.fromML(NewVectors.sparse(5,Seq(0 -> 1.0,2 -> 2.0,4 -> 3.0)))

但如果你已经使用ML变换器,那么使用LDA的ML实现会更有意义.

为方便起见,您可以使用隐式转换:

import scala.languageFeature.implicitConversions

object VectorConversions {
  import org.apache.spark.mllib.{linalg => mllib}
  import org.apache.spark.ml.{linalg => ml}

  implicit def toNewVector(v: mllib.Vector) = v.asML
  implicit def toOldVector(v: ml.Vector) = mllib.Vectors.fromML(v)
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读