scala – 返回动态数据类型的Apache Spark UDF
我有UDF处理
JSON并返回每行的动态数据结果.在我的情况下,我需要这个来验证数据并返回验证数据.
架构对于每一行都是灵活的.这意味着我不能为每个案例创建案例类(我的一些数据可以嵌套). 我试图从我的UDF函数返回元组,但我也没有运气(因为我需要从列表转换为元组),我没有找到一个优雅的解决方案. 我正在返回的数据类型是String,Integer,Double,DateTime,它们的顺序不同. 我试图在DataFrame上使用map,但是我的架构有问题. import spark.implicits._ def processData(row_type: String) = { /* completely random output here. Tuple/List/Array of elements with a type Integer,String,DateType. */ // pseudo-code starts here if row_type == A (1,"second",3) else (1,3,4) } val processDataUDF = udf((row_type: String) => processData(row_type)) val df = Seq((0,1),(1,2)).toDF("a","b") val df2 = df.select(processDataUDF($"a")) df2.show(5) df2.printSchema() 结果 +------------+ | UDF(a)| +------------+ |[1,second,3]| |[1,3]| +------------+ 我该如何处理这个问题?每个row_type有不同的处理结果.所有row_type都是动态设置的.我可以为每个row_type提供很好的Schema,但我不能使用不同的模式生成相同的UDF返回结果. 使用地图是唯一的方法吗? 解决方法
Spark Dataset是一个柱状数据结构,这里没有灵活架构的地方. Schema必须是同类的(所有行必须具有相同的通用结构)并且已知为upfront(如果使用UDF,则必须返回定义良好的SQL类型).
您可以通过以下方式实现一些灵 >定义表示所有可能字段的超集的模式,并将各列标记为可为空.只有在没有类型冲突的情况下才有可能(如果Row包含字段foo,它总是使用相同的SQL类型表示). 如果这不实用,我建议保持JSON字段“按原样”并仅按需解析它以提取特定值.您可以使用get_json_object和显式类型转换.这允许测试不同的场景: coalesce(Seq("$.bar","$.foo.bar","$.foobar.foo.bar") .map(get_json_object($"json_col",_)): _*).cast(DoubleType) 不假设单个文档结构. 你可以通过二进制编码器(Encoders.kryo,Encoders.java)或RDD API获得更多的灵活性,它可以用来存储联合类型(甚至是Any),但是如果你真的期望完全随机的输出,那么它表明一些严重的设计或数据建模问题.即使您可以存储已分析的数据,也很难使用它. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |