利用pandas进行大文件计数处理的方法
Pandas读取大文件 要处理的是由探测器读出的脉冲信号,一组数据为两列,一列为时间,一列为脉冲能量,数据量在千万级,为了有一个直接的认识,先使用Pandas读取一些 import pandas as pd data = pd.read_table('filename.txt',iterator=True) chunk = data.get_chunk(5) 而输出是这样的: Out[4]: 332.977889999979 -0.0164794921875 0 332.97790 -0.022278 1 332.97791 -0.026855 2 332.97792 -0.030518 3 332.97793 -0.045776 4 332.97794 -0.032654 DataFram基本用法 这里,data只是个容器,pandas.io.parsers.TextFileReader。 使用astype可以实现dataframe字段类型转换 输出数据中,每组数据会多处一行,因为get_chunk返回的是pandas.core.frame.DataFrame格式, 而data在读取过程中并没有指定DataFrame的columns,因此在get_chunk过程中,默认将第一组数据作为columns。因此需要在读取过程中指定names即DataFrame的columns。 import pandas as pd data = pd.read_table('filename.txt',iterator=True, names=['time','energe']) chunk = data.get_chunk(5) data['energe'] = df['energe'].astype('int') 输出为 Out[6]:
DataFram存储和索引 这里讲一下DataFrame这个格式,与一般二维数据不同(二维列表等),DataFrame既有行索引又有列索引,因此在建立一个DataFrame数据是 DataFrame(data,columns=[‘year',‘month',‘day'],index=[‘one',‘two',‘three'])
而pd.read_table中的names就是指定DataFrame的columns,而index自动设置。 而DataFrame的索引格式有很多
exp: In[1]:data[:2] Out[2]:
In[2]:data[data[‘month']>5] Out[2]:
如果我们直接把data拿来比较的话,相当于data中所有的标量元素 In[3]:data[data<6]=0 Out[3]:
Pandas运算 series = data.ix[0] data - series Out:
DataFrame与Series之间运算会将Series索引匹配到DataFrame的列,然后沿行一直向下广播 如果令series1 = data[‘year'] data.sub(series1,axis=0) 则每一列都减去该series1,axis为希望匹配的轴,=0行索引,即匹配列,=1列索引,则按行匹配。 DataFrame的一些函数方法 这个就有很多了,比如排序和排名;求和、平均数以及方差、协方差等数学方法;还有就是唯一值(类似于集合)、值计数和成员资格等方法。 当然还有一些更高级的属性,用的时候再看吧 数据处理 在得到数据样式后我们先一次性读取数据 start = time.time() data = pd.read_table('Eu155_Na22_K40_MR_0CM_3Min.csv',names=['time','energe']) end = time.time() data.index print("The time is %f s" % (end - start)) plus = data['energe'] plus[plus < 0] = 0 The time is 29.403917 s RangeIndex(start=0,stop=68319232,step=1) 对于一个2G大小,千万级的数据,这个读取速度还是挺快的。之前使用matlab load用时160多s,但是不知道这个是否把数据完全读取了。然后只抽取脉冲信号,将负值归0,因为会出现一定的电子噪声从而产生一定负值。 然后就需要定位脉冲信号中的能峰了,也就是findpeaks 这里用到了scipy.signal中的find_peaks_cwt,具体用法可以参见官方文档 peaks = signal.find_peaks_cwt(pluse,np.arange(1,10)),它返回找到的peaks的位置,输入第一个为数据,第二个为窗函数,也就是在这个宽度的能窗内寻找峰,我是这样理解的。刚开始以为是数据的另一维坐标,结果找了半天没结果。不过事实上这个找的确定也挺慢的。 50w条的数据,找了足足7分钟,我这一个数据3000w条不得找半个多小时,而各种数据有好几十,恩。。这样是不行的,于是想到了并行的方法。这个下篇文章会讲到,也就是把数据按照chunksize读取,然后同时交给(map)几个进程同时寻峰,寻完后返回(reduce)一起计数,计数的同时,子进程再此寻峰。 在处理的时候碰到我自己的破 笔记本由于内存原因不能load这个数据,并且想着每次copy这么大数据好麻烦,就把一个整体数据文件分割成了几个部分,先对方法进行一定的实验,时间快,比较方便。 import pandas as pd def split_file(filename,size): name = filename.split('.')[0] data = pd.read_table(filename,chunksize=size,'intension']) i = 1 for piece in data: outname = name + str(i) + '.csv' piece.to_csv(outname,index=False,names = ['time','intension']) i += 1 def split_csvfile(filename,size): name = filename.split('.')[0] data = pd.read_csv(filename,'intension']) i = 1 for piece in data: outname = name + str(i) + '.csv' piece = piece['intension'] piece.to_csv(outname,index=False) i += 1 额..使用并行寻峰通过map/reduce的思想来解决提升效率这个想法,很早就实现了,但是,由于效果不是特别理想,所以放那也就忘了,今天整理代码来看了下当时记的些笔记,然后竟然发现有个评论…..我唯一收到的评论竟然是“催稿”=。=。想一想还是把下面的工作记录下来,免得自己后来完全忘记了。 rom scipy import signal import os import time import pandas as pd import numpy as np from multiprocessing import Pool import matplotlib.pylab as plt from functools import partial def findpeak(pluse): pluse[pluse < 0.05] = 0 print('Sub process %s.' % os.getpid()) start = time.time() peaks = signal.find_peaks_cwt(pluse,10)) # 返回一个列表 end = time.time() print("The time is %f s" % (end - start)) pks = [pluse[x] for x in peaks] return pks def histcnt(pks,edge=None,channel=None): cnt = plt.hist(pks,edge) res = pd.DataFrame(cnt[0],index=channel,columns=['cnt']) return res if __name__ == '__main__': with Pool(processes=8) as p: start = time.time() print('Parent process %s.' % os.getpid()) pluse = pd.read_csv('data/samples.csv',chunksize=50000,'energe']) channel = pd.read_csv('data/channels.txt',names=['value']) edges = channel * 2 edges = pd.DataFrame({'value': [0]}).append(edges,ignore_index=True) specal = [] for data in pluse: total = p.apply_async(findpeak,(data['energe'],),callback=partial(histcnt,edge=edges['value'],channel=channel['value'])) specal.append(total) print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.') spec = sum(specal) plt.figure() plt.plot(spec['cnt']) spec.to_csv('data/spec1.csv',header=False) print('every is OK') end = time.time() print("The time is %f s" % (end - start)) 由于对对进程线程的编程不是很了解,其中走了很多弯路,尝试了很多方法也,这个是最终效果相对较好的。 首先,通过 pd.readtable以chunksize=50000分块读取,edges为hist过程中的下统计box。 然后,apply_async为非阻塞调用findpeak,然后将结果返回给回调函数histcnt,但是由于回调函数除了进程返回结果还有额外的参数,因此使用partial,对特定的参数赋予固定的值(edge和channel)并返回了一个全新的可调用对象,这个新的可调用对象仍然需要通过制定那些未被赋值的参数(findpeak返回的值)来调用。这个新的课调用对象将传递给partial()的固定参数结合起来,同一将所有参数传递给原始函数(histcnt)。(至于为啥不在histcnt中确定那两个参数,主要是为了避免一直打开文件。。当然,有更好的办法只是懒得思考=。=),还有个原因就是,apply_async返回的是一个对象,需要通过该对象的get方法才能获取值。。 对于 apply_async官方上是这样解释的 Apply_async((func[,args[,kwds[,callback[,error_callback]]]])),apply()方法的一个变体,返回一个结果对象 如果指定回调,那么它应该是一个可调用的接受一个参数。结果准备好回调时,除非调用失败,在这种情况下,应用error_callback代替。 如果error_callback被指定,那么它应该是一个可调用的接受一个参数。如果目标函数失败,那么error_callback叫做除了实例。 回调应立即完成以来,否则线程处理结果将被封锁。 不使用回调函数的版本如下,即先将所有子进程得到的数据都存入peaks列表中,然后所有进程完毕后在进行统计计数。 import pandas as pd import time import scipy.signal as signal import numpy as np from multiprocessing import Pool import os import matplotlib.pyplot as plt def findpeak(pluse): pluse[pluse < 0] = 0 pluse[pluse > 100] = 0 print('Sub process %s.' % os.getpid()) start = time.time() peaks = signal.find_peaks_cwt(pluse,10)) end = time.time() print("The time is %f s" % (end - start)) res = [pluse[x] for x in peaks] return res if __name__ == '__main__': with Pool(processes=8) as p: start = time.time() print('Parent process %s.' % os.getpid()) pluse = pd.read_csv('data/sample.csv',chunksize=200000,'energe']) pks = [] for data in pluse: pks.append(p.apply_async(findpeak,))) print('Waiting for all subprocesses done...') p.close() p.join() print('All subprocesses done.') peaks = [] for i,ele in enumerate(pks): peaks.extend(ele.get()) peaks = pd.DataFrame(peaks,columns=['energe']) peaks.to_csv('peaks.csv',header=False,chunksize=50000) channel = pd.read_csv('data/channels.txt',names=['value']) channel *= 2 channel = pd.DataFrame({'value': [0]}).append(channel,ignore_index=True) plt.figure() spec = plt.hist(peaks['energe'],channel['value']) # out.plot.hist(bins=1024) # print(out) # cnt = peaks.value_counts(bins=1024) # cnt.to_csv('data/cnt.csv',header=False) print('every is OK') end = time.time() print("The time is %f s" % (end - start)) 以上这篇利用pandas进行大文件计数处理的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持编程小技巧。 您可能感兴趣的文章:
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
- python在指定目录下查找gif文件的方法
- python – Pandas SQL相当于update by group by
- Python习题纠错1
- python – 按字母顺序排序2D列表?
- python – Numpy.dot()维度未对齐
- 使用python加密自己的密码
- python json模块中dumps、dump、loads、load函数介绍
- python – 在Robot Framework中包含子目录的Zip目录
- python – Django:在urlpatterns中指定Generic View时访问
- python import cv2异常(dll load fail / windows server 20