scala – 从字符串文字中推断Spark DataType
我正在尝试编写一个
Scala函数,可以根据提供的输入字符串推断出Spark
DataTypes:
/** * Example: * ======== * toSparkType("string") => StringType * toSparkType("boolean") => BooleanType * toSparkType("date") => DateType * etc. */ def toSparkType(inputType : String) : DataType = { var dt : DataType = null if(matchesStringRegex(inputType)) { dt = StringType } else if(matchesBooleanRegex(inputType)) { dt = BooleanType } else if(matchesDateRegex(inputType)) { dt = DateType } else if(...) { ... } dt } 我的目标是支持可用数据类型的大部分(如果不是全部).当我开始实现这个功能时,我开始思考:“Spark / Scala可能已经有了一个helper / util方法,可以为我做这个.”毕竟,我知道我可以这样做: var structType = new StructType() structType.add("some_new_string_col","string",true,Metadata.empty) structType.add("some_new_boolean_col","boolean",Metadata.empty) structType.add("some_new_date_col","date",Metadata.empty) 并且Scala和/或Spark都会隐式地将我的“字符串”参数转换为StringType等.所以我问:我可以使用Spark或Scala来帮助我实现转换器方法有什么魔力? 解决方法
你是对的. Spark已经拥有自己的架构和数据类型推断代码,它用于从底层数据源(csv,json等)推断架构.所以你可以看看它来实现你自己的(实际的实现被标记为Spark的私有,并且是绑定到RDD和内部类,所以它不能直接从Spark之外的代码中使用,但应该让你知道如何去做它.) 鉴于csv是平面类型(并且json可以具有嵌套结构),csv模式推断相对更直接,并且应该帮助您完成上面尝试实现的任务.所以我将解释csv推理是如何工作的(json推理只需要考虑可能的嵌套结构,但数据类型推断非常类似). 有了这个序幕,你要看的东西是CSVInferSchema对象.特别是,查看采用RDD [Array [String]]的推断方法,并在整个RDD中推断数组中每个元素的数据类型.它的方式是 – 它将每个字段标记为NullType开始,然后当它迭代RDD中的下一行值(Array [String])时,如果新的DataType是新的DataType,则它将已经推断的DataType更新为新的DataType更加具体.这发生在here: val rootTypes: Array[DataType] = tokenRdd.aggregate(startType)(inferRowType(options),mergeRowTypes) 现在inferRowType calls inferField为行中的每个字段. inferField implementation是您可能正在寻找的 – 它为特定字段推断到目前为止的类型,并将当前行的字段的字符串值作为参数.然后,它返回现有的推断类型,或者如果推断的新类型更具体,则返回新类型. 代码的相关部分如下: typeSoFar match { case NullType => tryParseInteger(field,options) case IntegerType => tryParseInteger(field,options) case LongType => tryParseLong(field,options) case _: DecimalType => tryParseDecimal(field,options) case DoubleType => tryParseDouble(field,options) case TimestampType => tryParseTimestamp(field,options) case BooleanType => tryParseBoolean(field,options) case StringType => StringType case other: DataType => throw new UnsupportedOperationException(s"Unexpected data type $other") } 请注意,如果typeSoFar是NullType,那么它首先尝试将其解析为Integer,但tryParseInteger调用是对较低类型解析的调用链.因此,如果它无法将值解析为Integer,那么它将调用tryParseLong,失败后将调用tryParseDecimal,失败时将调用tryParseDouble w.o.f.w.i. tryParseTimestamp w.o.f.w.i tryParseBoolean w.o.f.w.i.最后是stringType. 所以你可以使用几乎相似的逻辑来实现你的用例. (如果您不需要跨行合并,那么您只需逐字实现所有tryParse *方法,只需调用tryParseInteger.无需编写自己的正则表达式.) 希望这可以帮助. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |