scala – How to create a correct data frame for classification in Spark ML
I am trying to run a random forest classification using the Spark ML API, but I am having trouble creating the right data frame input for the pipeline.
Here is the sample data:

```
age,hours_per_week,education,sex,salaryRange
38,40,"hs-grad","male","A"
28,40,"bachelors","female","A"
52,45,"hs-grad","female","B"
31,50,"masters","female","B"
42,40,"hs-grad","male","B"
```

age and hours_per_week are integers, while the other features, including the label salaryRange, are categorical (String).

Loading this csv file (let's call it sample.csv) can be done with the Spark CSV library like this:

```scala
val data = sqlContext.csvFile("/home/dusan/sample.csv")
```

By default all columns are imported as string, so we need to change "age" and "hours_per_week" to Int:

```scala
val toInt = udf[Int, String](_.toInt)
val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week", toInt(data("hours_per_week")))
```

Just to check how the schema looks now:

```
scala> dataFixed.printSchema
root
 |-- age: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- salaryRange: string (nullable = true)
```

Then I set up the cross validator and the pipeline:

```scala
val rf = new RandomForestClassifier()
val pipeline = new Pipeline().setStages(Array(rf))
val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)
```

Running this line fails:

```scala
val cmModel = cv.fit(dataFixed)
```

with the error:

```
java.lang.IllegalArgumentException: Field "features" does not exist.
```

It is possible to set the label column and the features column in RandomForestClassifier, but I have 4 columns as predictors (features), not just one.

How should I organize my data frame so that the label and features columns are set up correctly?

For your convenience, here is the full code:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.{Vector, Vectors}

object SampleClassification {

  def main(args: Array[String]): Unit = {

    // set up the Spark context
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    import com.databricks.spark.csv._

    // load data using the databricks "Spark CSV Library"
    val data = sqlContext.csvFile("/home/dusan/sample.csv")

    // by default all columns are imported as string,
    // so we need to change "age" and "hours_per_week" to Int
    val toInt = udf[Int, String](_.toInt)
    val dataFixed = data.withColumn("age", toInt(data("age"))).withColumn("hours_per_week", toInt(data("hours_per_week")))

    val rf = new RandomForestClassifier()
    val pipeline = new Pipeline().setStages(Array(rf))
    val cv = new CrossValidator().setNumFolds(10).setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)

    // this fails with:
    // java.lang.IllegalArgumentException: Field "features" does not exist.
    val cmModel = cv.fit(dataFixed)
  }
}
```

Thanks for the help!

Solution
You just need to make sure that your data frame has a "features" column of type VectorUDT, as shown below:
```
scala> val df2 = dataFixed.withColumnRenamed("age", "features")
df2: org.apache.spark.sql.DataFrame = [features: int, hours_per_week: int, education: string, sex: string, salaryRange: string]

scala> val cmModel = cv.fit(df2)
java.lang.IllegalArgumentException: requirement failed: Column features must be of type org.apache.spark.mllib.linalg.VectorUDT@1eef but was actually IntegerType.
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
    at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:50)
    at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
    at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:118)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:164)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:164)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
    at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:164)
    at org.apache.spark.ml.tuning.CrossValidator.transformSchema(CrossValidator.scala:142)
    at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
    at org.apache.spark.ml.tuning.CrossValidator.fit(CrossValidator.scala:107)
    ...
```

Edit 1

Essentially, your data frame needs two fields: a feature vector (here "features") and a label (here "label"), and the label values must be of type Double.

To create the "features" field of Vector type, first create a udf like this (note that it takes all four predictor columns as input):

```scala
val toVec4 = udf[Vector, Int, Int, String, String] { (a, b, c, d) =>
  val e3 = c match {
    case "hs-grad"   => 0
    case "bachelors" => 1
    case "masters"   => 2
  }
  val e4 = d match {
    case "male"   => 0
    case "female" => 1
  }
  Vectors.dense(a, b, e3, e4)
}
```

To encode the "label" field as well, create another udf:

```scala
val encodeLabel = udf[Double, String](_ match {
  case "A" => 0.0
  case "B" => 1.0
})
```

Now we use these two udfs to transform the original data frame:

```scala
val df = dataFixed.withColumn(
  "features",
  toVec4(
    dataFixed("age"),
    dataFixed("hours_per_week"),
    dataFixed("education"),
    dataFixed("sex")
  )
).withColumn("label", encodeLabel(dataFixed("salaryRange"))).select("features", "label")
```

Note that there can be extra columns/fields present in the data frame, but in this case I selected only features and label:

```
scala> df.show()
+-------------------+-----+
|           features|label|
+-------------------+-----+
|[38.0,40.0,0.0,0.0]|  0.0|
|[28.0,40.0,1.0,1.0]|  0.0|
|[52.0,45.0,0.0,1.0]|  1.0|
|[31.0,50.0,2.0,1.0]|  1.0|
|[42.0,40.0,0.0,0.0]|  1.0|
+-------------------+-----+
```

Now you can set the right parameters for your learning algorithm and it will work.
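For Spark 1.4 and later, the same preparation can be done without hand-written udfs: the built-in StringIndexer transformer encodes a string column as numeric indices, and VectorAssembler merges the predictor columns into a single "features" vector, both as ordinary pipeline stages. Below is a minimal sketch under that assumption; the intermediate column names education_idx and sex_idx are arbitrary names chosen for this example:

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

// map each categorical string column to a numeric index column
val eduIndexer = new StringIndexer().setInputCol("education").setOutputCol("education_idx")
val sexIndexer = new StringIndexer().setInputCol("sex").setOutputCol("sex_idx")

// StringIndexer also produces the Double-typed "label" column from salaryRange
val labelIndexer = new StringIndexer().setInputCol("salaryRange").setOutputCol("label")

// assemble all predictor columns into a single "features" vector column
val assembler = new VectorAssembler()
  .setInputCols(Array("age", "hours_per_week", "education_idx", "sex_idx"))
  .setOutputCol("features")

// RandomForestClassifier reads the "features" and "label" columns by default
val rf = new RandomForestClassifier()

val pipeline = new Pipeline().setStages(Array(eduIndexer, sexIndexer, labelIndexer, assembler, rf))
val model = pipeline.fit(dataFixed) // dataFixed as in the question, with age/hours_per_week already Int
```

Because the indexers and the assembler are pipeline stages, this pipeline can also be handed to the CrossValidator from the question via setEstimator(pipeline), so the encoding is refit on each training fold.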