
Scala: Future.traverse seems to work sequentially rather than in parallel. Is this true?

Published: 2020-12-16 18:21:07 | Category: Security | Source: compiled from the web
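My question is a simple one, about the Future.traverse method.
I have a list of Strings, each of which is the URL of a web page. I also have a class that takes a URL, loads the page and parses some data out of it. All of that is wrapped in Future {}, so the result is processed asynchronously.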

The class, simplified, looks like this:

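import scala.concurrent.{ExecutionContext, Future}

class RatingRetriever(context: ExecutionContext) {
  def resolveFilmToRating(url: String): Future[Option[Double]] = {
    Future {
      // Here it creates a Selenium web driver, loads the URL and parses it.
      ??? // placeholder: the parsing logic is omitted in the question
    }(context)
  }
}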

Then, in another object, I have this:

implicit val executionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))
    .......
    val links:List[String] = films.map(film => film.asInstanceOf[WebElement].getAttribute("href"))
    val ratings: Future[List[Option[Double]]] = Future.traverse(links)(link => new RatingRetriever(executionContext).resolveFilmToRating(link))

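When it runs I can clearly see that it goes through the collection sequentially. If I change the execution context from the fixed-size pool to a single-thread pool, the behavior is the same.
So I would really like to know how to make Future.traverse work in parallel. Any suggestions?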

Solution

Take a look at the source of traverse:

in.foldLeft(successful(cbf(in))) { (fr,a) => //we sequentially traverse Collection
  val fb = fn(a)                        //Your function comes here
  for (r <- fr; b <- fb) yield (r += b) //Just add elem to builder
}.map(_.result())                       //Getting the collection from builder
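To make the fold above easier to read, here is a minimal sketch of the same idea restricted to List. It is not the library implementation: the names TraverseSketch, traverseList and demo are made up for illustration, and a plain List stands in for the builder.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TraverseSketch {
  // Simplified, List-only version of the fold shown above (hypothetical helper, not library code).
  def traverseList[A, B](in: List[A])(fn: A => Future[B])(implicit ec: ExecutionContext): Future[List[B]] =
    in.foldLeft(Future.successful(List.empty[B])) { (fr, a) =>
      val fb = fn(a)                       // fn is called immediately, on the thread running the fold
      for (r <- fr; b <- fb) yield b :: r  // the results are combined only once both futures complete
    }.map(_.reverse)                       // elements were prepended, so restore the input order

  def demo(): List[Int] = {
    import ExecutionContext.Implicits.global
    Await.result(traverseList(List(1, 2, 3))(i => Future(i * 2)), 5.seconds)
  }

  def main(args: Array[String]): Unit =
    println(demo()) // List(2, 4, 6)
}
```

The point to notice is that fn(a) is evaluated for every element as the fold walks the collection, without waiting for earlier futures to finish. If fn returns quickly, all futures are in flight almost at once; if fn itself blocks, the fold is held up on each element.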

So how parallel the code is depends on your function fn. Consider two examples:

1) This code:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
object FutureTraverse extends App{
  def log(s: String) = println(s"${Thread.currentThread.getName}: $s")

  def withDelay(i: Int) = Future{
    log(s"withDelay($i)")
    Thread.sleep(1000)
    i
  }

  val seq = 0 to 10

  Future {
    for(i <- 0 to 5){
      log(".")
      Thread.sleep(1000)
    }
  }

  val resultSeq = Future.traverse(seq)(withDelay(_))

  Thread.sleep(6000)
}

produces output like this:

ForkJoinPool-1-worker-5: .
ForkJoinPool-1-worker-3: withDelay(0)
ForkJoinPool-1-worker-1: withDelay(1)
ForkJoinPool-1-worker-7: withDelay(2)
ForkJoinPool-1-worker-5: .
ForkJoinPool-1-worker-3: withDelay(3)
ForkJoinPool-1-worker-1: withDelay(4)
ForkJoinPool-1-worker-7: withDelay(5)
ForkJoinPool-1-worker-5: .
ForkJoinPool-1-worker-3: withDelay(6)
ForkJoinPool-1-worker-1: withDelay(7)
ForkJoinPool-1-worker-7: withDelay(8)
ForkJoinPool-1-worker-5: .
ForkJoinPool-1-worker-3: withDelay(9)
ForkJoinPool-1-worker-1: withDelay(10)
ForkJoinPool-1-worker-5: .
ForkJoinPool-1-worker-5: .

2) Now just change the withDelay function:

def withDelay(i: Int) = {
    Thread.sleep(1000)
    Future {
      log(s"withDelay($i)")
      i
    }
  }

and you get sequential output:

ForkJoinPool-1-worker-7: .
ForkJoinPool-1-worker-7: .
ForkJoinPool-1-worker-5: withDelay(0)
ForkJoinPool-1-worker-7: .
ForkJoinPool-1-worker-1: withDelay(1)
ForkJoinPool-1-worker-7: .
ForkJoinPool-1-worker-1: withDelay(2)
ForkJoinPool-1-worker-7: .
ForkJoinPool-1-worker-1: withDelay(3)
ForkJoinPool-1-worker-7: .
ForkJoinPool-1-worker-1: withDelay(4)
ForkJoinPool-1-worker-7: withDelay(5)
ForkJoinPool-1-worker-1: withDelay(6)
ForkJoinPool-1-worker-1: withDelay(7)
ForkJoinPool-1-worker-7: withDelay(8)
ForkJoinPool-1-worker-7: withDelay(9)
ForkJoinPool-1-worker-7: withDelay(10)
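One way to see why the second variant serializes is to record which thread runs the code placed before Future { ... } and which thread runs the Future body. The sketch below does that; WhereDoesItRun, probe and run are illustrative names, not part of the original example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object WhereDoesItRun {
  // Returns (thread that ran the code before Future { ... }, thread that ran the Future body).
  def probe(i: Int): Future[(String, String)] = {
    val outside = Thread.currentThread.getName // evaluated synchronously, while traverse folds
    Future {
      (outside, Thread.currentThread.getName)  // evaluated later by the execution context
    }
  }

  // Returns the caller's thread name together with the recorded pairs.
  def run(): (String, List[(String, String)]) = {
    val caller = Thread.currentThread.getName
    val pairs = Await.result(Future.traverse((0 to 3).toList)(probe(_)), 5.seconds)
    (caller, pairs)
  }

  def main(args: Array[String]): Unit = {
    val (caller, pairs) = run()
    println(s"caller thread: $caller")
    pairs.foreach { case (outside, inside) => println(s"outside=$outside inside=$inside") }
  }
}
```

Every "outside" name equals the caller's thread, because that part of the function runs on whichever thread called Future.traverse. A Thread.sleep placed there therefore delays the submission of the next element, which matches the one-per-second output above.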

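So Future.traverse is not necessarily parallel. It only submits the tasks, and it does that sequentially; whether they then run in parallel is entirely up to the function you pass in.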
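As a rough check of this conclusion, the following sketch times both styles of function against the same pool. The pool size of 4, the 200 ms delay and all names (TraverseTiming, insideFuture, beforeFuture, elapsedMs) are assumptions chosen for illustration, not values from the question.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TraverseTiming {
  private val DelayMs = 200L

  // Blocking work inside the Future body: each call returns immediately.
  def insideFuture(i: Int)(implicit ec: ExecutionContext): Future[Int] =
    Future { Thread.sleep(DelayMs); i }

  // Blocking work before the Future is created: the caller is held up on every call.
  def beforeFuture(i: Int)(implicit ec: ExecutionContext): Future[Int] = {
    Thread.sleep(DelayMs)
    Future(i)
  }

  // Wall-clock milliseconds needed to traverse four elements with fn.
  def elapsedMs(fn: Int => Future[Int])(implicit ec: ExecutionContext): Long = {
    val start = System.nanoTime()
    Await.result(Future.traverse(List(1, 2, 3, 4))(fn), 10.seconds)
    (System.nanoTime() - start) / 1000000L
  }

  // Returns (time with sleep inside the Future, time with sleep before the Future).
  def run(): (Long, Long) = {
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      (elapsedMs(insideFuture(_)), elapsedMs(beforeFuture(_)))
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    val (parallelMs, sequentialMs) = run()
    println(s"sleep inside Future: $parallelMs ms, sleep before Future: $sequentialMs ms")
  }
}
```

With four elements and four threads, the first measurement should land near one delay and the second near four delays, since the four sleeps in beforeFuture happen back to back on the calling thread. Exact numbers will vary by machine.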
