模拟大数据处理、linux下hash_map()使用
写一个程序模拟大数据处理方法。 模拟情景:访问日志里有100万条ip,每条ip占一行。现在要求访问次数最高的前3个ip(后面称作top3)。(假设我们的内存不足以同时处理100万条ip)。 处理方法:逐条处理100万条ip中的每一条ip,根据hash值将ip写入到不同的小文件中(相同hashcode的ip写入同一个小文件中),对每个小文件用hash_map统计每一条ip出现的次数,然后针对每一个小文件求top3(小文件个数设置应使内存足以处理每一个小文件),将每个小文件对应的top3写入对应的统计文件(一个统计文件对应一个小文件)中,最后在所有统计文件中求整体的top3。 求top3时用的方法是:用前三个元素建一个三个元素的小根堆,然后依次遍历剩下的元素,当一个元素出现的次数大于小根堆堆顶元素出现次数时,用这个元素替换堆顶元素,然后对堆顶元素进行一次向下筛选。知道所有元素遍历完,此时的小根堆就是出现次数最多的top3。 ? 我所用的变成环境是:linux,gnu编译器,在此环境下使用hash_map时遇到一个问题:gnu的hash_map不能以std::string类型作为键类型,原因是gnu版编译器没有定义求std::string的hashcode的函数,解决方法是自己定义一个对std::string类型求hashcode的类。gnu版编译器提供的hash_map可以以char*作为键类型,但需要自己定义char*代表的字符串比较类模板(否则hash_map在进行关键字比较时,比较的是指针,而不是字符串),另外,以char*作为键类型时,hash_map是处理的char*变量指向的字符串(hash_map只记住了char*变量,并没有记住字符串),所以为了表示不同的字符串,必须分配空间大量字符串空间已存放不同的字符串(假如用同一个char[]存放每次读取的字符串,虽然每个字符串的hashcode不同,但因为改变了hash表中某一个元素的键的内容(这些键会在iterator++操作时用到:iterator++操作根据当前元素的键算出下一个应该遍历的元素在hash表中的位置,具体做法是:计算当前元素的键的hashcode,根据hashcode就知道这个元素在hash表中的第几个桶中(桶号),找下一个元素时,若当前同的元素已遍历完,那么就需要当前桶号的后面一个桶的号(当前桶号加1),所以若用同一个字符数组存储字符串,做++操作时用来计算hashcode的字符串的内容与插入那个字符串时的内容不同(这样两次计算的hashcode就不相同,也就是查找时计算出来的hashcode不再是当前元素所在的桶的桶号),这就会引起异常结果),使得在进行++操作时差生异常结果,可能导致死循环。),耗费空间较大。 ? 下面分别针对std::string类型作为键类型和char*作为键类型两种情况提供代码。 1、以char*作为键类型 #include<stdio.h> #include<stdlib.h> #include<string.h> #include<iostream> #include<boost/functional/hash.hpp> #include<string> #include<ext/hash_map> using namespace __gnu_cxx; void create_original_file() //生成一个初试大文件 { FILE* original_file_pointer=fopen("./original_file","w"); char str_ip[16]; int rand_number; char str_random[5]; srand(0); for(int i=0;i<1000000;i++) { strcpy(str_ip,"192.168."); rand_number=(int)(255*(rand()/(RAND_MAX+1.0))); sprintf(str_random,"%d",rand_number); strcat(str_ip,str_random); strcat(str_ip,"."); rand_number=(int)(255*(rand()/(RAND_MAX+1.0))); sprintf(str_random,str_random); if(i<999999) fprintf(original_file_pointer,"%sn",str_ip); else fprintf(original_file_pointer,"%s",str_ip); } fclose(original_file_pointer); } void divide_file() //把初试大文件分割1000个小文件 { boost::hash<std::string> ip_hash; std::size_t ip_hash_code; FILE* small_files[1000]; for(int i=0;i<1000;i++) { char th[30]="./smallfiles/small_file_"; sprintf(th+24,i); small_files[i]=fopen(th,"w"); } FILE* original_file_pointer=fopen("./original_file","r"); char str_ip[16]; std::string string_ip; for(int i=0;i<1000000;i++) { fscanf(original_file_pointer,str_ip); string_ip=str_ip; ip_hash_code=ip_hash(string_ip); fprintf(small_files[ip_hash_code%1000],str_ip); } fclose(original_file_pointer); for(int i=0;i<1000;i++) { fclose(small_files[i]); } } struct ip_counter_struct //ip以及其对应的counter的结构体 { std::string struct_string_ip; int struct_counter; }; void sift_down(ip_counter_struct struct_map[],int position,int length) //堆的一次向下筛选 { int j=2*position+1; while(j<length) { if(((j+1)<length)&&(struct_map[j+1].struct_counter<struct_map[j].struct_counter)) j=j+1; if(struct_map[position].struct_counter>struct_map[j].struct_counter) { std::swap(struct_map[position],struct_map[j]); position=j; j=2*position+1; } else break; } } void create_heap(ip_counter_struct struct_map[],int length) //建堆 { int i=(length-2)/2; while(i>0) { sift_down(struct_map,i,length); i=(i-1)/2; } sift_down(struct_map,length); } struct eqstr //hash_map用来比较char*表示的字符串时所用的类 { bool operator()(const char* s1,const char* s2) const { return strcmp(s1,s2)==0; } }; void statistic() //统计每个ip出现的次数,并把每个小文件对应的top3写入对应的统计文件 { ip_counter_struct ip_counter_s[3]; for(int i=0;i<3;i++) { ip_counter_s[i].struct_counter=0; } char str_ip[10000][16]; for(int i=0;i<1000;i++) { hash_map<const char*,int,hash<const char*>,eqstr> ip_counter; char th[30]="./smallfiles/small_file_"; char statistic_th[35]="./statistic/statistic_file_"; sprintf(th+24,i); sprintf(statistic_th+27,i); FILE* small_file_pointer=fopen(th,"r"); FILE* statistic_file_pointer=fopen(statistic_th,"w"); int j=0; for(;fscanf(small_file_pointer,str_ip[j])!=EOF;) { ip_counter[str_ip[j]]=ip_counter[str_ip[j]]+1; j++; } fclose(small_file_pointer); hash_map<const char*,eqstr>::iterator hash_map_it=ip_counter.begin(); int number; for(number=0;number<3;number++) { if(hash_map_it!=ip_counter.end()) { ip_counter_s[number].struct_string_ip=hash_map_it->first; ip_counter_s[number].struct_counter=hash_map_it->second; ++hash_map_it; } else break; } switch(number) { case 0: break; case 1: fprintf(statistic_file_pointer,"%s ",ip_counter_s[0].struct_string_ip.c_str()); fprintf(statistic_file_pointer,ip_counter_s[0].struct_counter); break; case 2: fprintf(statistic_file_pointer,"%dn",ip_counter_s[0].struct_counter); fprintf(statistic_file_pointer,ip_counter_s[1].struct_string_ip.c_str()); fprintf(statistic_file_pointer,ip_counter_s[1].struct_counter); break; case 3: create_heap(ip_counter_s,3); while(hash_map_it!=ip_counter.end()) { if(hash_map_it->second>ip_counter_s[0].struct_counter) { ip_counter_s[0].struct_string_ip=hash_map_it->first; ip_counter_s[0].struct_counter=hash_map_it->second; sift_down(ip_counter_s,3); } ++hash_map_it; } for(int i=0;i<3;i++) { fprintf(statistic_file_pointer,ip_counter_s[i].struct_string_ip.c_str()); fprintf(statistic_file_pointer,ip_counter_s[i].struct_counter); } break; defualt:break; } fclose(statistic_file_pointer); } } void get_total_top_k() //得到整体的top3 { ip_counter_struct ip_counter_s[3]; for(int i=0;i<3;i++) { ip_counter_s[i].struct_counter=0; } int number=0; char str_ip[16]=""; int counter=0; for(int i=0;i<1000;i++) { char statistic_th[35]="./statistic/statistic_file_"; sprintf(statistic_th+27,i); FILE* statistic_file_pointer=fopen(statistic_th,"r"); while(fscanf(statistic_file_pointer,str_ip)!=EOF) { fscanf(statistic_file_pointer,&counter); if(number<2) { ip_counter_s[number].struct_string_ip=str_ip; ip_counter_s[number].struct_counter=counter; number++; } else if(number==2) { ip_counter_s[number].struct_string_ip=str_ip; ip_counter_s[number].struct_counter=counter; number++; create_heap(ip_counter_s,3); } else { if(counter>ip_counter_s[0].struct_counter) { ip_counter_s[0].struct_string_ip=str_ip; ip_counter_s[0].struct_counter=counter; sift_down(ip_counter_s,3); } number++; } } fclose(statistic_file_pointer); } for(int i=0;i<3;i++) std::cout<<ip_counter_s[i].struct_string_ip<<" "<<ip_counter_s[i].struct_counter<<std::endl; } int main() { create_original_file(); divide_file(); statistic(); get_total_top_k(); return 0; }
#include<stdio.h> #include<stdlib.h> #include<string.h> #include<iostream> #include<boost/functional/hash.hpp> #include<string> #include<ext/hash_map> using namespace __gnu_cxx; namespace __gnu_cxx { template<> struct hash<std::string> //定义计算std::string的hashcode的类 { std::size_t operator()(const std::string &s) const { boost::hash<std::string> string_hash; return string_hash(s); } }; } void create_original_file() { FILE* original_file_pointer=fopen("./original_file",str_ip); } fclose(original_file_pointer); } void divide_file() { boost::hash<std::string> ip_hash; std::size_t ip_hash_code; FILE* small_files[1000]; for(int i=0;i<1000;i++) { char th[30]="./smallfiles/small_file_"; sprintf(th+24,str_ip); } fclose(original_file_pointer); for(int i=0;i<1000;i++) { fclose(small_files[i]); } } struct ip_counter_struct { std::string struct_string_ip; int struct_counter; }; void sift_down(ip_counter_struct struct_map[],int length) { int j=2*position+1; while(j<length) { if(((j+1)<length)&&(struct_map[j+1].struct_counter<struct_map[j].struct_counter)) j=j+1; if(struct_map[position].struct_counter>struct_map[j].struct_counter) { std::swap(struct_map[position],int length) { int i=(length-2)/2; while(i>0) { sift_down(struct_map,length); } void statistic() { ip_counter_struct ip_counter_s[3]; for(int i=0;i<3;i++) { ip_counter_s[i].struct_counter=0; } char str_ip[16]=""; std::string string_ip; for(int i=0;i<1000;i++) { hash_map<std::string,int> ip_counter; char th[30]="./smallfiles/small_file_"; char statistic_th[35]="./statistic/statistic_file_"; sprintf(th+24,"w"); for(;fscanf(small_file_pointer,str_ip)!=EOF;) { string_ip=str_ip; ip_counter[string_ip]=ip_counter[string_ip]+1; } fclose(small_file_pointer); hash_map<std::string,int>::iterator hash_map_it=ip_counter.begin(); int number; for(number=0;number<3;number++) { if(hash_map_it!=ip_counter.end()) { ip_counter_s[number].struct_string_ip=hash_map_it->first; ip_counter_s[number].struct_counter=hash_map_it->second; ++hash_map_it; } else break; } switch(number) { case 0: break; case 1: fprintf(statistic_file_pointer,ip_counter_s[i].struct_counter); } break; defualt:break; } fclose(statistic_file_pointer); } } void get_total_top_k() { ip_counter_struct ip_counter_s[3]; for(int i=0;i<3;i++) { ip_counter_s[i].struct_counter=0; } int number=0; char str_ip[16]=""; int counter=0; for(int i=0;i<1000;i++) { char statistic_th[35]="./statistic/statistic_file_"; sprintf(statistic_th+27,3); } number++; } } fclose(statistic_file_pointer); } for(int i=0;i<3;i++) std::cout<<ip_counter_s[i].struct_string_ip<<" "<<ip_counter_s[i].struct_counter<<std::endl; } int main() { create_original_file(); divide_file(); statistic(); get_total_top_k(); return 0; } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |