加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 返回动态数据类型的Apache Spark UDF

发布时间:2020-12-16 18:09:13 所属栏目:安全 来源:网络整理
导读:我有UDF处理 JSON并返回每行的动态数据结果.在我的情况下,我需要这个来验证数据并返回验证数据. 架构对于每一行都是灵活的.这意味着我不能为每个案例创建案例类(我的一些数据可以嵌套). 我试图从我的UDF函数返回元组,但我也没有运气(因为我需要从列表转换为
我有UDF处理 JSON并返回每行的动态数据结果.在我的情况下,我需要这个来验证数据并返回验证数据.

架构对于每一行都是灵活的.这意味着我不能为每个案例创建案例类(我的一些数据可以嵌套).

我试图从我的UDF函数返回元组,但我也没有运气(因为我需要从列表转换为元组),我没有找到一个优雅的解决方案.

我正在返回的数据类型是String,Integer,Double,DateTime,它们的顺序不同.

我试图在DataFrame上使用map,但是我的架构有问题.

import spark.implicits._

def processData(row_type: String) = {
  /*
  completely random output here. Tuple/List/Array of 
  elements with a type Integer,String,DateType.
  */

  // pseudo-code starts here

  if row_type == A
     (1,"second",3)
  else
     (1,3,4)
}

val processDataUDF = udf((row_type: String) => processData(row_type))

val df = Seq((0,1),(1,2)).toDF("a","b")
val df2 = df.select(processDataUDF($"a"))
df2.show(5)
df2.printSchema()

结果

+------------+
|      UDF(a)|
+------------+
|[1,second,3]|
|[1,3]|
+------------+

我该如何处理这个问题?每个row_type有不同的处理结果.所有row_type都是动态设置的.我可以为每个row_type提供很好的Schema,但我不能使用不同的模式生成相同的UDF返回结果.

使用地图是唯一的方法吗?

解决方法

Spark Dataset是一个柱状数据结构,这里没有灵活架构的地方. Schema必须是同类的(所有行必须具有相同的通用结构)并且已知为upfront(如果使用UDF,则必须返回定义良好的SQL类型).

您可以通过以下方式实现一些灵

>定义表示所有可能字段的超集的模式,并将各列标记为可为空.只有在没有类型冲突的情况下才有可能(如果Row包含字段foo,它总是使用相同的SQL类型表示).
>使用集合类型(MapType,ArrayType)表示可变大小的字段.所有值和/或键必须属于同一类型.
>将原始数据重塑为可通过固定架构实际表示的点. Spark包括作为其依赖项的json4s,它为merging,diffing和querying JSON数据提供了一组工具.如果需要,它可用于应用相对复杂的转换.

如果这不实用,我建议保持JSON字段“按原样”并仅按需解析它以提取特定值.您可以使用get_json_object和显式类型转换.这允许测试不同的场景:

coalesce(Seq("$.bar","$.foo.bar","$.foobar.foo.bar")
  .map(get_json_object($"json_col",_)): _*).cast(DoubleType)

不假设单个文档结构.

你可以通过二进制编码器(Encoders.kryo,Encoders.java)或RDD API获得更多的灵活性,它可以用来存储联合类型(甚至是Any),但是如果你真的期望完全随机的输出,那么它表明一些严重的设计或数据建模问题.即使您可以存储已分析的数据,也很难使用它.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读