加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

python – numpy的矢量化基数排序 – 它可以击败np.sort吗?

发布时间:2020-12-16 22:53:54 所属栏目:Python 来源:网络整理
导读:Numpy没有yet有一个基数排序,所以我想知道是否有可能使用预先存在的numpy函数编写一个.到目前为止,我有以下,它确实有效,但比numpy的快速排序慢约10倍. 测试和基准测试: a = np.random.randint(0,1e8,1e6)assert(np.all(radix_sort(a) == np.sort(a))) %time

Numpy没有yet有一个基数排序,所以我想知道是否有可能使用预先存在的numpy函数编写一个.到目前为止,我有以下,它确实有效,但比numpy的快速排序慢约10倍.

line profiler output

测试和基准测试:

a = np.random.randint(0,1e8,1e6)
assert(np.all(radix_sort(a) == np.sort(a))) 
%timeit np.sort(a)
%timeit radix_sort(a)

mask_b循环可以至少部分地被矢量化,在来自&的掩模之间广播,并且使用具有轴arg的cumsum,但是这最终是悲观化,可能是由于增加的存储器占用.

如果有人能够看到一种方法来改进我所拥有的东西,我会有兴趣听到,即使它仍然比np.sort慢……这更像是一种求知欲和对numpy技巧的兴趣.

请注意,can implement快速计数排序很容易,但这只与小整数数据有关.

编辑1:从循环中取出np.arange(n)会有所帮助,但这并不是非常令人兴奋.

编辑2:cumsum实际上是多余的(ooops!)但是这个更简单的版本只对性能有所帮助.

def radix_sort(a):
    bit_len = np.max(a).bit_length()
    n = len(a)
    cached_arange = arange(n)
    idx = np.empty(n,dtype=int) # fully overwritten each iteration
    for mask_b in xrange(bit_len):
        is_one = (a & 2**mask_b).astype(bool)
        n_ones = np.sum(is_one)      
        n_zeros = n-n_ones
        idx[~is_one] = cached_arange[:n_zeros]
        idx[is_one] = cached_arange[:n_ones] + n_zeros
        # next three lines just do: a[idx] = a,but correctly
        new_a = np.empty(n,dtype=a.dtype)
        new_a[idx] = a
        a = new_a
    return a

编辑3:如果您在多个步骤中构造idx,则可以一次循环两个或更多个,而不是循环使用单个位.使用2位有点帮助,我没有尝试过更多:

idx[is_zero] = np.arange(n_zeros)
idx[is_one] = np.arange(n_ones)
idx[is_two] = np.arange(n_twos)
idx[is_three] = np.arange(n_threes)

编辑4和5:对于我正在测试的输入,4位似乎是最好的.此外,你可以完全摆脱idx步骤.现在只比np.sort(source available as gist)慢5倍,而不是10倍:

enter image description here

编辑6:这是上面的一个整理版本,但它也有点慢. 80%的时间用于重复和提取 – 如果只有一种方法来广播提取物:( …

def radix_sort(a,batch_m_bits=3):
    bit_len = np.max(a).bit_length()
    batch_m = 2**batch_m_bits
    mask = 2**batch_m_bits - 1
    val_set = np.arange(batch_m,dtype=a.dtype)[:,nax] # nax = np.newaxis
    for _ in range((bit_len-1)//batch_m_bits + 1): # ceil-division
        a = np.extract((a & mask)[nax,:] == val_set,np.repeat(a[nax,:],batch_m,axis=0))
        val_set <<= batch_m_bits
        mask <<= batch_m_bits
    return a

编辑7& 8:实际上,您可以使用asprtrided从numpy.lib.stride_tricks广播提取,但它似乎没有太大的性能帮助:

enter image description here

最初这对我有意义,理由是提取将遍历整个数组batch_m次,因此CPU请求的高速缓存行总数将与之前相同(只是在它具有的过程结束时)请求每个缓存行batch_m次).然而,the reality是提取不够聪明,无法迭代任意阶梯数组,并且必须在开始之前扩展数组,即无论如何最终都要完成重复.
事实上,在查看了提取源之后,我现在看到我们用这种方法做的最好的事情是:

a = a[np.flatnonzero((a & mask)[nax,:] == val_set) % len(a)]

这比提取略慢.但是,如果len(a)是2的幂,我们可以用&更换昂贵的mod操作. (len(a) – 1),它最终比提取版本快一点(现在大约4.9x np.sort为a = randint(0,2 ** 20).我想我们可以做到这一点通过零填充来处理两个长度的非幂,然后在排序结束时裁剪额外的零…然而,除非长度已经接近2的幂,否则这将是一个悲观.

最佳答案
我和Numba一起去看看基数排序的速度有多快. Numba(通常)表现良好的关键是写出所有循环,这非常有启发性.我最终得到了以下内容:

from numba import jit

@jit
def radix_loop(nbatches,batch_m_bits,bitsums,a,out):
    mask = (1 << batch_m_bits) - 1
    for shift in range(0,nbatches*batch_m_bits,batch_m_bits):
        # set bit sums to zero
        for i in range(bitsums.shape[0]):
            bitsums[i] = 0

        # determine bit sums
        for i in range(a.shape[0]):
            j = (a[i] & mask) >> shift
            bitsums[j] += 1

        # take the cumsum of the bit sums
        cumsum = 0
        for i in range(bitsums.shape[0]):
            temp = bitsums[i]
            bitsums[i] = cumsum
            cumsum += temp

        # sorting loop
        for i in range(a.shape[0]):
            j = (a[i] & mask) >> shift
            out[bitsums[j]] = a[i]
            bitsums[j] += 1

        # prepare next iteration
        mask <<= batch_m_bits
        # cant use `temp` here because of numba internal types
        temp2 = a
        a = out
        out = temp2

    return a

从4个内圈开始,很容易看出它是第4个,因此很难用Numpy进行矢量化.

欺骗这个问题的一种方法是从Scipy:scipy.sparse.coo.coo_tocsr中引入一个特定的C函数.它与上面的Python函数几乎完全相同的内部循环,因此可以滥用在Python中编写更快的“矢量化”基数排序.也许是这样的:

from scipy.sparse.coo import coo_tocsr

def radix_step(radix,keys,w):
    coo_tocsr(radix,1,a.size,w,w)
    return w,a

def scipysparse_radix_perbyte(a):
    # coo_tocsr internally works with system int and upcasts
    # anything else. We need to copy anyway to not mess with
    # original array. Also take into account endianness...
    a = a.astype('

编辑:稍微优化一下功能..查看编辑历史记录.

如上所述的LSB基数排序的一个低效率是阵列在RAM中完全洗牌多次,这意味着CPU缓存使用得不是很好.为了尝试减轻这种影响,可以选择先使用MSB基数排序进行传递,将项目放在大致正确的RAM块中,然后使用LSB基数排序对每个结果组进行排序.这是一个实现:

def scipysparse_radix_hybrid(a,bbits=8,gbits=8):
    """
    Parameters
    ----------
    a : Array of non-negative integers to be sorted.
    bbits : Number of bits in radix for LSB sorting.
    gbits : Number of bits in radix for MSB grouping.
    """
    a = a.copy()
    bitlen = int(a.max()).bit_length()
    work = np.empty_like(a)

    # Group values by single iteration of MSB radix sort:
    # Casting to np.int_ to get rid of python BigInt
    ngroups = np.int_(2**gbits)
    group_offset = np.empty(ngroups + 1,int)
    shift = max(bitlen-gbits,0)
    a,work = radix_step(ngroups,a>>shift,group_offset,work)
    bitlen = shift
    if not bitlen:
        return a

    # LSB radix sort each group:
    agroups = np.split(a,group_offset[1:-1])
    # Mask off high bits to not undo the grouping..
    gmask = (1 << shift) - 1
    nbatch = (bitlen-1) // bbits + 1
    radix = np.int_(2**bbits)
    _ = np.empty(radix + 1,int)
    for agi in agroups:
        if not agi.size:
            continue
        mask = (radix - 1) & gmask
        wgi = work[:agi.size]
        for shift in range(0,nbatch*bbits,bbits):
            keys = (agi & mask) >> shift
            agi,wgi = radix_step(radix,agi,wgi)
            mask = (mask << bbits) & gmask
        if nbatch % 2:
            # Copy result back in to `a`
            wgi[...] = agi
    return a

计时(在我的系统上为每个设置提供最佳性能):

def numba_radix(a,batch_m_bits=8):
    a = a.copy()
    bit_len = int(a.max()).bit_length()
    nbatches = (bit_len-1)//batch_m_bits +1
    work = np.zeros_like(a)
    bitsums = np.zeros(2**batch_m_bits + 1,int)
    srtd = radix_loop(nbatches,work)
    return srtd

a = np.random.randint(0,1e6)
%timeit numba_radix(a,9)
# 10 loops,best of 3: 76.1 ms per loop
%timeit np.sort(a)
#10 loops,best of 3: 115 ms per loop
%timeit scipysparse_radix_perbyte(a)
#10 loops,best of 3: 95.2 ms per loop
%timeit scipysparse_radix_hybrid(a,11,6)
#10 loops,best of 3: 75.4 ms per loop

正如预期的那样,Numba的表现非常出色.而且通过一些巧妙应用现有的C-extension,可以击败numpy.sort. IMO在优化级别你已经得到了它的价值 – 它也考虑了Numpy的附加组件,但我不会真正考虑我的答案中的实现“矢量化”:大部分工作是在外部完成的专用功能.

令我印象深刻的另一件事是对基数选择的敏感性.对于我尝试的大多数设置,我的实现仍然比numpy.sort慢,所以在实践中需要某种启发式方法来提供全面的性能.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读