加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Spark 2.0 DataSets groupByKey并划分操作和类型安全

发布时间:2020-12-16 18:23:17 所属栏目:安全 来源:网络整理
导读:我非常满意Spark 2.0 DataSet,因为它的编译时类型安全.但是这里有几个我无法解决的问题,我也没有找到好的文档. 问题#1 – 在聚合列上划分操作 – 考虑下面的代码 – 我有一个DataSet [MyCaseClass],我想在c1,c2,c3和sum(c4)/ 8上进行groupByKey.如果我只是计
我非常满意Spark 2.0 DataSet,因为它的编译时类型安全.但是这里有几个我无法解决的问题,我也没有找到好的文档.

问题#1 – 在聚合列上划分操作 –
考虑下面的代码 –
我有一个DataSet [MyCaseClass],我想在c1,c2,c3和sum(c4)/ 8上进行groupByKey.如果我只是计算总和但它给了divide(8)的编译时错误,下面的代码效果很好.我想知道如何实现以下目标.

final case class MyClass (c1: String,c2: String,c3: String,c4: Double)

    val myCaseClass: DataSet[MyCaseClass] = ??? // assume it's being loaded

    import sparkSession.implicits._
    import org.apache.spark.sql.expressions.scalalang.typed.{sum => typedSum}

     myCaseClass.
       groupByKey(myCaseClass =>
          (myCaseClass.c1,myCaseClass.c2,myCaseClass.c3)).
          agg(typedSum[MyCaseClass](_.c4).name("sum(c4)").
          divide(8)). //this is breaking with exception
       show()

如果我删除.divide(8)操作并运行上面的命令它会给我低于输出.

+-----------+-------------+
|        key|sum(c4)      |
+-----------+-------------+
| [A1,F2,S1]|         80.0|
| [A1,F1,S1]|         40.0|  
+-----------+-------------+

问题#2 – 将groupedByKey结果转换为另一个Typed DataFrame –
现在问题的第二部分是我想再次输出一个类型化的DataSet.为此,我有另一个案例类(不确定是否需要),但我不知道如何映射分组结果 –

final case class AnotherClass(c1: String,average: Double) 

 myCaseClass.
           groupByKey(myCaseClass =>
              (myCaseClass.c1,myCaseClass.c3)).
              agg(typedSum[MyCaseClass](_.c4).name("sum(c4)")).
as[AnotherClass] //this is breaking with exception

但是这又失败了,因为按键分组的结果并没有直接映射到AnotherClass.

PS:上述任何其他解决方案都非常受欢迎.

解决方法

第一个问题可以通过一直使用类型列来解决(KeyValueGroupedDataset.agg需要TypedColumn(-s))
?您可以将聚合结果定义为:

val eight = lit(8.0)
  .as[Double]  // Not necessary

val sumByEight = typedSum[MyClass](_.c4)
  .divide(eight)
  .as[Double]  // Required
  .name("div(sum(c4),8)")

并将其插入以下代码:

val myCaseClass = Seq(
  MyClass("a","b","c",2.0),MyClass("a",3.0)
).toDS

myCaseClass
  .groupByKey(myCaseClass => (myCaseClass.c1,myCaseClass.c3))
  .agg(sumByEight)

要得到

+-------+---------------+
|    key|div(sum(c4),8)|
+-------+---------------+
|[a,b,c]|          0.625|
+-------+---------------+

第二个问题是使用不符合数据形状的类的结果.正确的表示可能是:

case class AnotherClass(key: (String,String,String),sum: Double)

与上面定义的数据一起使用:

myCaseClass
   .groupByKey(myCaseClass => (myCaseClass.c1,myCaseClass.c3))
   .agg(typedSum[MyClass](_.c4).name("sum"))
   .as[AnotherClass]

会给:

+-------+---+
|    key|sum|
+-------+---+
|[a,c]|5.0|
+-------+---+

但是如果数据集[((String,Double)]可以接受,那么[AnotherClass]就没有必要了.

您当然可以跳过所有这些并且只是mapGroups(尽管不会没有性能损失):

import shapeless.syntax.std.tuple._   // A little bit of shapeless

val tuples = myCaseClass
 .groupByKey(myCaseClass => (myCaseClass.c1,myCaseClass.c3))
 .mapGroups((group,iter) => group :+ iter.map(_.c4).sum)

结果

+---+---+---+---+   
| _1| _2| _3| _4|
+---+---+---+---+
|  a|  b|  c|5.0|
+---+---+---+---+

reduceGroups可能是更好的选择:

myCaseClass
  .groupByKey(myCaseClass => (myCaseClass.c1,myCaseClass.c3))
  .reduceGroups((x,y) => x.copy(c4=x.c4 + y.c4))

结果数据集:

+-------+-----------+    
|     _1|         _2|
+-------+-----------+
|[a,c]|[a,c,5.0]|
+-------+-----------+

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读