scala,将回调模式转换为函数式内部迭代器
假设给出了这个API,我们无法更改它:
object ProviderAPI { trait Receiver[T] { def receive(entry: T) def close() } def run(r: Receiver[Int]) { new Thread() { override def run() { (0 to 9).foreach { i => r.receive(i) Thread.sleep(100) } r.close() } }.start() } } 在此示例中,ProviderAPI.run接收Receiver,调用receive(i)10次然后关闭.通常,ProviderAPI.run会根据可能无限的集合调用receive(i). 此API旨在以命令式样式使用,如外部迭代器.如果我们的应用程序需要过滤,映射和打印此输入,我们需要实现一个混合所有这些操作的Receiver: object Main extends App { class MyReceiver extends ProviderAPI.Receiver[Int] { def receive(entry: Int) { if (entry % 2 == 0) { println("Entry#" + entry) } } def close() {} } ProviderAPI.run(new MyReceiver()) } 现在,问题是如何在函数式,内部迭代器中使用ProviderAPI(不改变ProviderAPI的实现,这是给我们的).请注意,ProviderAPI也可以无限次地调用receive(i),因此不能选择收集列表中的所有内容(同样,我们应该逐个处理每个结果,而不是先收集所有输入,然后再处理它) . 我问如何实现这样的ReceiverToIterator,以便我们可以在功能样式中使用ProviderAPI: object Main extends App { val iterator = new ReceiverToIterator[Int] // how to implement this? ProviderAPI.run(iterator) iterator .view .filter(_ % 2 == 0) .map("Entry#" + _) .foreach(println) } 更新 这有四个解决方案: > IteratorWithSemaphorSolution:我提议的解决方案解决方案首先附在问题上 解决方法
更新:BlockingQueue的1个条目
你在这里实现的主要是Java的BlockingQueue,队列大小为1. 主要特点:超级阻塞.缓慢的消费者会扼杀你的制作人的表现. 更新:@ gzm0提到BlockingQueue不包括EOF.你必须使用BlockingQueue [Option [T]]. 更新:这是一个代码片段.它可以与您的接收器配合使用. // fairness enabled -- you probably want to preserve order... // alternatively,disable fairness and increase buffer to be 'big enough' private val queue = new java.util.concurrent.ArrayBlockingQueue[Option[T]](1,true) // the following block provides you with a potentially blocking peek operation // it should `queue.take` when the previous peeked head has been invalidated // specifically,it will `queue.take` and block when the queue is empty private var head: Option[T] = _ private var headDefined: Boolean = false private def invalidateHead() { headDefined = false } private def peek: Option[T] = { if (!headDefined) { head = queue.take() headDefined = true } head } def iterator = new Iterator[T] { // potentially blocking; only false upon taking `None` def hasNext = peek.isDefined // peeks and invalidates head; throws NoSuchElementException as appropriate def next: T = { val opt = peek; invalidateHead() if (opt.isEmpty) throw new NoSuchElementException else opt.get } } 替代方案:Iteratees 基于迭代器的解决方案通常会涉及更多阻塞.从概念上讲,你可以在执行迭代的线程上使用continuation来避免阻塞线程,但是延续会破坏Scala的for-comprehensions,所以在这条道路上没有快乐. 或者,您可以考虑基于迭代的解决方案.迭代器与迭代器的不同之处在于,消费者不负责推进迭代 – 生产者是.使用迭代,消费者基本上折叠生产者推动的条目随着时间的推移.折叠每个下一个条目可以在线程池中进行折叠,因为在每次折叠完成后放弃该线程. 你不会为迭代的语法得到好处,学习曲线有点挑战,但如果你对使用foldLeft有信心,你最终会得到一个看起来合理的非阻塞解决方案. 要阅读有关iteratees的更多信息,我建议您查看PlayFramework 2.X’s iteratee reference.该文档描述了他们的独立iteratee库,它在Play的上下文之外100%可用. Scalaz 7还有一个全面的iteratee库. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |