scala – 如何为ML算法矢量化DataFrame列?
发布时间:2020-12-16 18:53:04 所属栏目:安全 来源:网络整理
导读:有一个带有一些分类字符串值的DataFrame(例如uuid | url | browser). 我想将它转换为double来执行接受双矩阵的ML算法. 作为转换方法,我使用StringIndexer(spark 1.4)将我的字符串值映射到double值,所以我定义了一个这样的函数: def str(arg: String,df:Data
有一个带有一些分类字符串值的DataFrame(例如uuid | url | browser).
我想将它转换为double来执行接受双矩阵的ML算法. 作为转换方法,我使用StringIndexer(spark 1.4)将我的字符串值映射到double值,所以我定义了一个这样的函数: def str(arg: String,df:DataFrame) : DataFrame = ( val indexer = new StringIndexer().setInputCol(arg).setOutputCol(arg+"_index") val newDF = indexer.fit(df).transform(df) return newDF ) 现在问题是我将迭代df的foreach列,调用此函数并在解析的双列中添加(或转换)原始字符串列,因此结果将是: 初始df: [String: uuid|String: url| String: browser] 最终df: [String: uuid|Double: uuid_index|String: url|Double: url_index|String: browser|Double: Browser_index] 提前致谢 解决方法
您可以简单地在列数组上进行foldLeft:
val transformed: DataFrame = df.columns.foldLeft(df)((df,arg) => str(arg,df)) 不过,我认为这不是一个好方法.由于src丢弃StringIndexerModel,因此在获取新数据时无法使用它.因此,我建议使用 import org.apache.spark.ml.Pipeline val transformers: Array[org.apache.spark.ml.PipelineStage] = df.columns.map( cname => new StringIndexer() .setInputCol(cname) .setOutputCol(s"${cname}_index") ) // Add the rest of your pipeline like VectorAssembler and algorithm val stages: Array[org.apache.spark.ml.PipelineStage] = transformers ++ ??? val pipeline = new Pipeline().setStages(stages) val model = pipeline.fit(df) model.transform(df) VectorAssembler可以包含如下: val assembler = new VectorAssembler() .setInputCols(df.columns.map(cname => s"${cname}_index")) .setOutputCol("features") val stages = transformers :+ assembler 你也可以使用 import org.apache.spark.ml.feature.RFormula val rf = new RFormula().setFormula(" ~ uuid + url + browser - 1") val rfModel = rf.fit(dataset) rfModel.transform(dataset) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |