0x08 大数据分析,七层基本功
摘要:欲练数据神功,必先挥刀……,嗯,先扎好马步吧!编写SQL语句,是数据统计分析最基本的能力了。觉得SQL的自定义功能太弱了,或者你觉得就算是Hive调用外部脚本也麻烦了,那么我们上当前最热的Spark 00 引言 2016就要来了,避不及,躲不开。新一年来之前,还是有一件值得高兴的事情,那便是年终奖了。 公司大了,什么样的人都有。嗯嗯……,说错了,是人大了,什么样的公司都进。嗯嗯……,还是不对。是公司大了,员工多了,要统计每个员工每年写的代码数量。以此来分配年终奖了。 假设有如下数据示例,第一列为员工的ID,第二列为年份,第三列为代码数。
欲练数据神功,必先挥刀……,嗯,先扎好马步吧。下面的7个步骤,是练好数据基本功的一些方法。 01 MySQL版本 用一个join来实现需求,取出数据: select a.* from table as a join table as b on a.id=b.id where a.count>b.count; 本例中是自己和自己join,最简单的inner join,将两个表中按id相同的行进行关联起来,最后按条件进行筛选需要的数据即可。需要注意,MySQL中的临时表,是没有办法自己join自己的。 02 Bash版本 Shell命令比通常想像的要强大,用好Shell命令,很多时候也可以方便的处理问题。 join -t',' -1 1 -2 1 data.csv data.csv | awk -F',' '$3>$5{print $1,$4,$5}' 这条命令看起来估计算最简洁了的,但还是用了两个命令。join也是一个非常神奇的命令,可以完成关系数据库的join功能,包括left join,right join,outer join,inner join。 03 Awk版本 #!/usr/local/bin/awk -f BEGIN{FS=","} { id = int($1) year = int($2) count = int($3) print id,year,count if(yc[id]["count"] < count){ yc[id]["year"] = year yc[id]["count"] = count } } END{ for(x in yc){ printf("%s,%s,%sn",x,yc[x]["year"],yc[x]["count"]) } } 上面一段简单的Awk代码,把Awk的一些基本概念都用上了。也算是“麻雀虽小,五脏俱全”了。涉及Awk的三段式代码结构,数组与赋值,条件判断与循环等编程基础概念。 因为awk是按行读入文件,因此我们的思想就是将当前的最大的值存储起来,再读入下一行,如果比当前最大值大,就更新,否则继续读入一行。 处理文件文件的方式,自然与数据库的join思想不一样,但你需要习惯这种方式,因为这种处理文件文件的方式,也是很多NoSQL的处理方式。 04 Python版本 import sys last_id = None for line in sys.stdin: idx,count = line.strip().split(',') if idx == last_id: if count > most_count: most_year,most_count = year,count else: if last_id: print '%s,%s' %(last_id,most_year,most_count) last_id = idx most_count = 0 if idx == last_id: print '%s,most_count) 处理的方式还是一样,按行读取文件并存储和记录,但逻辑实现起来感觉稍微有点绕而已。没有用数组或字典之类的来存储数据。 还需要注意,这个程序是需要对文件进行按id排序的,因为代码处理的是连续的行,并且假定相同的id是在连续的行上。 当然,你肯定会说,这个代码写得有些杂乱,不符合通常的思路。之所以写成这样,是因为我们后面在分布式环境中还要用。 05 Hive版本 假设Facebook有20亿用户,统计每个用户在每天中,各自发的消息的最多的那天和发送的条数,假设所有用户,每天都发消息。20亿用户,按Facebook上线10年算,3600天,共72000亿条记录,够大了吧!分别找出每个用户发消息最多的那天和发消息的次数。 且来看看,由Facebook开源出来的Hive数据仓库,如何处理! -- 见MySQL版本 你没有看错,我也没有骗你,还真是和MySQL用同样的代码。当然,Hive有自己的优化之类的,暂时先不管。 这个地方,有个前提,你只需要把那72000亿条数据,存放到HDFS文件系统上,然后建立一个外部表和HDFS文件进行关联,然后输入和MySQL同样的语句,Hive引擎会自然将SQL语句转换为下层的map-reduce代码运行。 重要的是,你的Hadoop集群有多强大,这个Hive语句就能达到多强大。还不用自己写map-reduce程序,就是分析师最熟悉的SQL语句。 如果你觉得Hive也是SQL语句,有些自定义的函数或者方法比较麻烦,那么Hive还可以调用外部的脚本,只要是可执行脚本都行:python、ruby、bash、scala、java、lisp随便你爱好。 06 Spark版本 from pyspark import SparkContext sc = SparkContext() data = sc.textFile('data.csv') data = data.map(lambda x: x.split(',')).map(lambda x: (x[0],(x[1],int(x[2])))).groupByKey().mapValues(lambda value: sorted(value,lambda x,y: cmp(x[1],y[1]),reverse=True)[0]) for item in data.collect(): print '%s,%s' % (item[0],item[1][0],item[1][1]) Spark支持几种编程接口,Scala、Java、Python,最近也开始支持R了。 上面虽然连续用了好几个map,但原理却非常简单,和python的map功能类似。唯一用了一个groupByKey功能,将相同的id聚合在一起,剩下的属性放在一个列表里面。对这个列表进行排序,取count最多的次数和年份,最后输出。 逻辑够简单,代码也够简洁。Spark强大的便利利益于Scala强大的数据结构与数据处理能力。 07 map-reduce版本 # mapper.py见python版本 # reducer.py见python版本 又一个大骗子! 通过Hadoop的Streaming接口来进行调用,只需要自定义mapper和reducer程序即可。上面的mapper和reducer可以直接用纯粹Python的单机版本。 输入是一些id,count行,输出还是同样的数据结构。只是在程序中,把当前这个程序的输入中,每个id最多的count和year找出来了。数据量已经减少了,每个id只会保留最大的一条数据。 分布式最基本的原理就是数据分块,在map阶段,对每个块的数据调用mapper程序,求出当前块里面每个id的最大count和year找出来。把这些输出作为reducer的输入,再求一次最大值,那么找出的便是全局的最大值。 08 结尾 不要兴奋,也许会突然冒出来一个小姑娘,告诉你说:切,上面的功能我用Excel也可以完美的实现。 当然可以了,聪明如你,Excel还能实现比这强大得多的功能。 理论上来说,上面所有工具都能完成任何统计分析需求。只是不同的地方,实现的方式各有不同,有的复杂,有的简单,有的快,有的慢而已。选择你觉得最简单的方式,搞定任务即可。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |