如果追加失败,如何有效地重建pandas hdfstore表
发布时间:2020-12-20 13:10:30 所属栏目:Python 来源:网络整理
导读:我正在使用pandas中的hdfstore来处理正在进行的迭代过程中的数据帧.在每次迭代中,我追加到hdfstore中的表.这是一个玩具示例: import pandas as pd; from pandas import HDFStore; import numpy as np; from random import choice; from string import ascii_letters
我正在使用pandas中的hdfstore来处理正在进行的迭代过程中的数据帧.在每次迭代中,我追加到hdfstore中的表.这是一个玩具示例:
import pandas as pd
from pandas import HDFStore
import numpy as np
from random import choice
from string import ascii_letters

# Character pool for the random strings: the ASCII letters plus the digits
# 0-8.  The digits are converted to str so the concatenation works on
# Python 3 (list + range raises TypeError there) and every element is text.
alphanum = np.array(list(ascii_letters) + [str(d) for d in range(0, 9)])


def hdfstore_append(storefile, key, df, format="t", columns=None,
                    data_columns=None):
    """Append ``df`` to the table ``key`` in ``storefile``, rebuilding on failure.

    Parameters
    ----------
    storefile : str
        Path of the HDF5 file, opened with ``HDFStore`` for the duration of
        the call.
    key : str
        Node name; a leading ``'/'`` is added when missing.
    df : DataFrame or None
        Rows to add.  ``None`` is a no-op.
    format : str
        Passed to ``HDFStore.put`` when the node is created or rebuilt.
    columns : optional
        Accepted so existing callers keep working; it is not forwarded to
        ``HDFStore.put``.
    data_columns : optional
        Passed to ``HDFStore.put`` when the node is created or rebuilt.
    """
    if df is None:
        return
    if key[0] != '/':
        key = '/' + key
    with HDFStore(storefile) as store:
        # Membership test on the store itself instead of building the full
        # list of keys first.
        if key not in store:
            store.put(key, df, format=format, data_columns=data_columns)
            return
        try:
            store.append(key, df)
        except Exception:
            # Deliberately broad: any append failure falls back to reading
            # the stored table, concatenating the new rows in memory and
            # writing the node again from scratch.
            df = pd.concat([store.get(key), df])
            store.put(key, df, format=format, data_columns=data_columns)


storefile = "db.h5"
for i in range(0, 100):
    # One-row frame: a random float and a random string whose maximum
    # length grows with i (presumably to provoke append failures once a
    # string no longer fits the existing table).
    n = np.random.randn()
    length = np.random.randint(1, 2 * (i + 1))
    s = ''.join(alphanum[np.random.randint(1, len(alphanum), length)])
    df = pd.DataFrame([dict(n=n, s=s)], index=[i])
    hdfstore_append(storefile, '/SO/df', df, columns=df.columns,
                    data_columns=True)

# --- Original article text following the listing (kept verbatim) ---
# hdfstore_append函数可以防止hdfstore.append抛出的各种异常,并在必要时重建表.这种方法的问题是,当商店中的表变得非常大时,它变得非常慢. 有没有更有效的方法来做到这一点?
# 解决方法
下面是一个高效构建大型 pandas HDFStore 的示例.关键是在表变大时缓存帧编号.另外,写入时不做追加,而是先删除已存在的数据再写入,这实际上相当于一次 put.
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
import six
import itertools
import logging
import os
from abc import ABCMeta, abstractmethod, abstractproperty
import warnings

import pandas as pd

logger = logging.getLogger(__name__)


class FramewiseData(object):
    "Abstract base class defining a data container with framewise access."

    # NOTE: this is the Python 2 metaclass hook.  On Python 3 it is a plain
    # class attribute, so the abstract methods below are not enforced there.
    __metaclass__ = ABCMeta

    @abstractmethod
    def put(self, df):
        pass

    @abstractmethod
    def get(self, frame_no):
        pass

    @abstractproperty
    def frames(self):
        pass

    @abstractmethod
    def close(self):
        pass

    @abstractproperty
    def t_column(self):
        pass

    def __getitem__(self, frame_no):
        return self.get(frame_no)

    def __len__(self):
        return len(self.frames)

    def dump(self, N=None):
        """Return data from all, or the first N, frames in a single DataFrame

        Parameters
        ----------
        N : integer
            optional; if None, return all frames.  If N exceeds the number
            of stored frames, all frames are returned.

        Returns
        -------
        DataFrame
        """
        if N is None:
            return pd.concat(iter(self))
        # islice stops cleanly at the last frame.  Calling next() inside a
        # generator expression instead turns exhaustion into a RuntimeError
        # on interpreters that implement PEP 479.
        return pd.concat(list(itertools.islice(self, N)))

    @property
    def max_frame(self):
        return max(self.frames)

    def _validate(self, df):
        """Raise ValueError unless ``df`` holds exactly one frame number."""
        if self.t_column not in df.columns:
            raise ValueError("Cannot write frame without a column "
                             "called {0}".format(self.t_column))
        if df[self.t_column].nunique() != 1:
            raise ValueError("Found multiple values for 'frame'. "
                             "Write one frame at a time.")

    def __iter__(self):
        return self._build_generator()

    def _build_generator(self):
        # Yield the frames one by one, in the order given by self.frames.
        for frame_no in self.frames:
            yield self.get(frame_no)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()


KEY_PREFIX = 'Frame_'
len_key_prefix = len(KEY_PREFIX)


def code_key(frame_no):
    "Turn the frame_no into a 'natural name' string idiomatic of HDFStore"
    key = '{0}{1}'.format(KEY_PREFIX, frame_no)
    return key


def decode_key(key):
    "Inverse of code_key: strip KEY_PREFIX and return the integer frame_no"
    frame_no = int(key[len_key_prefix:])
    return frame_no


class PandasHDFStore(FramewiseData):
    """An interface to an HDF5 file with framewise access, using pandas.

    Save each frame's data to a node in a pandas HDFStore.

    Any additional keyword arguments to the constructor are passed to
    pandas.HDFStore().
    """

    def __init__(self, filename, mode='a', t_column='frame', **kwargs):
        self.filename = os.path.abspath(filename)
        self._t_column = t_column
        self.store = pd.HDFStore(self.filename, mode, **kwargs)

    @property
    def t_column(self):
        return self._t_column

    @property
    def max_frame(self):
        return max(self.frames)

    def put(self, df):
        """Write one frame, replacing any node already stored for it.

        Raises ValueError if ``df`` lacks the time column or holds more than
        one frame number.  An empty DataFrame only triggers a warning.
        """
        if len(df) == 0:
            warnings.warn('An empty DataFrame was passed to put(). Continuing.')
            return
        # The node key is derived from the first row only, so make sure all
        # rows really belong to one frame, as the single-node store does.
        self._validate(df)
        frame_no = df[self.t_column].values[0]  # validated to be all the same
        key = code_key(frame_no)
        # Store data as tabular instead of fixed-format.
        # Make sure to remove any preexisting data, so don't really 'append'.
        try:
            self.store.remove(key)
        except KeyError:
            pass
        self.store.put(key, df, format='table')

    def get(self, frame_no):
        key = code_key(frame_no)
        frame = self.store.get(key)
        return frame

    @property
    def frames(self):
        """Returns sorted list of integer frame numbers in file"""
        return self._get_frame_nos()

    def _get_frame_nos(self):
        """Returns sorted list of integer frame numbers in file"""
        # Pandas' store.keys() scans the entire file looking for stored Pandas
        # structures. This is very slow for large numbers of frames.
        # Instead, scan the root level of the file for nodes with names
        # matching our scheme; we know they are DataFrames.
        r = [decode_key(key) for key in self.store.root._v_children.keys()
             if key.startswith(KEY_PREFIX)]
        r.sort()
        return r

    def close(self):
        self.store.close()


class PandasHDFStoreBig(PandasHDFStore):
    """Like PandasHDFStore, but keeps a cache of frame numbers.

    This can give a large performance boost when a file contains thousands
    of frames.

    If a file was made in PandasHDFStore, opening it with this class
    and then closing it will add a cache (if mode != 'r').

    Any additional keyword arguments to the constructor are passed to
    pandas.HDFStore().
    """

    def __init__(self, filename, mode='a', t_column='frame', **kwargs):
        self._CACHE_NAME = '_Frames_Cache'
        self._frames_cache = None
        self._cache_dirty = False  # Whether _frames_cache needs to be written out
        super(PandasHDFStoreBig, self).__init__(filename, mode, t_column,
                                                **kwargs)

    @property
    def frames(self):
        # Hit memory cache, then disk cache
        if self._frames_cache is not None:
            return self._frames_cache
        try:
            self._frames_cache = list(
                self.store[self._CACHE_NAME].index.values)
            self._cache_dirty = False
        except KeyError:
            self._frames_cache = self._get_frame_nos()
            self._cache_dirty = True  # In memory, but not in file
        return self._frames_cache

    def put(self, df):
        # Any write may add a frame, so drop both caches first.
        self._invalidate_cache()
        super(PandasHDFStoreBig, self).put(df)

    def rebuild_cache(self):
        """Delete cache on disk and rebuild it."""
        self._invalidate_cache()
        _ = self.frames  # Compute cache
        self._flush_cache()

    def _invalidate_cache(self):
        """Forget the in-memory cache and delete the on-disk one if present."""
        self._frames_cache = None
        try:
            del self.store[self._CACHE_NAME]
        except KeyError:
            pass

    def _flush_cache(self):
        """Writes frame cache if dirty and file is writable."""
        if (self._frames_cache is not None and self._cache_dirty
                and self.store.root._v_file._iswritable()):
            self.store[self._CACHE_NAME] = pd.DataFrame(
                {'dummy': 1}, index=self._frames_cache)
            self._cache_dirty = False

    def close(self):
        """Updates cache, writes if necessary, then closes file."""
        if self.store.root._v_file._iswritable():
            _ = self.frames  # Compute cache
            self._flush_cache()
        super(PandasHDFStoreBig, self).close()


class PandasHDFStoreSingleNode(FramewiseData):
    """An interface to an HDF5 file with framewise access,
    using pandas, that is faster for cross-frame queries.

    This implementation is more complex than PandasHDFStore,
    but it simplifies (speeds up?) cross-frame queries,
    like queries for a single probe's entire trajectory.

    Any additional keyword arguments to the constructor are passed to
    pandas.HDFStore().
    """

    def __init__(self, filename, key='FrameData', mode='a', t_column='frame',
                 use_tabular_copy=False, **kwargs):
        self.filename = os.path.abspath(filename)
        self.key = key
        self._t_column = t_column
        self.store = pd.HDFStore(self.filename, mode, **kwargs)

        # Check for an existing node on the store that is already open.
        # This avoids pd.get_store (removed from pandas), a second handle on
        # the same file, and loading the whole table just to see if it exists.
        if self.key in self.store:
            self._validate_node(use_tabular_copy)

    @property
    def t_column(self):
        return self._t_column

    def put(self, df):
        """Append one frame to the single table node.

        Raises ValueError if ``df`` lacks the time column or holds more than
        one frame number.  An empty DataFrame only triggers a warning.
        """
        if len(df) == 0:
            warnings.warn('An empty DataFrame was passed to put(). Continuing.')
            return
        self._validate(df)
        self.store.append(self.key, df, data_columns=True)

    def get(self, frame_no):
        frame = self.store.select(self.key, '{0} == {1}'.format(
            self._t_column, frame_no))
        return frame

    def dump(self, N=None):
        """Return data from all, or the first N, frames in a single DataFrame

        Parameters
        ----------
        N : integer
            optional; if None, return all frames

        Returns
        -------
        DataFrame
        """
        if N is None:
            return self.store.select(self.key)
        Nth_frame = self.frames[N - 1]
        return self.store.select(self.key, '{0} <= {1}'.format(
            self._t_column, Nth_frame))

    def close(self):
        self.store.close()

    def __del__(self):
        # __init__ may have failed before self.store was assigned.
        if hasattr(self, 'store'):
            self.close()

    @property
    def frames(self):
        """Returns sorted list of integer frame numbers in file"""
        # I assume one column can fit in memory, which is not ideal.
        # Chunking does not seem to be implemented for select_column.
        frame_nos = self.store.select_column(self.key, self.t_column).unique()
        frame_nos.sort()
        return frame_nos

    def _validate_node(self, use_tabular_copy):
        """Raise ValueError unless the node at ``self.key`` is a table.

        With ``use_tabular_copy`` set, the node is first copied into a
        tabular node and ``self.key`` is pointed at the copy.
        """
        # The HDFStore might be non-tabular, which means we cannot select a
        # subset, and this whole structure will not work.
        # For convenience, this can rewrite the table into a tabular node.
        if use_tabular_copy:
            # The helper needs the open store, not the file path.
            self.key = _make_tabular_copy(self.store, self.key)
        # get_node returns None for a missing node and also resolves nested
        # keys such as 'FrameData/tabular'.
        node = self.store.get_node(self.key)
        pandas_type = getattr(getattr(node, '_v_attrs', None),
                              'pandas_type', None)
        if not pandas_type == 'frame_table':
            raise ValueError("This node is not tabular. Call with "
                             "use_tabular_copy=True to proceed.")


def _make_tabular_copy(store, key):
    """Copy the contents nontabular node in a pandas HDFStore
    into a tabular node"""
    tabular_key = key + '/tabular'
    # One logging argument per %s placeholder; a single tuple would fail to
    # format when the record is emitted.
    logger.info("Making a tabular copy of %s at %s", key, tabular_key)
    store.append(tabular_key, store.get(key), data_columns=True)
    return tabular_key

# --- Original page footer (kept verbatim) ---
# (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |