加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

如果追加失败,如何有效地重建pandas hdfstore表

发布时间:2020-12-20 13:10:30 所属栏目:Python 来源:网络整理
导读:我正在使用pandas中的hdfstore来处理正在进行的迭代过程中的数据帧.在每次迭代中,我追加到hdfstore中的表.这是一个玩具示例: import pandas as pdfrom pandas import HDFStoreimport numpy as npfrom random import choicefrom string import ascii_letters
我正在使用pandas中的hdfstore来处理正在进行的迭代过程中的数据帧.在每次迭代中,我追加到hdfstore中的表.这是一个玩具示例:

import pandas as pd
from pandas import HDFStore
import numpy as np
from random import choice
from string import ascii_letters
# Alphanumeric vocabulary used to build random strings below.
# BUG FIX: the original added a list to a range object (a TypeError on
# Python 3) and range(0, 9) would only have covered digits 0-8; use the
# string digits '0'-'9' so every element is a one-character string.
alphanum = np.array(list(ascii_letters) + [str(d) for d in range(10)])
def hdfstore_append(storefile, key, df, format="t", columns=None, data_columns=None):
    """Append `df` to the table `key` inside the HDF5 file `storefile`.

    If the append fails (e.g. because the stored schema no longer matches),
    the existing table is read back, concatenated with `df`, and rewritten.

    Parameters
    ----------
    storefile : str
        Path of the HDF5 file.
    key : str
        Node key; a leading '/' is added if missing.
    df : DataFrame or None
        Frame to append; None is a no-op.
    format, data_columns :
        Forwarded to HDFStore.put() when (re)creating the table.
    columns :
        Kept for backward compatibility; not forwarded (HDFStore.put does
        not accept a `columns` keyword).
    """
    if df is None:
        return
    if key[0] != '/':
        key = '/' + key
    with HDFStore(storefile) as store:
        if key not in store.keys():
            # BUG FIX: the original called store.put(key, ...) without the
            # DataFrame, so the table was never created with any data.
            store.put(key, df, format=format, data_columns=data_columns)
        else:
            try:
                store.append(key, df)
            except Exception:
                # Schema drift: merge old + new rows and rewrite the table.
                # BUG FIX: the original rewrite also omitted the DataFrame.
                df = pd.concat([store.get(key), df])
                store.put(key, df, format=format, data_columns=data_columns)

storefile = "db.h5"
for i in range(0, 100):
    # BUG FIX: the original line had mismatched brackets (a SyntaxError)
    # and never passed the freshly built frame to hdfstore_append.
    n_chars = np.random.randint(1, 2 * (i + 1))  # 1 .. 2*(i+1)-1 characters
    # BUG FIX: index from 0 so the first vocabulary character can appear too.
    s = ''.join(alphanum[np.random.randint(0, len(alphanum), n_chars)])
    df = pd.DataFrame([dict(n=np.random.randn(), s=s)], index=[i])
    hdfstore_append(storefile, '/SO/df', df, columns=df.columns,
                    data_columns=True)

hdfstore_append函数可以防御hdfstore.append抛出的各种异常,并在必要时重建表。这种方法的问题是,当存储(store)中的表变得非常大时,它会变得非常慢。

有没有更有效的方法来做到这一点?

解决方法

下面是一个高效构建大型pandas HDFStore的示例。关键有两点:一是缓存帧编号,避免每次都扫描整个文件;二是写入时不做追加(append),而是先删除该帧已有的数据再写入,这实际上等同于一次put操作。

from __future__ import (absolute_import,division,print_function,unicode_literals)
import six
import logging
import os
from abc import ABCMeta,abstractmethod,abstractproperty
import warnings

import pandas as pd

logger = logging.getLogger(__name__)


class FramewiseData(object, metaclass=ABCMeta):
    """Abstract base class defining a data container with framewise access.

    Concrete subclasses must implement put/get/close plus the `frames`
    and `t_column` properties; iteration, indexing, len(), dump() and
    context-manager support are provided here.
    """
    # BUG FIX: the original set `__metaclass__ = ABCMeta`, which is Python 2
    # syntax and is silently ignored on Python 3, so abstract-method
    # enforcement never happened.  NOTE(review): this drops Python 2
    # compatibility; use six.with_metaclass if py2 support is still needed.

    @abstractmethod
    def put(self, df):
        """Store one frame's DataFrame."""

    @abstractmethod
    def get(self, frame_no):
        """Return the DataFrame stored for integer frame number `frame_no`."""

    # abstractproperty is deprecated since Python 3.3; stack
    # @property + @abstractmethod instead.
    @property
    @abstractmethod
    def frames(self):
        """Sorted iterable of integer frame numbers in the container."""

    @abstractmethod
    def close(self):
        """Release the underlying resource (e.g. close the HDF5 file)."""

    @property
    @abstractmethod
    def t_column(self):
        """Name of the column holding the frame number."""

    def __getitem__(self, frame_no):
        return self.get(frame_no)

    def __len__(self):
        return len(self.frames)

    def dump(self, N=None):
        """Return data from all, or the first N, frames in a single DataFrame
        Parameters
        ----------
        N : integer
            optional; if None, return all frames
        Returns
        -------
        DataFrame
        """
        if N is None:
            return pd.concat(iter(self))
        else:
            i = iter(self)
            return pd.concat((next(i) for _ in range(N)))

    @property
    def max_frame(self):
        """Largest frame number present in the container."""
        return max(self.frames)

    def _validate(self, df):
        # Each node holds exactly one frame; enforce that before writing.
        if self.t_column not in df.columns:
            raise ValueError("Cannot write frame without a column "
                             "called {0}".format(self.t_column))
        if df[self.t_column].nunique() != 1:
            raise ValueError("Found multiple values for 'frame'. "
                             "Write one frame at a time.")

    def __iter__(self):
        return self._build_generator()

    def _build_generator(self):
        # Lazily yield one frame at a time so huge files never load at once.
        for frame_no in self.frames:
            yield self.get(frame_no)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # (renamed params: the original shadowed the builtin `type`)
        self.close()

# Naming scheme for per-frame nodes inside the HDFStore.
KEY_PREFIX = 'Frame_'
len_key_prefix = len(KEY_PREFIX)


def code_key(frame_no):
    "Turn the frame_no into a 'natural name' string idiomatic of HDFStore"
    return '%s%s' % (KEY_PREFIX, frame_no)


def decode_key(key):
    "Inverse of code_key: recover the integer frame number from a node name."
    return int(key[len_key_prefix:])


class PandasHDFStore(FramewiseData):
    """An interface to an HDF5 file with framewise access, using pandas.
    Save each frame's data to a node in a pandas HDFStore.
    Any additional keyword arguments to the constructor are passed to
    pandas.HDFStore().
    """

    def __init__(self, filename, mode='a', t_column='frame', **kwargs):
        self.filename = os.path.abspath(filename)
        self._t_column = t_column
        self.store = pd.HDFStore(self.filename, mode, **kwargs)

    @property
    def t_column(self):
        return self._t_column

    @property
    def max_frame(self):
        return max(self.frames)

    def put(self, df):
        """Write one frame's DataFrame to its own node, replacing any
        pre-existing node for that frame (so this is a put, not an append)."""
        if len(df) == 0:
            warnings.warn('An empty DataFrame was passed to put(). Continuing.')
            return
        frame_no = df[self.t_column].values[0]  # validated to be all the same
        key = code_key(frame_no)
        # Store data as tabular instead of fixed-format.
        # Make sure to remove any preexisting data, so we don't really 'append'.
        try:
            self.store.remove(key)
        except KeyError:
            pass
        # BUG FIX: the original called store.put(key, format='table') without
        # passing the DataFrame, so nothing was ever written.
        self.store.put(key, df, format='table')

    def get(self, frame_no):
        """Return the DataFrame stored for `frame_no`."""
        key = code_key(frame_no)
        frame = self.store.get(key)
        return frame

    @property
    def frames(self):
        """Returns sorted list of integer frame numbers in file"""
        return self._get_frame_nos()

    def _get_frame_nos(self):
        """Returns sorted list of integer frame numbers in file"""
        # Pandas' store.keys() scans the entire file looking for stored Pandas
        # structures. This is very slow for large numbers of frames.
        # Instead, scan the root level of the file for nodes with names
        # matching our scheme; we know they are DataFrames.
        r = [decode_key(key) for key in self.store.root._v_children.keys() if
             key.startswith(KEY_PREFIX)]
        r.sort()
        return r

    def close(self):
        """Close the underlying HDFStore."""
        self.store.close()


class PandasHDFStoreBig(PandasHDFStore):
    """Like PandasHDFStore, but keeps a cache of frame numbers.
    This can give a large performance boost when a file contains thousands
    of frames.
    If a file was made in PandasHDFStore, opening it with this class
    and then closing it will add a cache (if mode != 'r').
    Any additional keyword arguments to the constructor are passed to
    pandas.HDFStore().
    """

    def __init__(self, filename, mode='a', t_column='frame', **kwargs):
        # BUG FIX: the original signature was `(self, **kwargs)` yet the
        # super() call referenced `filename` and `t_column` (a NameError on
        # construction), and never forwarded `mode`.  Restore the parent's
        # parameters and pass them all through.
        self._CACHE_NAME = '_Frames_Cache'
        self._frames_cache = None
        self._cache_dirty = False  # Whether _frames_cache needs to be written out
        super(PandasHDFStoreBig, self).__init__(filename, mode, t_column,
                                                **kwargs)

    @property
    def frames(self):
        # Hit memory cache, then disk cache
        if self._frames_cache is not None:
            return self._frames_cache
        else:
            try:
                self._frames_cache = list(self.store[self._CACHE_NAME].index.values)
                self._cache_dirty = False
            except KeyError:
                self._frames_cache = self._get_frame_nos()
                self._cache_dirty = True  # In memory, but not in file
            return self._frames_cache

    def put(self, df):
        """Write one frame, invalidating the frame-number cache first."""
        self._invalidate_cache()
        super(PandasHDFStoreBig, self).put(df)

    def rebuild_cache(self):
        """Delete cache on disk and rebuild it."""
        self._invalidate_cache()
        _ = self.frames  # Compute cache
        self._flush_cache()

    def _invalidate_cache(self):
        # Drop both the in-memory list and the on-disk cache node.
        self._frames_cache = None
        try:
            del self.store[self._CACHE_NAME]
        except KeyError:
            pass

    def _flush_cache(self):
        """Writes frame cache if dirty and file is writable."""
        if (self._frames_cache is not None and self._cache_dirty
                and self.store.root._v_file._iswritable()):
            self.store[self._CACHE_NAME] = pd.DataFrame({'dummy': 1},
                                                        index=self._frames_cache)
            self._cache_dirty = False

    def close(self):
        """Updates cache, writes if necessary, then closes file."""
        if self.store.root._v_file._iswritable():
            _ = self.frames  # Compute cache
            self._flush_cache()
        super(PandasHDFStoreBig, self).close()


class PandasHDFStoreSingleNode(FramewiseData):
    """An interface to an HDF5 file with framewise access, using pandas,
    that is faster for cross-frame queries.
    This implementation is more complex than PandasHDFStore, but it
    simplifies (speeds up?) cross-frame queries, like queries for a single
    probe's entire trajectory.
    Any additional keyword arguments to the constructor are passed to
    pandas.HDFStore().
    """

    def __init__(self, filename, key='FrameData', mode='a', t_column='frame',
                 use_tabular_copy=False, **kwargs):
        # BUG FIX: the original signature dropped `filename`, `mode` and
        # `t_column` even though the body uses them (a NameError on
        # construction).
        self.filename = os.path.abspath(filename)
        self.key = key
        self._t_column = t_column
        self.store = pd.HDFStore(self.filename, mode, **kwargs)

        # Validate the node only if it already exists.  NOTE(review): the
        # original used pd.get_store(), which has been removed from pandas;
        # the already-open store serves the same purpose.
        try:
            self.store[self.key]
        except KeyError:
            pass
        else:
            self._validate_node(use_tabular_copy)

    @property
    def t_column(self):
        return self._t_column

    def put(self, df):
        """Append one frame's rows to the single table node."""
        if len(df) == 0:
            warnings.warn('An empty DataFrame was passed to put(). Continuing.')
            return
        self._validate(df)
        # BUG FIX: the original omitted `df`, so nothing was ever appended.
        self.store.append(self.key, df, data_columns=True)

    def get(self, frame_no):
        """Select only the rows belonging to `frame_no`."""
        frame = self.store.select(self.key, '{0} == {1}'.format(
            self._t_column, frame_no))
        return frame

    def dump(self, N=None):
        """Return data from all, or the first N, frames in a single DataFrame
        Parameters
        ----------
        N : integer
            optional; if None, return all frames
        Returns
        -------
        DataFrame
        """
        # BUG FIX: the original def line was garbled (the docstring was fused
        # into the signature); restored to match FramewiseData.dump.
        if N is None:
            return self.store.select(self.key)
        else:
            Nth_frame = self.frames[N - 1]
            return self.store.select(self.key, '{0} <= {1}'.format(
                self._t_column, Nth_frame))

    def close(self):
        self.store.close()

    def __del__(self):
        if hasattr(self, 'store'):
            self.close()

    @property
    def frames(self):
        """Returns sorted list of integer frame numbers in file"""
        # I assume one column can fit in memory, which is not ideal.
        # Chunking does not seem to be implemented for select_column.
        frame_nos = self.store.select_column(self.key, self.t_column).unique()
        frame_nos.sort()
        return frame_nos

    def _validate_node(self, use_tabular_copy):
        # The HDFStore might be non-tabular, which means we cannot select a
        # subset, and this whole structure will not work.
        # For convenience, this can rewrite the table into a tabular node.
        if use_tabular_copy:
            # BUG FIX: _make_tabular_copy expects the open store (it calls
            # store.append/store.get), not the filename string.
            self.key = _make_tabular_copy(self.store, self.key)

        # BUG FIX: the original nested getattr() chain had mismatched
        # parentheses/arguments; each getattr takes (obj, name, default).
        pandas_type = getattr(getattr(getattr(
            self.store._handle.root, self.key, None), '_v_attrs', None),
            'pandas_type', None)
        if not pandas_type == 'frame_table':
            raise ValueError("This node is not tabular. Call with "
                             "use_tabular_copy=True to proceed.")


def _make_tabular_copy(store, key):
    """Copy the contents of a nontabular node in a pandas HDFStore
    into a tabular node, and return the new node's key."""
    tabular_key = key + '/tabular'
    # BUG FIX: the original passed a single tuple for two %s placeholders,
    # which breaks the log message; pass the args separately so logging's
    # lazy %-formatting fills both.
    logger.info("Making a tabular copy of %s at %s", key, tabular_key)
    store.append(tabular_key, store.get(key), data_columns=True)
    return tabular_key

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读