scala – 在akka-stream中如何从期货集合中创建无序来源
发布时间:2020-12-16 09:00:11 所属栏目:安全 来源:网络整理
导读:我需要从Future [T]的集合中创建一个akka.stream. scaladsl.Source [T,Unit]. 例如,有一组期货返回整数, val f1: Future[Int] = ???val f2: Future[Int] = ???val fN: Future[Int] = ???val futures = List(f1,f2,fN) 如何创建一个 val source: Source[Int,U
我需要从Future [T]的集合中创建一个akka.stream.
scaladsl.Source [T,Unit].
例如,有一组期货返回整数, val f1: Future[Int] = ??? val f2: Future[Int] = ??? val fN: Future[Int] = ??? val futures = List(f1,f2,fN) 如何创建一个 val source: Source[Int,Unit] = ??? 从中. 我不能使用Future.sequence组合器,因为那时我会等待每个未来完成之后从源获取任何东西.我想在任何未来完成后立即以任何顺序获得结果. 我知道Source是一个纯粹的功能API,在以某种方式实现之前它不应该运行任何东西.所以,我的想法是使用Iterator(懒惰)来创建一个源: Source { () => new Iterator[Future[Int]] { override def hasNext: Boolean = ??? override def next(): Future[Int] = ??? } } 但这将是未来的来源,而不是实际价值.我也可以在下一次使用Await.result(未来)阻止,但我不确定哪个胎面池的线程将被阻止.这也将顺序调用期货,而我需要并行执行. 更新2:事实证明有一个更简单的方法(感谢Viktor Klang): Source(futures).mapAsync(1)(identity) 更新:这是基于@sschaef回答我得到的: def futuresToSource[T](futures: Iterable[Future[T]])(implicit ec: ExecutionContext): Source[T,Unit] = { def run(actor: ActorRef): Unit = { futures.foreach { future => future.onComplete { case Success(value) => actor ! value case Failure(NonFatal(t)) => actor ! Status.Failure(t) // to signal error } } Future.sequence(futures).onSuccess { case _ => actor ! Status.Success(()) // to signal stream's end } } Source.actorRef[T](futures.size,OverflowStrategy.fail).mapMaterializedValue(run) } // ScalaTest tests follow import scala.concurrent.ExecutionContext.Implicits.global implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() "futuresToSource" should "convert futures collection to akka-stream source" in { val f1 = Future(1) val f2 = Future(2) val f3 = Future(3) whenReady { futuresToSource(List(f1,f3)).runFold(Seq.empty[Int])(_ :+ _) } { results => results should contain theSameElementsAs Seq(1,2,3) } } it should "fail on future failure" in { val f1 = Future(1) val f2 = Future(2) val f3 = Future.failed(new RuntimeException("future failed")) whenReady { futuresToSource(List(f1,f3)).runWith(Sink.ignore).failed } { t => t shouldBe a [RuntimeException] t should have message "future failed" } } 解决方法
创建Futures源,然后通过mapAsync“展平”它:
scala> Source(List(f1,fN)).mapAsync(1)(identity) res0: akka.stream.scaladsl.Source[Int,Unit] = akka.stream.scaladsl.Source@3e10d804 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |