scala – 如何将csv直接加载到Spark数据集中?
发布时间:2020-12-16 18:13:14 所属栏目:安全 来源:网络整理
导读:我有一个csv文件[1],我想直接加载到数据集中.问题是我总是得到错误 org.apache.spark.sql.AnalysisException: Cannot up cast `probability` from string to float as it may truncateThe type path of the target object is:- field (class: "scala.Float",
我有一个csv文件[1],我想直接加载到数据集中.问题是我总是得到错误
org.apache.spark.sql.AnalysisException: Cannot up cast `probability` from string to float as it may truncate The type path of the target object is: - field (class: "scala.Float",name: "probability") - root class: "TFPredictionFormat" You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object; 而且,特别是对于短语字段(检查案例类[2]),它得到了 org.apache.spark.sql.AnalysisException: cannot resolve '`phrases`' due to data type mismatch: cannot cast StringType to ArrayType(StringType,true); 如果我将我的case类[2]中的所有字段定义为String类型,那么一切正常,但这不是我想要的.有没有一种简单的方法可以做到[3]? 参考 [1]一个示例行 B017NX63A2,Merrell,"['merrell_for_men','merrell_mens_shoes','merrel']",merrell_shoes,0.0806054356579781 [2]我的代码片段如下 import spark.implicits._ val INPUT_TF = "<SOME_URI>/my_file.csv" final case class TFFormat ( doc_id: String,brand: String,phrases: Seq[String],prediction: String,probability: Float ) val ds = sqlContext.read .option("header","true") .option("charset","UTF8") .csv(INPUT_TF) .as[TFFormat] ds.take(1).map(println) [3]我已经找到了方法,首先在DataFrame级别定义列并将事物转换为数据集(如here或here或here),但我几乎可以肯定这不是应该完成的事情.我也很确定编码器可能是答案,但我不知道如何 解决方法
TL; DR使用标准DataFrame操作进行csv输入转换是可行的方法.如果你想避免你应该使用具有表现力的输入格式(Parquet甚至是JSON).
通常,要转换为静态类型数据集的数据必须已经是正确的类型.最有效的方法是为csv reader提供schema参数: val schema: StructType = ??? val ds = spark.read .option("header","true") .schema(schema) .csv(path) .as[T] 可以通过反射推断出模式: import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.types.StructType val schema = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType] 不幸的是,它不适用于您的数据和类,因为csv reader不支持ArrayType(但它适用于像FloatType这样的原子类型),因此您必须使用困难的方法.一个天真的解决方案可以表达如下: import org.apache.spark.sql.functions._ val df: DataFrame = ??? // Raw data df .withColumn("probability",$"probability".cast("float")) .withColumn("phrases",split(regexp_replace($"phrases","[[']]",""),",")) .as[TFFormat] 但根据短语的内容,你可能需要更复杂的东西. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |