加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

对具有有限并行性的Scala期货进行排序(无需处理ExecutorContexts

发布时间:2020-12-16 18:53:54 所属栏目:安全 来源:网络整理
导读:背景:我有一个功能: def doWork(symbol: String): Future[Unit] 它会启动一些副作用以获取数据并存储它,并在完成时完成Future.但是,后端基础结构具有使用限制,因此可以并行地生成不超过5个这些请求.我有一个N符号列表,我需要通过: var symbols = Array("M
背景:我有一个功能:

def doWork(symbol: String): Future[Unit]

它会启动一些副作用以获取数据并存储它,并在完成时完成Future.但是,后端基础结构具有使用限制,因此可以并行地生成不超过5个这些请求.我有一个N符号列表,我需要通过:

var symbols = Array("MSFT",...)

但是我想对它们进行排序,使得不超过5个同时执行.鉴于:

val allowableParallelism = 5

我当前的解决方案是(假设我正在使用async / await):

val symbolChunks = symbols.toList.grouped(allowableParallelism).toList
  def toThunk(x: List[String]) = () => Future.sequence(x.map(doWork))
  val symbolThunks = symbolChunks.map(toThunk)
  val done = Promise[Unit]()
  def procThunks(x: List[() => Future[List[Unit]]]): Unit = x match {
    case Nil => done.success()
    case x::xs => x().onComplete(_ => procThunks(xs))
  }
  procThunks(symbolThunks)
  await { done.future }

但是,由于显而易见的原因,我对此并不十分满意.我觉得这应该是可能的折叠,但每次我尝试,我最终热切地创造期货.我还使用了concatMap尝试了一个带有RxScala Observables的版本,但这看起来也有些过分.

有没有更好的方法来实现这一目标?

解决方法

我有一个示例如何使用scalaz-stream.它是相当多的代码,因为它需要将scala Future转换为scalaz Task(延迟计算的抽象).但是需要将它添加到项目一次.另一个选择是使用Task来定义’doWork’.我个人更喜欢构建异步程序的任务.

import scala.concurrent.{Future => SFuture}
  import scala.util.Random
  import scala.concurrent.ExecutionContext.Implicits.global


  import scalaz.stream._
  import scalaz.concurrent._

  val P = scalaz.stream.Process

  val rnd = new Random()

  def doWork(symbol: String): SFuture[Unit] = SFuture {
    Thread.sleep(rnd.nextInt(1000))
    println(s"Symbol: $symbol. Thread: ${Thread.currentThread().getName}")
  }

  val symbols = Seq("AAPL","MSFT","GOOGL","CVX").
    flatMap(s => Seq.fill(5)(s).zipWithIndex.map(t => s"${t._1}${t._2}"))

  implicit class Transformer[+T](fut: => SFuture[T]) {
    def toTask(implicit ec: scala.concurrent.ExecutionContext): Task[T] = {
      import scala.util.{Failure,Success}
      import scalaz.syntax.either._
      Task.async {
        register =>
          fut.onComplete {
            case Success(v) => register(v.right)
            case Failure(ex) => register(ex.left)
          }
      }
    }
  }

  implicit class ConcurrentProcess[O](val process: Process[Task,O]) {
    def concurrently[O2](concurrencyLevel: Int)(f: Channel[Task,O,O2]): Process[Task,O2] = {
      val actions =
        process.
          zipWith(f)((data,f) => f(data))

      val nestedActions =
        actions.map(P.eval)

      merge.mergeN(concurrencyLevel)(nestedActions)
    }
  }

  val workChannel = io.channel((s: String) => doWork(s).toTask)

  val process = Process.emitAll(symbols).concurrently(5)(workChannel)

  process.run.run

当你在范围内进行所有这些转变时,基本上你只需要:

val workChannel = io.channel((s: String) => doWork(s).toTask)

  val process = Process.emitAll(symbols).concurrently(5)(workChannel)

相当简短和自我描述

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读