在REPL中使用Scala中的java.util.concurrent._进行死锁
在研究Paul Chiusano和Runar Bjanarson的着作“
Scala中的函数式编程”时,我遇到了以下场景(第7章 – 纯函数并行).
package fpinscala.parallelism import java.util.concurrent._ import language.implicitConversions object Par { type Par[A] = ExecutorService => Future[A] def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s) def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a) // `unit` is represented as a function that returns a `UnitFuture`,which is a simple implementation of `Future` that just wraps a constant value. It doesn't use the `ExecutorService` at all. It's always done and can't be cancelled. Its `get` method simply returns the value that we gave it. private case class UnitFuture[A](get: A) extends Future[A] { def isDone = true def get(timeout: Long,units: TimeUnit) = get def isCancelled = false def cancel(evenIfRunning: Boolean): Boolean = false } def map2[A,B,C](a: Par[A],b: Par[B])(f: (A,B) => C): Par[C] = // `map2` doesn't evaluate the call to `f` in a separate logical thread,in accord with our design choice of having `fork` be the sole function in the API for controlling parallelism. We can always do `fork(map2(a,b)(f))` if we want the evaluation of `f` to occur in a separate thread. (es: ExecutorService) => { val af = a(es) val bf = b(es) UnitFuture(f(af.get,bf.get)) // This implementation of `map2` does _not_ respect timeouts. It simply passes the `ExecutorService` on to both `Par` values,waits for the results of the Futures `af` and `bf`,applies `f` to them,and wraps them in a `UnitFuture`. In order to respect timeouts,we'd need a new `Future` implementation that records the amount of time spent evaluating `af`,then subtracts that time from the available time allocated for evaluating `bf`. } def fork[A](a: => Par[A]): Par[A] = // This is the simplest and most natural implementation of `fork`,but there are some problems with it--for one,the outer `Callable` will block waiting for the "inner" task to complete. Since this blocking occupies a thread in our thread pool,or whatever resource backs the `ExecutorService`,this implies that we're losing out on some potential parallelism. Essentially,we're using two threads when one should suffice. This is a symptom of a more serious problem with the implementation,and we will discuss this later in the chapter. es => es.submit(new Callable[A] { def call = a(es).get }) def lazyUnit[A](a: => A): Par[A] = fork(unit(a)) def equal[A](e: ExecutorService)(p: Par[A],p2: Par[A]): Boolean = p(e).get == p2(e).get } 您可以在Github here上找到原始代码.有关java.util.concurrent文档,请参阅here. 我关心fork的实现.特别是,当ThreadPool太小时,据称fork会导致死锁. 我考虑以下示例: val a = Par.lazyUnit(42 + 1) val es: ExecutorService = Executors.newFixedThreadPool(2) println(Par.fork(a)(es).get) 我不希望这个例子最终陷入死锁,因为有两个线程.然而,当我在Scala REPL中运行它时,它会在我的计算机上运行.为什么会这样? 初始化ExecutorService时的输出是 java.util.concurrent.ThreadPoolE xecutor@73a86d72[Running,pool size = 0,active threads = 0,queued tasks = 0,completed tasks = 0] 池大小= 0是否正确?换句话说,这是一个不了解java.util.concurrent._的问题,还是不了解Scala部分的问题? 解决方法
好的,经过长时间的调查,我相信我有一个答案.完整的故事很长,但我会尝试通过简化和避免许多细节来缩短它.
注意:潜在的Scala可以编译到各种不同的目标平台,但是这个特定问题在Java / JVM上作为目标发生,所以这就是这个答案的内容. 您看到的死锁与线程池的大小无关.实际上是挂起的外叉调用.它与REPL实现细节和多线程的组合有关,但需要学习一些内容才能理解它是如何发生的: > Scala REPL的工作原理 一个简短的(呃)版本(参见最后的摘要)是这个代码在REPL下挂起,因为当它由REPL执行时,它在逻辑上类似于以下代码: object DeadLock { import scala.concurrent._ import scala.concurrent.duration.Duration import scala.concurrent.ExecutionContext.Implicits.global val foo: Int = Await.result(Future(calc()),Duration.Inf) def printFoo(): Unit = { println(s"Foo = $foo") } private def calc(): Int = { println("Before calc") 42 } } def test(): Unit = { println("Before printFoo") DeadLock.printFoo() println("After printFoo") } 或者在Java世界中非常相似: class Deadlock { static CompletableFuture<Integer> cf; static int foo; public static void printFoo() { System.out.println("Print foo " + foo); } static { cf = new CompletableFuture<Integer>(); new Thread(new Runnable() { @Override public void run() { calcF(); } }).start(); try { foo = cf.get(); System.out.println("Future result = " + cf.get()); } catch (InterruptedException e) { e.printStackTrace();f } catch (ExecutionException e) { e.printStackTrace(); } } private static void calcF() { cf.complete(42); } } public static void main(String[] args) { System.out.println("Before foo"); Deadlock.printFoo(); System.out.println("After foo"); } 如果你清楚为什么这个代码死机,你已经知道了大部分故事,并且可能会自己推断出其余部分.您最后可能只需浏览摘要部分. Java静态初始化程序如何死锁? 让我们从这个故事的结尾开始:为什么Java代码会挂起?这是因为静态初始化程序的Java / JVM的两个保证(有关更多详细信息,请参阅JLS的12.4.2. Detailed Initialization Procedure部分): >静态初始化程序将在该类的任何其他“外部”使用之前运行 用于静态初始化程序的锁是由JVM隐式和管理的,但它存在.这意味着代码在逻辑上类似于以下内容: class Deadlock { static boolean staticInitFinished = false; // unique value for each thread! static ThreadLocal<Boolean> currentThreadRunsStaticInit = ThreadLocal.withInitial(() -> Boolean.FALSE); static CompletableFuture<Integer> cf; static int foo; static void enforceStaticInit() { synchronized (Deadlock.class) { // is init finished? if (staticInitFinished) return; // are we the thread already running the init? if(currentThreadRunsStaticInit.get()) return; currentThreadRunsStaticInit.set(true); cf = new CompletableFuture<Integer>(); new Thread(new Runnable() { @Override public void run() { calcF(); } }).start(); try { foo = cf.get(); System.out.println("Future result = " + cf.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } currentThreadRunsStaticInit.set(false); staticInitFinished = true; } } private static void calcF() { enforceStaticInit(); cf.complete(42); } public static void printFoo() { enforceStaticInit(); System.out.println("Print foo " + foo); } } 现在很清楚为什么这个代码死锁:我们的静态初始化程序启动一个新线程并阻塞等待它的结果.但是新线程尝试访问同一个类(calcF方法)并且是另一个线程,它必须等待已经运行的静态初始化程序完成.请注意,如果calcF方法在另一个类中,一切都会正常工作. Scala REPL的工作原理 现在让我们回到有关Scala REPL如何工作的故事的开头.这个答案是对真实交易的一个很大的简化,但它抓住了这种情况细节的重要性.幸运的是,对于REPL实现者,Scala编译器是用Scala编写的.这意味着REPL不必以某种方式解释代码,它可以通过标准编译器运行它,然后通过Java Reflection API运行编译的代码.这仍然需要对代码进行一些修改以使编译器满意并获得结果. 当你输入类似的东西时,简化它(或者好吧,很多) val a = Par.lazyUnit(42 + 1) 进入REPL,代码被分析并转换成这样的东西: package line3 object read { val a = Par.lazyUnit(42 + 1) val res3 = a } object eval { def print() = { println("a: Par.Par[Int] = " + read.res3) } } 然后通过反射调用line3.eval.print(). 类似的故事发生在: val es: ExecutorService = Executors.newFixedThreadPool(2) 最后当你这样做 Par.fork(a)(es).get 事情变得更有趣,因为你依赖于以前使用导入巧妙实现的行: package line5 object read { import line2.read.Par import line3.read.a import line4.read.es val res5 = Par.fork(a)(es).get } object eval { def print() = { println("res5: Int = " + read.res5) } } 这里重要的是你写入REPL的所有内容都被包装成一个全新的对象,然后编译并作为通常的代码运行. Scala如何在Java / JVM上模拟按名称参数 fork方法的定义使用by-name parameter: def fork[A](a: => Par[A]): Par[A] = 在这里,它用于评估对于fork的整个逻辑至关重要的延迟. Java / JVM没有标准的延迟评估支持,但它可以被模拟,这就是Scala编译器所做的.在内部,签名被更改为使用Function0: def fork[A](aWrapper: () => Par[A]): Par[A] = 并且每次访问a都会被调用aWrapper.apply()来代替.魔法的另一部分发生在带有by-name参数的方法的调用者端:参数也应该被包装到Function0中,所以代码就像 object read { import line2.read.Par import line3.read.a import line4.read.es val res5 = Par.fork(() => a)(es).get } 但实际上它有点不同.天真地,这个小功能只需要另一个课程,这对于这样一个简单的逻辑感觉很浪费.在Scala 2.12的实践中,使用了Java 8 LambdaMetafactory的神奇之处,因此代码真的变得类似 object read { import line2.read.Par import line3.read.a import line4.read.es def aWrapper():Int = a val res5 = Par.fork(aWrapper _)(es).get } 其中aWrapper _表示将方法转换为使用LambdaMetafactory完成的Funciton0.正如您可能从Java静态初始化程序死锁一章中所怀疑的那样,引入def aWrapper是一个至关重要的区别.您已经可以看到此代码与挂起的答案中的第一个Scala片段非常相似. Scala如何编译Java / JVM上的对象 最后一部分是如何在Java / JVM中编译Scala对象.好吧它实际上被编译成类似于“静态类”的东西但是因为你可以使用object作为对象参数,所以它必须更复杂一些.实际上,所有初始化逻辑都被移动到对象类的构造函数中,并且有一个简单的静态初始化程序可以调用它.所以我们在Java中的最后一个读取对象(忽略导入)如下所示: class read${ static read$MODULE$ static { new read$() } private Par[Int] res5; private read$() { MODULE$= this; res5 = Par.fork(read$::aWrapper)(es).get } private static int aWrapper(){ return line3.read$.MODULE$.a; } } 这里再次读取$:: aWrapper表示使用LambdaMetafactory构建一个Function0形式的aWrapper方法.换句话说,Scala对象的初始化被转换为作为Java静态初始化器的一部分运行的代码. 摘要 总结事情是如何搞砸的: > REPL将您的代码转换为每行的新对象并进行编译>将对象初始化逻辑转换为Java静态初始化逻辑>在简单的情况下调用带有by-name参数的方法被转换为包装“返回值”逻辑的方法,并将该方法添加到同一个类或对象中> Par.fork作为对象初始化的一部分执行(即Java静态初始化程序的一部分)尝试在另一个线程上计算by-name参数(即在同一个类上调用该方法)并阻塞等待该线程的结果> Java静态初始化程序在全局锁下逻辑执行,因此它阻止调用该方法的不同线程.但它本身被阻止等待该方法调用完成. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |