加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

大数据处理之二

发布时间:2020-12-14 02:33:23 所属栏目:大数据 来源:网络整理
导读:有一个1G大小的一个文件,里面每一行是一个词,词的大小不超过16字节,内存限制大小是1M。返回频数最高的100个词 1、利用每个词的哈希值,进行5000求余,分发到1~5000为名的文件当中,就平均而言每个文件大小会是200K左右 2、针对步骤1产生的结果,如果大于1
有一个1G大小的一个文件,里面每一行是一个词,词的大小不超过16字节,内存限制大小是1M。返回频数最高的100个词

1、利用每个词的哈希值,进行5000求余,分发到1~5000为名的文件当中,就平均而言每个文件大小会是200K左右

2、针对步骤1产生的结果,如果大于1M的话,则根据文件大小,进行重新分配,当然是放在5001~5500当中

3、依次读取每个文件进行频度统计,将结果放在result_i的文件当中

4、依次读取result_i文件,此为其一,然后进行归并排序

Note:

????? 这里要注意一点,因为在分发操作完成之后检测文件大小,如果文件大于500K,则继续进行分发,尽量将每个文件的大小控制在200k左右,此为其一,

???? 其二读取每一个文件,同时将所有文件i.txt文件进行查询词的频度统计,并将结果有序排列在result_i.txt当中

// ConsoleApplication1.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include<Windows.h>
#include<assert.h>
#include<hash_map>
#include<list>
#include<iostream>
#include<fstream>

using namespace std;
#define TOP_K  100
#define SRC_FILE "sample.txt"
#define MAX_SIZE  300000
struct strequal{
	bool operator()(char* a,char* b){
		return strcmp(a,b) == 0;
	}
	static	int bucket_size;
	size_t operator()(char*a)
	{
		int i = 0;
		int sum = 0;
		while (a[i] != '')
		{
			sum += a[i];
		}
		return sum % 40;
	}
};
int get_hashes(const char* src_con,int base_num)
{
	int i = 0;
	int sum = 0;
	while (src_con[i] != '')
	{
		sum += src_con[i];
	}
	return sum % base_num;
}
void save2Txt(const char* filename,const char* con)
{
	ofstream out_txt;
	out_txt.open(filename,ios::app);
	out_txt.write(con,strlen(con));
	out_txt.write("n",strlen("n"));
	out_txt.close();
}
void dispatchFile(){
	//根据数据内容进行内容分发	
	ifstream in_txt;
	in_txt.open(SRC_FILE);
	while (!in_txt.eof())
	{
		char buf[17] = { 0 };
		in_txt.getline(buf,17);
		int a = get_hashes(buf,5000);
		a++;//保证存在的文件是在1-5000当中
		char file_name[20] = { 0 };
		sprintf_s(file_name,"%d.txt",a);
		save2Txt(file_name,buf);
	}
	in_txt.close();
}
void checkFile()
{
	//该函数的作用是检测1-5000个函数当中,如果该函数大于200K,则进行重新分发
	for (int i = 1; i <= 5000; i++)
	{
		char buf[20] = { 0 };
		int  file_size = 0;
		sprintf_s(buf,i);
		ifstream in_txt;
		in_txt.open(buf);
		//获得文件大小
		in_txt.seekg(0,ios::end);
		file_size = in_txt.tellg();
		//移动到文件的头部
		in_txt.seekg(ios::beg);
		bool isDelete = false;
		if (file_size > MAX_SIZE)
		{
			//重新进行buf文件的分配,首先计算需要进行分配的文件个数
			int file_count = file_size / MAX_SIZE;
			file_count++;
			while (!in_txt.eof())
			{
				char con_line[17] = { 0 };
				in_txt.getline(con_line,17);
				int a = get_hashes(con_line,file_count);
				char check_name[20] = { 0 };
				sprintf_s(check_name,"%d_%d.txt",i,a);
				save2Txt(check_name,con_line);
			}
			isDelete = true;
		}
		in_txt.close();
		if (isDelete)
		{
			::DeleteFileA(buf);
		}
	}
}
void save_hashmap2txt(hash_map<char*,int>hash_obj,char* filename)
{
	ofstream out_txt;
	hash_map<char*,int>::iterator itr;
	out_txt.open(filename);
	for (itr = hash_obj.begin(); itr != hash_obj.end(); itr++)
	{
		char id_str[5] = { 0 };
		out_txt.write(itr->first,strlen(itr->first));
		out_txt.write("n",strlen("n"));
		sprintf_s(id_str,"%d",itr->second);
		out_txt.write(id_str,strlen(id_str));
		out_txt.write("n",strlen("n"));
	}
	out_txt.close();
}
void find_redis_txt(char* file_name)
{
	//策略是根据file_name,获得.txt之前的字符串,然后对查找的文件名称进行匹配,如果找到了之前字符串+"_"则表示该文件是重新分配过的
	HANDLE hFile;  // windows对文件的操作首先要得到一个文件句柄

	WIN32_FIND_DATAA wfd;  // WIN32_FIND_DATAA 是windows定义的查找文件的结构


	char lp_path[MAX_PATH] = { 0 };
	::GetCurrentDirectoryA(MAX_PATH,lp_path);

	strcat_s(lp_path,"*.*");
	hFile = FindFirstFileA(lp_path,&wfd);  // FindFirstFile函数查找一个文件,sz_path是要查找的文件名,可以是全路径或相对路径,也可以写通配符,如“c:/*.*”
	if (hFile != INVALID_HANDLE_VALUE) {
		while (FindNextFileA(hFile,&wfd))
		{
			// 利用第一次找到的文件句柄,继续寻找下个文件,如果找到下个文件,则函数填充wfd结构,并返回true
			if (strstr(wfd.cFileName,file_name))
			{
				//读取文件进行hash_map
				ifstream in_txt;
				in_txt.open(wfd.cFileName);
				hash_map<char*,int> hash_obj;
				hash_map<char*,int>::iterator itr;
				while (!in_txt.eof())
				{
					char mm[20] = { 0 };
					in_txt.getline(mm,20);
					itr = hash_obj.find(mm);
					if (itr == hash_obj.end())
					{
						pair<char*,int> tmp_pair;
						tmp_pair.second = 1;
						tmp_pair.first = mm;
						hash_obj.insert(tmp_pair);
					}
					else
					{
						itr->second++;
					}
				}
				in_txt.close();
				//开始讲hash_obj的内容放在result_i的文件中
				char out_file_name[50] = { 0 };
				sprintf_s(out_file_name,"result_%s",wfd.cFileName);
				save_hashmap2txt(hash_obj,out_file_name);
			}
		}
	}
}
void check_first_result()
{
	for (int i = 1; i <= 5000; i++)
	{
		char file_name[20] = { 0 };
		sprintf_s(file_name,i);
		ifstream in_txt;
		in_txt.open(file_name,ios::_Nocreate);
		if (in_txt.is_open())
		{
			//开始进行,内容的统计
			hash_map<char*,int> obj_hash;
			hash_map<char*,int>::iterator itr;
			while (!in_txt.eof())
			{
				char tmp[17] = { 0 };
				in_txt.getline(tmp,17);
				itr = obj_hash.find(tmp);
				if (itr == obj_hash.end())
				{
					pair<char*,int> mm;
					mm.first = tmp;
					mm.second = 1;
					obj_hash.insert(mm);
				}
				else
				{
					itr->second++;
				}
			}
			in_txt.close();
			//将obj_hash的内容放在第一次结果当中
			char tmp[20] = { 0 };
			sprintf_s(tmp,"result_%d.txt",i);
			save_hashmap2txt(obj_hash,tmp);
		}
		else
		{
			in_txt.close();
			//该内容是重新进行分发过的,需要寻找		
			char tmp[17] = { 0 };
			sprintf_s(tmp,"%d_",i);
			find_redis_txt(tmp);
		}

	}
}
void InsertSort(list<char*>* str_lis,list<int>* int_lis)
{

}
//进行堆的调整
void HeapAdjust(vector<char*>* str_lis,vector<int>* int_lis,int i,int length)
{
	int nChild;
	for (; 2 * i + 1 < length; i = nChild)
	{
		int nChild_2 = 2 * i + 1;
		int nChild_1 = 2 * i; 
		if (nChild_2 < length)
		{

			if (int_lis->at[nChild_1] < int_lis->at[nChild_2])
			{
				nChild = nChild_2;
			}
			else
			{
				nChild = nChild_1;
			}
			if (int_lis->at[i] < int_lis->at[nChild])
			{
				//插入一个,并且从最后进行删除最小的一个
				int tmp = int_lis->at[i];
				int_lis->at[i] = int_lis->at[nChild];
				int_lis->at[nChild] = tmp;
				char* str_tmp = str_lis->at[i];
				str_lis->at[i] = str_lis->at[nChild];
				str_lis->at[nChild] = str_tmp;
			}
			else
			{
				break;
			}
		}
	}
}
void check_result_txt()
{
	//存放结果的vector
	vector<char*>::iterator str_itr;
	vector<char*> str_lis;
	vector<int>::iterator int_itr;
	vector<int> int_lis;
	//策略是根据file_name,获得.txt之前的字符串,然后对查找的文件名称进行匹配,如果找到了之前字符串+"_"则表示该文件是重新分配过的
	HANDLE hFile;  // windows对文件的操作首先要得到一个文件句柄	
	WIN32_FIND_DATAA wfd;  // WIN32_FIND_DATAA 是windows定义的查找文件的结构
	char lp_path[MAX_PATH] = { 0 };
	::GetCurrentDirectoryA(MAX_PATH,lp_path);
	strcat_s(lp_path,"result"))
			{
				//对于该文件进行读取,一次读取两行,分别是字符串和频度,并存储在vector当中
				//这里我们使用插入排序
				ifstream in_txt;
				in_txt.open(wfd.cFileName);
				char buf[17] = { 0 };
				char id_str[5] = { 0 };
				while (!in_txt.eof())
				{
					memset(buf,17);
					memset(id_str,5);
					in_txt.getline(buf,17);
					in_txt.getline(id_str,5);
					int a = atoi(id_str);
					if (str_lis.size() < TOP_K)
					{
						str_lis.push_back((char*)buf);
						int_lis.push_back(a);
					}
					else
					{
						//首先进行堆排序,使得list
						static int first = 1;
						if (first == 1)
						{
							//进行堆排序
							int n = str_lis.size();
							for (int i = n / 2 - 1; i >= 0; i--)
							{
								HeapAdjust(&str_lis,&int_lis,n);
							}
							for (int i = n - 1; i > 0; i--)
							{
								char* tmp = str_lis.at[0];
								str_lis.at[0] = str_lis.at[i];
								str_lis.at[i] = tmp;
								int int_tmp = int_lis.at[0];
								int_lis.at[0] = int_lis.at[i];
								int_lis.at[i] = int_tmp;
								HeapAdjust(&str_lis,i);
							}
						}
						else
						{
							//当list的个数超过了TOP_K个,我们找到频度最小的
							int_itr = int_lis.begin();
							str_itr = str_lis.begin();
							for (int i = 0; i < TOP_K,int_itr != int_lis.end(),str_itr != str_lis.end();int_itr++,str_itr++,i++)
							{
								//开始查找第一个比a小的值
								if (int_lis.at[i] < a)
								{
									int_lis.insert(int_itr,a);
									str_lis.insert(str_itr,buf);
									int_lis.pop_back();
									str_lis.pop_back();
									break;
								}
							}
						}
						first++;

					}
				}
				in_txt.close();
			}
		}
	}
}
int main()
{
	//将每一行数据根据哈希值进行文件的分发
	dispatchFile();
	//检测每个文件的大小如果大于MAX_SIZE则进行重新计算,重新分配,并且删除原有文件
	checkFile();
	//然后进行文件内容查询,开始进行文件结果查找
	check_first_result();
	//开始进行查找类似result为开始的txt文档,进行排序	
	check_result_txt();
	return 0;
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读