加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

Scala Cats效果 – IO异步转换 – 它是如何工作的?

发布时间:2020-12-16 18:13:47 所属栏目:安全 来源:网络整理
导读:这是使用 IO Monad的一些Scala猫代码: import java.util.concurrent.{ExecutorService,Executors}import cats.effect.IOimport scala.concurrent.{ExecutionContext,ExecutionContextExecutor}import scala.util.control.NonFatalobject Program extends Ap
这是使用 IO Monad的一些Scala猫代码:

import java.util.concurrent.{ExecutorService,Executors}

import cats.effect.IO

import scala.concurrent.{ExecutionContext,ExecutionContextExecutor}
import scala.util.control.NonFatal

object Program extends App {

  type CallbackType = (Either[Throwable,Unit]) => Unit

  // IO.async[Unit] is like a Future that returns Unit on completion.
  // Unlike a regular Future,it doesn't start to run until unsafeRunSync is called.
  def forkAsync(toRun: () => Unit)(executor: ExecutorService): IO[Unit] = IO.async[Unit] { callback: CallbackType =>
    // "callback" is a function that either takes a throwable (Left) or whatever toRun returns (Right).
    println("LalalaAsync: " + Thread.currentThread().getName)
    executor.execute(new Runnable {
      def run(): Unit = {
        val nothing: Unit = toRun() // Note: This line executes the body and returns nothing,which is of type Unit.
        try {
          callback(Right(nothing)) // On success,the callback returns nothing
        } catch {
          case NonFatal(t) => callback(Left(t)) // On failure,it returns an exception
        }
      }
    })
  }

  def forkSync(toRun: () => Unit)(executor: ExecutorService): IO[Unit] = IO.apply {
    println("LalalaSync: " + Thread.currentThread().getName)
    executor.execute(new Runnable {
      def run(): Unit = {
        toRun()
      }
    })
  }

  val treadPool: ExecutorService = Executors.newSingleThreadExecutor()
  val mainThread: Thread = Thread.currentThread()

  val Global: ExecutionContextExecutor = ExecutionContext.global

  /*
  Output:
    1 Hello World printed synchronously from Main.main
    LalalaSync: scala-execution-context-global-12
    Hello World printed synchronously from thread pool.pool-1-thread-1
    LalalaAsync: scala-execution-context-global-12
    Hello World printed asynchronously from thread pool.pool-1-thread-1
    2 Hello World printed synchronously from Global .scala-execution-context-global-12
   */
  val program = for {
    _ <- IO {
      println("1 Hello World printed synchronously from Main." + Thread.currentThread().getName) // "main" thread
    }
    _ <- IO.shift(Global) // Shift to Global Execution Context
    _ <- forkSync { () =>
      println("Hello World printed synchronously from thread pool." + Thread.currentThread().getName) // "pool-1-thread-1" thread
    }(treadPool)
    _ <- forkAsync { () =>
      println("Hello World printed asynchronously from thread pool." + Thread.currentThread().getName) // "pool-1-thread-1" thread
    }(treadPool)
    _ <- IO.shift(Global) // Shift to Global Execution Context
    _ <- IO {
      println("2 Hello World printed synchronously from Global ." + Thread.currentThread().getName) // "scala-execution-context-global-13" thread
    }
  } yield ()

  program.unsafeRunSync()
}

要运行它,您需要添加:

libraryDependencies ++= Seq(
  "org.typelevel" %% "cats" % "0.9.0","org.typelevel" %% "cats-effect" % "0.3"
),

到你的build.sbt文件.

注意输出:

/*
  Output:
    1 Hello World printed synchronously from Main.main
    LalalaSync: scala-execution-context-global-12
    Hello World printed synchronously from thread pool.pool-1-thread-1
    LalalaAsync: scala-execution-context-global-12
    Hello World printed asynchronously from thread pool.pool-1-thread-1
    2 Hello World printed synchronously from Global .scala-execution-context-global-12
 */

基本上,我不明白IO.shift(全局)或IO.async如何工作.

例如,为什么在我调用“forkAsync”之后,如果我不调用“IO.shift(Global)”,则后续的同步IO对象将在“pool-1-thread-1”中运行.另外,在这个例子中,forkAsync和forkSync有什么区别?它们都在ExecutionContext.global中启动,然后在“pool.pool-1-thread-1”中执行Runnable.

就像forkAsync和forkSync做同样的事情或者forkAsync做一些不同的事情?如果他们正在做同样的事情,那么在IO.async中包装代码有什么意义呢?如果他们没有做同样的事情,他们有什么不同?

解决方法

For example,why is it that after I call “forkAsync”,if I don’t call “IO.shift(Global)”,the subsequent synchronous IO objects are run in “pool-1-thread-1”.

更重要的问题是为什么你期望它评估全球的“后续同步IO对象”?

IO内部没有线程池的概念,它不了解全局,因此它无法转移回默认的线程池,因此您需要触发手动转换.

升级到最新版本1.0.0,你还在ContextShift中有evalOn,它将在指定的线程池上执行IO操作,然后切换回你的“全局”,我想这就是你想要的.

Also,what is the difference between forkAsync and forkSync in this example?

forkSync触发Runnable的执行,但不等待其完成.这是一场火灾,忘记了.这意味着后续的链式操作不会进行反压.

一些忠告:

>升级到最新版本(1.0.0)
>阅读文档:https://typelevel.org/cats-effect/datatypes/io.html

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读