加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 关于Future.firstCompletedOf和Garbage Collect机制

发布时间:2020-12-16 08:59:07 所属栏目:安全 来源:网络整理
导读:我在我的真实项目中遇到过这个问题,并通过我的测试代码和分析器证明了这一点.我没有粘贴“tl; dr”代码,而是向您展示一张图片,然后对其进行描述. 简单地说,我正在使用Future.firstCompletedOf来获得2期货的结果,两者都没有共享的东西而且彼此不关心.即使这是
我在我的真实项目中遇到过这个问题,并通过我的测试代码和分析器证明了这一点.我没有粘贴“tl; dr”代码,而是向您展示一张图片,然后对其进行描述.

enter image description here

简单地说,我正在使用Future.firstCompletedOf来获得2期货的结果,两者都没有共享的东西而且彼此不关心.即使这是我想要解决的问题,垃圾收集器也无法回收第一个Result对象,直到两个Futures完成.

所以我真的很好奇这背后的机制.有人可以从较低的层面解释它,或提供一些提示让我调查.

谢谢!

PS:是因为他们共享相同的ExecutionContext吗?

**根据要求更新**粘贴测试代码

object Main extends App{
  println("Test start")

  val timeout = 30000

  trait Result {
    val id: Int
    val str = "I'm short"
  }
  class BigObject(val id: Int) extends Result{
    override val str = "really big str"
  }

  def guardian = Future({
    Thread.sleep(timeout)
    new Result { val id = 99999 }
  })

  def worker(i: Int) = Future({
    Thread.sleep(100)
    new BigObject(i)
  })

  for (i <- Range(1,1000)){
    println("round " + i)
    Thread.sleep(20)
    Future.firstCompletedOf(Seq(
      guardian,worker(i)
    )).map( r => println("result" + r.id))
  }

  while (true){
    Thread.sleep(2000)
  }
}

解决方法

让我们看看firstCompletedOf是如何实现的:

def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
  val p = Promise[T]()
  val completeFirst: Try[T] => Unit = p tryComplete _
  futures foreach { _ onComplete completeFirst }
  p.future
}

在进行{期货foreach {_ onComplete completeFirst}时,函数{_ onComplete completeFirst}被保存在某个地方
通过ExecutionContext.execute.保存此函数的确切位置无关紧要,我们只知道它必须保存在某个地方
这样当线程可用时,它可以在以后被选中并在线程池上执行.

该函数关闭在关闭p的completeFirst上.
因此,只要还有一个未来(来自期货)等待完成,就会引用p来防止它被垃圾收集(即使在那一点上,机会是firstCompletedOf已经返回,从堆栈中删除p ).

当第一个未来完成时,它会将结果保存到promise中(通过调用p.tryComplete).
因为承诺p保持结果,所以结果至少在p是可达的时间内是可达的,并且正如我们所看到的那样,只要期货的至少一个未来尚未完成,p就可以到达.
这就是为什么在所有期货完成之前无法收集结果的原因.

更新:
现在的问题是:它可以修复吗?我认为可以.我们所要做的就是确保第一个未来以线程安全的方式完成对空的引用“空出”,这可以通过使用AtomicReference的示例来完成.像这样的东西:

def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
  val p = Promise[T]()
  val pref = new java.util.concurrent.atomic.AtomicReference(p)
  val completeFirst: Try[T] => Unit = { result: Try[T] =>
    val promise = pref.getAndSet(null)
    if (promise != null) {
      promise.tryComplete(result)
    }
  }
  futures foreach { _ onComplete completeFirst }
  p.future
}

我已经对它进行了测试,并且正如预期的那样,它确实允许在第一个未来完成后立即对结果进行垃圾收集.它应该在所有其他方面表现相同.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读