大数据处理之二
发布时间:2020-12-14 02:33:23 所属栏目:大数据 来源:网络整理
导读:有一个1G大小的一个文件,里面每一行是一个词,词的大小不超过16字节,内存限制大小是1M。返回频数最高的100个词 1、利用每个词的哈希值,进行5000求余,分发到1~5000为名的文件当中,就平均而言每个文件大小会是200K左右 2、针对步骤1产生的结果,如果大于1
有一个1G大小的一个文件,里面每一行是一个词,词的大小不超过16字节,内存限制大小是1M。返回频数最高的100个词
1、利用每个词的哈希值,进行5000求余,分发到1~5000为名的文件当中,就平均而言每个文件大小会是200K左右 2、针对步骤1产生的结果,如果大于1M的话,则根据文件大小,进行重新分配,当然是放在5001~5500当中 3、依次读取每个文件进行频度统计,将结果放在result_i的文件当中 4、依次读取result_i文件,此为其一,然后进行归并排序 Note: ????? 这里要注意一点,因为在分发操作完成之后检测文件大小,如果文件大于500K,则继续进行分发,尽量将每个文件的大小控制在200k左右,此为其一, ???? 其二读取每一个文件,同时将所有文件i.txt文件进行查询词的频度统计,并将结果有序排列在result_i.txt当中 // ConsoleApplication1.cpp : Defines the entry point for the console application. // #include "stdafx.h" #include<Windows.h> #include<assert.h> #include<hash_map> #include<list> #include<iostream> #include<fstream> using namespace std; #define TOP_K 100 #define SRC_FILE "sample.txt" #define MAX_SIZE 300000 struct strequal{ bool operator()(char* a,char* b){ return strcmp(a,b) == 0; } static int bucket_size; size_t operator()(char*a) { int i = 0; int sum = 0; while (a[i] != ' ') { sum += a[i]; } return sum % 40; } }; int get_hashes(const char* src_con,int base_num) { int i = 0; int sum = 0; while (src_con[i] != ' ') { sum += src_con[i]; } return sum % base_num; } void save2Txt(const char* filename,const char* con) { ofstream out_txt; out_txt.open(filename,ios::app); out_txt.write(con,strlen(con)); out_txt.write("n",strlen("n")); out_txt.close(); } void dispatchFile(){ //根据数据内容进行内容分发 ifstream in_txt; in_txt.open(SRC_FILE); while (!in_txt.eof()) { char buf[17] = { 0 }; in_txt.getline(buf,17); int a = get_hashes(buf,5000); a++;//保证存在的文件是在1-5000当中 char file_name[20] = { 0 }; sprintf_s(file_name,"%d.txt",a); save2Txt(file_name,buf); } in_txt.close(); } void checkFile() { //该函数的作用是检测1-5000个函数当中,如果该函数大于200K,则进行重新分发 for (int i = 1; i <= 5000; i++) { char buf[20] = { 0 }; int file_size = 0; sprintf_s(buf,i); ifstream in_txt; in_txt.open(buf); //获得文件大小 in_txt.seekg(0,ios::end); file_size = in_txt.tellg(); //移动到文件的头部 in_txt.seekg(ios::beg); bool isDelete = false; if (file_size > MAX_SIZE) { //重新进行buf文件的分配,首先计算需要进行分配的文件个数 int file_count = file_size / MAX_SIZE; file_count++; while (!in_txt.eof()) { char con_line[17] = { 0 }; in_txt.getline(con_line,17); int a = get_hashes(con_line,file_count); char check_name[20] = { 0 }; sprintf_s(check_name,"%d_%d.txt",i,a); save2Txt(check_name,con_line); } isDelete = true; } in_txt.close(); if (isDelete) { ::DeleteFileA(buf); } } } void save_hashmap2txt(hash_map<char*,int>hash_obj,char* filename) { ofstream out_txt; hash_map<char*,int>::iterator itr; out_txt.open(filename); for (itr = hash_obj.begin(); itr != hash_obj.end(); itr++) { char id_str[5] = { 0 }; out_txt.write(itr->first,strlen(itr->first)); out_txt.write("n",strlen("n")); sprintf_s(id_str,"%d",itr->second); out_txt.write(id_str,strlen(id_str)); out_txt.write("n",strlen("n")); } out_txt.close(); } void find_redis_txt(char* file_name) { //策略是根据file_name,获得.txt之前的字符串,然后对查找的文件名称进行匹配,如果找到了之前字符串+"_"则表示该文件是重新分配过的 HANDLE hFile; // windows对文件的操作首先要得到一个文件句柄 WIN32_FIND_DATAA wfd; // WIN32_FIND_DATAA 是windows定义的查找文件的结构 char lp_path[MAX_PATH] = { 0 }; ::GetCurrentDirectoryA(MAX_PATH,lp_path); strcat_s(lp_path,"*.*"); hFile = FindFirstFileA(lp_path,&wfd); // FindFirstFile函数查找一个文件,sz_path是要查找的文件名,可以是全路径或相对路径,也可以写通配符,如“c:/*.*” if (hFile != INVALID_HANDLE_VALUE) { while (FindNextFileA(hFile,&wfd)) { // 利用第一次找到的文件句柄,继续寻找下个文件,如果找到下个文件,则函数填充wfd结构,并返回true if (strstr(wfd.cFileName,file_name)) { //读取文件进行hash_map ifstream in_txt; in_txt.open(wfd.cFileName); hash_map<char*,int> hash_obj; hash_map<char*,int>::iterator itr; while (!in_txt.eof()) { char mm[20] = { 0 }; in_txt.getline(mm,20); itr = hash_obj.find(mm); if (itr == hash_obj.end()) { pair<char*,int> tmp_pair; tmp_pair.second = 1; tmp_pair.first = mm; hash_obj.insert(tmp_pair); } else { itr->second++; } } in_txt.close(); //开始讲hash_obj的内容放在result_i的文件中 char out_file_name[50] = { 0 }; sprintf_s(out_file_name,"result_%s",wfd.cFileName); save_hashmap2txt(hash_obj,out_file_name); } } } } void check_first_result() { for (int i = 1; i <= 5000; i++) { char file_name[20] = { 0 }; sprintf_s(file_name,i); ifstream in_txt; in_txt.open(file_name,ios::_Nocreate); if (in_txt.is_open()) { //开始进行,内容的统计 hash_map<char*,int> obj_hash; hash_map<char*,int>::iterator itr; while (!in_txt.eof()) { char tmp[17] = { 0 }; in_txt.getline(tmp,17); itr = obj_hash.find(tmp); if (itr == obj_hash.end()) { pair<char*,int> mm; mm.first = tmp; mm.second = 1; obj_hash.insert(mm); } else { itr->second++; } } in_txt.close(); //将obj_hash的内容放在第一次结果当中 char tmp[20] = { 0 }; sprintf_s(tmp,"result_%d.txt",i); save_hashmap2txt(obj_hash,tmp); } else { in_txt.close(); //该内容是重新进行分发过的,需要寻找 char tmp[17] = { 0 }; sprintf_s(tmp,"%d_",i); find_redis_txt(tmp); } } } void InsertSort(list<char*>* str_lis,list<int>* int_lis) { } //进行堆的调整 void HeapAdjust(vector<char*>* str_lis,vector<int>* int_lis,int i,int length) { int nChild; for (; 2 * i + 1 < length; i = nChild) { int nChild_2 = 2 * i + 1; int nChild_1 = 2 * i; if (nChild_2 < length) { if (int_lis->at[nChild_1] < int_lis->at[nChild_2]) { nChild = nChild_2; } else { nChild = nChild_1; } if (int_lis->at[i] < int_lis->at[nChild]) { //插入一个,并且从最后进行删除最小的一个 int tmp = int_lis->at[i]; int_lis->at[i] = int_lis->at[nChild]; int_lis->at[nChild] = tmp; char* str_tmp = str_lis->at[i]; str_lis->at[i] = str_lis->at[nChild]; str_lis->at[nChild] = str_tmp; } else { break; } } } } void check_result_txt() { //存放结果的vector vector<char*>::iterator str_itr; vector<char*> str_lis; vector<int>::iterator int_itr; vector<int> int_lis; //策略是根据file_name,获得.txt之前的字符串,然后对查找的文件名称进行匹配,如果找到了之前字符串+"_"则表示该文件是重新分配过的 HANDLE hFile; // windows对文件的操作首先要得到一个文件句柄 WIN32_FIND_DATAA wfd; // WIN32_FIND_DATAA 是windows定义的查找文件的结构 char lp_path[MAX_PATH] = { 0 }; ::GetCurrentDirectoryA(MAX_PATH,lp_path); strcat_s(lp_path,"result")) { //对于该文件进行读取,一次读取两行,分别是字符串和频度,并存储在vector当中 //这里我们使用插入排序 ifstream in_txt; in_txt.open(wfd.cFileName); char buf[17] = { 0 }; char id_str[5] = { 0 }; while (!in_txt.eof()) { memset(buf,17); memset(id_str,5); in_txt.getline(buf,17); in_txt.getline(id_str,5); int a = atoi(id_str); if (str_lis.size() < TOP_K) { str_lis.push_back((char*)buf); int_lis.push_back(a); } else { //首先进行堆排序,使得list static int first = 1; if (first == 1) { //进行堆排序 int n = str_lis.size(); for (int i = n / 2 - 1; i >= 0; i--) { HeapAdjust(&str_lis,&int_lis,n); } for (int i = n - 1; i > 0; i--) { char* tmp = str_lis.at[0]; str_lis.at[0] = str_lis.at[i]; str_lis.at[i] = tmp; int int_tmp = int_lis.at[0]; int_lis.at[0] = int_lis.at[i]; int_lis.at[i] = int_tmp; HeapAdjust(&str_lis,i); } } else { //当list的个数超过了TOP_K个,我们找到频度最小的 int_itr = int_lis.begin(); str_itr = str_lis.begin(); for (int i = 0; i < TOP_K,int_itr != int_lis.end(),str_itr != str_lis.end();int_itr++,str_itr++,i++) { //开始查找第一个比a小的值 if (int_lis.at[i] < a) { int_lis.insert(int_itr,a); str_lis.insert(str_itr,buf); int_lis.pop_back(); str_lis.pop_back(); break; } } } first++; } } in_txt.close(); } } } } int main() { //将每一行数据根据哈希值进行文件的分发 dispatchFile(); //检测每个文件的大小如果大于MAX_SIZE则进行重新计算,重新分配,并且删除原有文件 checkFile(); //然后进行文件内容查询,开始进行文件结果查找 check_first_result(); //开始进行查找类似result为开始的txt文档,进行排序 check_result_txt(); return 0; } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |