scala – Cats-effect和异步IO细节
几天来,我一直在围绕猫效应和IO.而且我觉得我对这种效果有一些误解,或者我只是错过了它的观点.
>首先 – 如果IO可以取代Scala的Future,我们如何创建异步IO任务?使用IO.shift?使用IO.async? IO.delay是同步还是异步?我们可以使用像Async [F] .delay(…)这样的代码进行通用异步任务吗?或者当我们使用unsafeToAsync或unsafeToFuture调用IO时发生异步? 我很感激有些澄清任何一个,因为我没有理解猫效应文档对那些和互联网没有帮助… 解决方法
首先,我们需要澄清什么是异步任务.通常async意味着“不会阻止操作系统线程”,但由于你提到的是Future,它有点模糊.说,如果我写道: Future { (1 to 1000000).foreach(println) } 它不会是异步的,因为它是一个阻塞循环和阻塞输出,但它可能会在不同的OS线程上执行,由隐式ExecutionContext管理.等效的猫效应代码将是: for { _ <- IO.shift _ <- IO.delay { (1 to 1000000).foreach(println) } } yield () (这不是较短的版本) 所以, > IO.shift用于更改线程/线程池.未来会在每次操作中都做到,但它不是免费的性能. 现在,让我们回到真正的异步.这里要理解的是: 每个异步计算都可以表示为一个回调函数. 无论您使用返回Future或Java的CompletableFuture的API,还是NIO CompletionHandler之类的东西,它都可以转换为回调.这就是IO.async的用途:您可以将任何回调函数转换为IO.如果像: for { _ <- IO.async { ... } _ <- IO(println("Done")) } yield () 只有当(并且如果)…中的计算回调时,才会打印完成.您可以将其视为阻止绿色线程,但不是OS线程. 所以, > IO.async用于将任何已经异步的计算转换为IO. 与Futures合作时最接近的类比是创建一个scala.concurrent.Promise并返回p.future.
有点.使用IO,除非您调用其中一个(或使用IOApp),否则不会发生任何事情.但IO不保证您将在不同的OS线程上执行,甚至不能保证异步执行,除非您使用IO.shift或IO.async明确要求这样做. 您可以随时保证线程切换,例如(IO.shift *> myIO).unsafeRunAsyncAndForget().这可能是因为myIO在被要求之前不会被执行,无论你是将它作为val myIO还是def myIO. 但是,您无法将阻塞操作神奇地转换为非阻塞操作.这对Future和IO都不可能.
Async和Concurrent(和Sync)是类型类.它们的设计使得程序员可以避免被锁定到cats.effect.IO并且可以为您提供支持您选择的任何API,例如monix Task或Scalaz 8 ZIO,甚至是monad转换器类型,例如OptionT [Task,* something *].像fs2,monix和http4s这样的库利用它们为你提供了更多选择. Concurrent在Async之上添加了额外的东西,其中最重要的是.cancelable和.start.这些与Future没有直接比喻,因为它根本不支持取消. .cancelable是.async的一个版本,它允许您指定一些逻辑来取消正在包装的操作.一个常见的例子是网络请求 – 如果您对结果不再感兴趣,您可以在不等待服务器响应的情况下中止它们,并且不会浪费任何套接字或处理读取响应的时间.你可能永远不会直接使用它,但它有它的位置. 但是如果你不能取消它们,那么可取消的操作有什么用呢?这里的关键观察是你无法从内部取消操作.其他人必须做出这个决定,这将与操作本身同时发生(这是类型类得名的地方).这就是.start的用武之地.简而言之, .start是绿色线程的显式分支. 做一些IO.start类似于做val t = new Thread(someRunnable); t.start(),现在它除了绿色.而Fiber本质上是Thread API的精简版本:你可以做.join,就像Thread#join(),但它不会阻止OS线程;和.cancel,它是.interrupt()的安全版本. 请注意,还有其他方法来分叉绿色线程.例如,执行并行操作: val ids: List[Int] = List.range(1,1000) def processId(id: Int): IO[Unit] = ??? val processAll: IO[Unit] = ids.parTraverse_(processId) 将所有ID分叉处理为绿色线程,然后将它们全部加入.或者使用.race: val fetchFromS3: IO[String] = ??? val fetchFromOtherNode: IO[String] = ??? val fetchWhateverIsFaster = IO.race(fetchFromS3,fetchFromOtherNode).map(_.merge) 将并行执行提取,为您提供第一个结果并自动取消较慢的提取.因此,执行.start和使用Fiber并不是分叉更多绿色线程的唯一方法,只是最明确的线程.答案是:
> IO就像一个绿色线程,这意味着你可以在没有OS线程开销的情况下并行运行大量的线程,并且for-comprehension中的代码就像阻塞计算结果一样.> Fiber是一种用于控制绿色线程显式分叉(等待完成或取消)的工具. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |