Building a mode function and a corresponding count function with Dask's custom GroupBy aggregations
Published: 2020-12-14 05:02:22
So dask has now been updated to support custom aggregation functions for groupby. (Thanks to the dev team and @chmp for the work!) I am currently trying to build a mode function and a corresponding count function. Basically what I envision is that mode returns, for each grouping, a list of the most common values for a particular column (e.g. [4, 1, 2]). Additionally, there is a corresponding count function that returns the number of instances of those values, e.g. 3.
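For concreteness, the desired behaviour can be sketched in plain pandas. This is a minimal illustration only, not the dask solution; `most_common_values` is a helper name made up for this sketch:

```python
from collections import Counter

import pandas as pd


def most_common_values(x):
    # return every value tied for the highest count in x
    counts = Counter(x)
    max_cnt = max(counts.values())
    return [v for v, c in counts.items() if c == max_cnt]


df = pd.DataFrame({'g': [0, 0, 0, 1, 1, 1], 'col': [4, 4, 2, 7, 7, 7]})

# "mode": list of most common values per group
modes = df.groupby('g')['col'].apply(most_common_values)

# "count": number of instances of those values per group
counts = df.groupby('g')['col'].apply(lambda x: max(Counter(x).values()))

print(modes.to_dict())   # {0: [4], 1: [7]}
print(counts.to_dict())  # {0: 2, 1: 3}
```

The question is how to express this shape of computation through dask's chunk/agg/finalize machinery.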
Now I am trying to implement this in code. According to the groupby.py file, the parameters for a custom aggregation are as follows:

```
Parameters
----------
name : str
    the name of the aggregation. It should be unique, since intermediate
    results will be identified by this name.
chunk : callable
    a function that will be called with the grouped column of each
    partition. It can either return a single series or a tuple of series.
    The index has to be equal to the groups.
agg : callable
    a function that will be called to aggregate the results of each chunk.
    Again the argument(s) will be grouped series. If ``chunk`` returned a
    tuple, ``agg`` will be called with all of them as individual positional
    arguments.
finalize : callable
    an optional finalizer that will be called with the results from the
    aggregation.
```

Here is the example code provided:

```python
custom_mean = dd.Aggregation(
    'custom_mean',
    lambda s: (s.count(), s.sum()),
    lambda count, sum: (count.sum(), sum.sum()),
    lambda count, sum: sum / count,
)
df.groupby('g').agg(custom_mean)
```

I am trying to think through the best way to do this. Currently I have the following functions:

```python
def custom_count(x):
    count = Counter(x)
    freq_list = list(count.values())  # list() needed on Python 3
    max_cnt = max(freq_list)
    total = freq_list.count(max_cnt)
    return count.most_common(total)

custom_mode = dd.Aggregation(
    'custom_mode',
    lambda s: custom_count(s),
    lambda s1: s1.extend(),
    lambda s2: ......
)
```

However, I am stuck on how the agg part should work. Any help on this problem would be appreciated. Thanks!

Solution
Admittedly, the docs are currently a little light on detail. Thanks for bringing this issue to my attention. Please let me know if this answer helps, and I will contribute an updated version of the docs to dask.
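Before tackling mode, it helps to trace how the three callables of the `custom_mean` example from the question compose. The following is a pandas-only simulation of that pipeline, under the assumption of two partitions; no dask is required, and the data is invented for illustration:

```python
import pandas as pd

# toy frame: group column 'g' and a value column 'col'
df = pd.DataFrame({'g': [0, 0, 1, 1, 1], 'col': [1.0, 3.0, 2.0, 4.0, 6.0]})

# pretend dask has split the frame into two partitions
parts = [df.iloc[:2], df.iloc[2:]]

# chunk: called once per partition with the grouped column,
# returning a tuple of (per-group count, per-group sum)
chunks = [(p.groupby('g')['col'].count(), p.groupby('g')['col'].sum())
          for p in parts]

# agg: called with the regrouped chunk results; sum the partial
# counts and partial sums across partitions, per group
count = pd.concat([c for c, _ in chunks]).groupby(level=0).sum()
total = pd.concat([s for _, s in chunks]).groupby(level=0).sum()

# finalize: combine the aggregated intermediates into the mean
mean = total / count
print(mean.to_dict())  # {0: 2.0, 1: 4.0}
```

The key point: chunk runs per partition, agg merges the per-partition intermediates, and finalize turns the merged intermediates into the final answer. The mode aggregation below follows the same shape.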
To your question: for a single return value, the different steps of the aggregation are equivalent to:

```python
res = chunk(df.groupby('g')['col'])
res = agg(res.groupby(level=[0]))
res = finalize(res)
```

In these terms, the mode function can be implemented as follows:

```python
def chunk(s):
    # for the comments, assume only a single grouping column; the
    # implementation can handle multiple group columns.
    #
    # s is a grouped series. value_counts creates a multi-series like
    # (group, value): count
    return s.value_counts()


def agg(s):
    # s is a grouped multi-index series. In .apply the full sub-df will be
    # passed, multi-index and all. Group on the value level and sum the
    # counts. The result of the lambda function is a series. Therefore, the
    # result of the apply is a multi-index series like (group, value): count
    return s.apply(lambda s: s.groupby(level=-1).sum())

    # faster version using pandas internals
    s = s._selected_obj
    return s.groupby(level=list(range(s.index.nlevels))).sum()


def finalize(s):
    # s is a multi-index series of the form (group, value): count. First
    # manually group on the group part of the index. The lambda will receive a
    # sub-series with multi-index. Next, drop the group part from the index.
    # Finally, determine the index with the maximum value, i.e. the mode.
    # (On modern pandas, use idxmax() instead of the deprecated argmax().)
    level = list(range(s.index.nlevels - 1))
    return (
        s.groupby(level=level)
        .apply(lambda s: s.reset_index(level=level, drop=True).argmax())
    )


mode = dd.Aggregation('mode', chunk, agg, finalize)
```

Note that in the case of ties, this implementation does not match the dataframe `.mode` function. This version will return one of the tied values, not all of them.

The mode aggregation can now be used as

```python
import pandas as pd
import dask.dataframe as dd

# the exact example values were lost to formatting in transit;
# these lists are illustrative, and any small dataset works
df = pd.DataFrame({
    'col': [0, 1, 1, 2, 3] * 10,
    'g0': [0, 0, 0, 1, 1] * 10,
    'g1': [0, 1, 0, 1, 0] * 10,
})
ddf = dd.from_pandas(df, npartitions=10)

res = ddf.groupby(['g0', 'g1']).agg({'col': mode}).compute()
print(res)
```
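For readers who want to see the chunk/agg/finalize flow of the mode aggregation without a dask install, it can be traced in plain pandas. This is an illustrative sketch with invented data; it uses `idxmax` rather than the deprecated `argmax`, and `mode_result` is a name chosen here:

```python
import pandas as pd

df = pd.DataFrame({'g': [0, 0, 0, 1, 1], 'col': [4, 4, 2, 7, 7]})

# simulate two partitions of the dataframe
parts = [df.iloc[:3], df.iloc[3:]]

# chunk: per-group value counts within each partition, producing
# multi-index series of the form (group, value): count
chunked = [p.groupby('g')['col'].value_counts() for p in parts]

# agg: combine the partition results, summing counts per (group, value) pair
combined = pd.concat(chunked).groupby(level=[0, 1]).sum()

# finalize: within each group, drop the group part of the index and
# pick the value with the highest count
mode_result = combined.groupby(level=0).apply(
    lambda s: s.reset_index(level=0, drop=True).idxmax()
)
print(mode_result.to_dict())  # {0: 4, 1: 7}
```

Each stage mirrors the corresponding callable passed to `dd.Aggregation` above, which is why the dask version parallelizes cleanly: only the small per-group count tables move between partitions, never the raw values.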