scala – How do I write a Spark UDAF that simply collects rows?
Published: 2020-12-16 18:09:22 | Category: Security | Source: compiled from the web
For my specific requirement, I want to write a UDAF that simply collects all of its input rows.
The input is rows with two columns, both of type Double. The intermediate schema, "I think", is an ArrayList (please correct me if I am wrong). The returned data type is an ArrayList. I have written down my "idea" of the UDAF, but I would like someone to help me complete it.

    class CollectorUDAF() extends UserDefinedAggregateFunction {

      // Input Data Type Schema
      def inputSchema: StructType = StructType(Array(StructField("value",DoubleType),StructField("y",DoubleType)))

      // Intermediate Schema
      def bufferSchema = util.ArrayList[Array(StructField("value",DoubleType)]

      // Returned Data Type .
      def dataType: DataType = util.ArrayList[Array(StructField("value",DoubleType)]

      // Self-explaining
      def deterministic = true

      // This function is called whenever key changes
      def initialize(buffer: MutableAggregationBuffer) = { }

      // Iterate over each entry of a group
      def update(buffer: MutableAggregationBuffer,input: Row) = { }

      // Called after all the entries are exhausted.
      def evaluate(buffer: Row) = { }

      def merge(buffer1: MutableAggregationBuffer,buffer2: Row): Unit = { }
    }

Solution
If I understand your question correctly, the following should work for you:
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._

    class CollectorUDAF() extends UserDefinedAggregateFunction {

      // Input Data Type Schema: two Double columns
      def inputSchema: StructType =
        new StructType().add("value", DoubleType).add("y", DoubleType)

      // Intermediate Schema: a single column holding an array of doubles.
      // The element type must match what update() and merge() store.
      def bufferSchema: StructType =
        new StructType().add("array", ArrayType(DoubleType, containsNull = false))

      // Returned Data Type
      def dataType: DataType = ArrayType(DoubleType, containsNull = false)

      // Self-explaining
      def deterministic: Boolean = true

      // This function is called whenever key changes
      def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = Seq.empty[Double]
      }

      // Iterate over each entry of a group.
      // Assumes neither input column is null; getDouble fails on null values.
      def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer(0) = buffer.getSeq[Double](0) ++ Seq(input.getDouble(0), input.getDouble(1))
      }

      // Combine two partial buffers by concatenating their contents
      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getSeq[Double](0) ++ buffer2.getSeq[Double](0)
      }

      // Called after all the entries are exhausted.
      def evaluate(buffer: Row): Any = buffer.getSeq[Double](0)
    }

(Editor: 李大同)
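To see why the buffer has to be an array and why merge has to concatenate, it may help to walk through the call order without Spark at all. The sketch below is a plain-Scala stand-in, not Spark code: the object `CollectorSketch`, the `Buffer` alias and the `aggregate` helper are hypothetical names chosen to mirror the UDAF callbacks. It assumes that each partition is folded on its own from an empty buffer and that the partial buffers are then merged.

```scala
// Plain-Scala stand-in for the UDAF lifecycle; no Spark dependency.
// All names here are hypothetical and only mirror the UDAF callbacks.
object CollectorSketch {
  // Stand-in for the single array-of-doubles buffer column
  type Buffer = Vector[Double]

  // Start every group (and every partial aggregation) with an empty buffer
  def initialize: Buffer = Vector.empty

  // Append both columns of one input row to the buffer
  def update(buffer: Buffer, row: (Double, Double)): Buffer =
    buffer :+ row._1 :+ row._2

  // Combine two partial buffers by concatenation
  def merge(left: Buffer, right: Buffer): Buffer = left ++ right

  // The final result is just the collected values
  def evaluate(buffer: Buffer): Seq[Double] = buffer

  // Assumed call order: fold each partition separately, then merge the partials
  def aggregate(partitions: Seq[Seq[(Double, Double)]]): Seq[Double] = {
    val partials = partitions.map(_.foldLeft(initialize)(update))
    evaluate(partials.foldLeft(initialize)(merge))
  }
}
```

With the real class, the aggregation on a DataFrame would typically be invoked along the lines of `df.groupBy("key").agg(new CollectorUDAF()(col("value"), col("y")))`, where `df` and the grouping column `key` are placeholders for your own data.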