如何控制scala中future.sequence的并发性?
发布时间:2020-12-16 18:06:59 所属栏目:安全 来源:网络整理
导读:我知道我可以将Seq [Future [T]]转换为Future [Seq [T]] via val seqFuture = Future.sequence(seqOfFutures) seqFuture.map((seqT: Seq[T]) = {...}) 我现在的问题是,我在那个序列中有700个期货,我希望能够控制它们中有多少并行解决,因为每个未来将调用内部
我知道我可以将Seq [Future [T]]转换为Future [Seq [T]] via
val seqFuture = Future.sequence(seqOfFutures) seqFuture.map((seqT: Seq[T]) => {...}) 我现在的问题是,我在那个序列中有700个期货,我希望能够控制它们中有多少并行解决,因为每个未来将调用内部休息api,并且同时有700个请求就像开火一样针对该服务器的dos攻击. 我宁愿一次只解决10个期货的问题. 我怎样才能做到这一点? 尝试pamu’s answer我看到错误: [error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:44: com.dreamlines.commons.LazyFuture[A] does not take parameters [error] val batch = Future.sequence(c.map(_())) [error] ^ [error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:28: no type parameters for method sequence: (in: M[scala.concurrent.Future[A]])(implicit cbf: scala.collection.generic.CanBuildFrom[M[scala.concurrent.Future[A]],A,M[A]],implicit executor: scala.concurrent.ExecutionContext)scala.concurrent.Future[M[A]] exist so that it can be applied to arguments (List[Nothing]) [error] --- because --- [error] argument expression's type is not compatible with formal parameter type; [error] found : List[Nothing] [error] required: ?M[scala.concurrent.Future[?A]] [error] val batch = Future.sequence(c.map(_())) [error] ^ [error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:42: type mismatch; [error] found : List[Nothing] [error] required: M[scala.concurrent.Future[A]] [error] val batch = Future.sequence(c.map(_())) [error] ^ [error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:36: Cannot construct a collection of type M[A] with elements of type A based on a collection of type M[scala.concurrent.Future[A]]. [error] val batch = Future.sequence(c.map(_())) [error] ^ [error] four errors found 解决方法
FoldLeft
简单的foldLeft可用于控制一次同时运行的期货数量. 首先,让我们创建一个名为LazyFuture的案例类 case class LazyFuture[+A](f: Unit => Future[A]) { def apply() = f() } object LazyFuture { def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f)) def apply[A](f: => Future[A])(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => f) } LazyFuture阻止未来立即运行 val list: List[LazyFuture[A]] = ... list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])){ (r,c) => val batch = Future.sequence(c.map(_())) batch.flatMap(values => r.map(rs => rs ++ values)) } 相应地更改concurFactor以同时运行多个期货. concurFactor of 1将同时运行一个未来 concurFactor of 2将同时运行两个期货 等等 … def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int) = list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])){ (r,c) => val batch = Future.sequence(c.map(_())) r.flatMap(rs => batch.map(values => rs ++ values)) } 完整的代码 case class LazyFuture[+A](f: Unit => Future[A]) { def apply() = f() } object LazyFuture { def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f)) def apply[A](f: => Future[A])(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => f) } def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)(implicit ec: ExecutionContext): Future[List[A]] = list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r,c) => val batch = Future.sequence(c.map(_ ())) r.flatMap(rs => batch.map(values => rs ++ values)) } 限制执行上下文 您还可以通过限制执行池中的线程数来限制计算资源.但是,这种解决方案并不灵活.就个人而言,我不喜欢它. val context: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8)) 您必须记住传递正确的执行上下文,这是一个隐式值.有时我们不知道哪个隐含在范围内.这是马车 警告 当未来如下构建时 val foo = Future { 1 + 2 } // future starts executing LazyFuture(foo) // Not a right way foo已经开始执行,无法控制. 构建LazyFuture的正确方法 val foo = LazyFuture { 1 + 2 } 要么 val foo = LazyFuture { Future { 1 + 2 } } 工作实例 package main import scala.concurrent.{Await,ExecutionContext,Future} import scala.concurrent.duration.Duration object Main { case class LazyFuture[A](f: Unit => Future[A]) { def apply(): Future[A] = f() } object LazyFuture { def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f)) def apply[A](f: => Future[A]): LazyFuture[A] = LazyFuture(_ => f) } def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int) (implicit ec: ExecutionContext): Future[List[A]] = list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r,c) => val batch = Future.sequence(c.map(_ ())) r.flatMap(rs => r.map(values=> rs ++ values)) } def main(args: Array[String]): Unit = { import scala.concurrent.ExecutionContext.Implicits.global val futures: Seq[LazyFuture[Int]] = List(1,2,3,4,5).map { value => LazyFuture { println(s"value: $value started") Thread.sleep(value * 200) println(s"value: $value stopped") value } } val f = executeBatch(futures.toList)(2) Await.result(f,Duration.Inf) } } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |