对具有有限并行性的Scala期货进行排序(无需处理ExecutorContexts
发布时间:2020-12-16 18:53:54 所属栏目:安全 来源:网络整理
导读:背景:我有一个功能: def doWork(symbol: String): Future[Unit] 它会启动一些副作用以获取数据并存储它,并在完成时完成Future.但是,后端基础结构具有使用限制,因此可以并行地生成不超过5个这些请求.我有一个N符号列表,我需要通过: var symbols = Array("M
背景:我有一个功能:
def doWork(symbol: String): Future[Unit] 它会启动一些副作用以获取数据并存储它,并在完成时完成Future.但是,后端基础结构具有使用限制,因此可以并行地生成不超过5个这些请求.我有一个N符号列表,我需要通过: var symbols = Array("MSFT",...) 但是我想对它们进行排序,使得不超过5个同时执行.鉴于: val allowableParallelism = 5 我当前的解决方案是(假设我正在使用async / await): val symbolChunks = symbols.toList.grouped(allowableParallelism).toList def toThunk(x: List[String]) = () => Future.sequence(x.map(doWork)) val symbolThunks = symbolChunks.map(toThunk) val done = Promise[Unit]() def procThunks(x: List[() => Future[List[Unit]]]): Unit = x match { case Nil => done.success() case x::xs => x().onComplete(_ => procThunks(xs)) } procThunks(symbolThunks) await { done.future } 但是,由于显而易见的原因,我对此并不十分满意.我觉得这应该是可能的折叠,但每次我尝试,我最终热切地创造期货.我还使用了concatMap尝试了一个带有RxScala Observables的版本,但这看起来也有些过分. 有没有更好的方法来实现这一目标? 解决方法
我有一个示例如何使用scalaz-stream.它是相当多的代码,因为它需要将scala Future转换为scalaz Task(延迟计算的抽象).但是需要将它添加到项目一次.另一个选择是使用Task来定义’doWork’.我个人更喜欢构建异步程序的任务.
import scala.concurrent.{Future => SFuture} import scala.util.Random import scala.concurrent.ExecutionContext.Implicits.global import scalaz.stream._ import scalaz.concurrent._ val P = scalaz.stream.Process val rnd = new Random() def doWork(symbol: String): SFuture[Unit] = SFuture { Thread.sleep(rnd.nextInt(1000)) println(s"Symbol: $symbol. Thread: ${Thread.currentThread().getName}") } val symbols = Seq("AAPL","MSFT","GOOGL","CVX"). flatMap(s => Seq.fill(5)(s).zipWithIndex.map(t => s"${t._1}${t._2}")) implicit class Transformer[+T](fut: => SFuture[T]) { def toTask(implicit ec: scala.concurrent.ExecutionContext): Task[T] = { import scala.util.{Failure,Success} import scalaz.syntax.either._ Task.async { register => fut.onComplete { case Success(v) => register(v.right) case Failure(ex) => register(ex.left) } } } } implicit class ConcurrentProcess[O](val process: Process[Task,O]) { def concurrently[O2](concurrencyLevel: Int)(f: Channel[Task,O,O2]): Process[Task,O2] = { val actions = process. zipWith(f)((data,f) => f(data)) val nestedActions = actions.map(P.eval) merge.mergeN(concurrencyLevel)(nestedActions) } } val workChannel = io.channel((s: String) => doWork(s).toTask) val process = Process.emitAll(symbols).concurrently(5)(workChannel) process.run.run 当你在范围内进行所有这些转变时,基本上你只需要: val workChannel = io.channel((s: String) => doWork(s).toTask) val process = Process.emitAll(symbols).concurrently(5)(workChannel) 相当简短和自我描述 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |