加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

过滤Scala的并行集合,并在发现所需的结果数量时提前中止

发布时间:2020-12-16 19:02:05 所属栏目:安全 来源:网络整理
导读:给定一个非常大的collection.parallel.mutable.ParHashMap(或任何其他并行集合)的实例,一旦找到一个给定的,比如50个匹配项,怎么能中止过滤并行扫描? 尝试在线程安全的“外部”数据结构中累积中间匹配或保留具有结果计数的外部AtomicInteger在4个内核上的使
给定一个非常大的collection.parallel.mutable.ParHashMap(或任何其他并行集合)的实例,一旦找到一个给定的,比如50个匹配项,怎么能中止过滤并行扫描?

尝试在线程安全的“外部”数据结构中累积中间匹配或保留具有结果计数的外部AtomicInteger在4个内核上的使用速度似乎比使用常规collection.mutable.HashMap慢2到3倍,并将单个内核与100 %.

我知道在Par *集合上查找或存在会在内部中止.有没有一种方法可以推广这个来找到多个结果?

这里的代码似乎比ParHashMap的速度慢了2到3倍,而且还有一个问题,比较多的maxResults结果进入结果CHM(这可能是因为线程被preemptAndGet抢占之前但之前的允许其他线程在)中添加更多的元素.更新:似乎减速是由于在counter.incrementAndGet()上的工作线程竞争,这当然违反了整个并行扫描的目的:-(

def find(filter: Node => Boolean,maxResults: Int): Iterable[Node] =
{
  val counter = new AtomicInteger(0)
  val results = new ConcurrentHashMap[Key,Node](maxResults)

  import util.control.Breaks._

  breakable
  {
    for ((key,node) <- parHashMap if filter(node))
    {
      results.put(key,node)
      val total = counter.incrementAndGet()
      if (total > maxResults) break
    }
  }

  results.values.toArray(new Array[Node](results.size))
}

解决方法

我将首先做并行扫描,其中maxResults变量是threadlocal.这将找到(maxResults * numberOfThreads)结果.

然后我将进行单线程扫描,将其减少到maxResults.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读